odoo18/addons_extensions/helpdesk/models/helpdesk_team.py

1017 lines
52 KiB
Python

# Part of Odoo. See LICENSE file for full copyright and licensing details.
import ast
import datetime
from dateutil import relativedelta
from collections import defaultdict
from pytz import timezone
from odoo import api, Command, fields, models, _
from odoo.exceptions import ValidationError
from odoo.osv import expression
from odoo.tools import float_round
from odoo.addons.rating.models.rating_data import RATING_LIMIT_MIN
from odoo.addons.web.controllers.utils import clean_action
class HelpdeskTeam(models.Model):
_name = "helpdesk.team"
_inherit = ['mail.alias.mixin', 'mail.thread', 'rating.parent.mixin']
_description = "Helpdesk Team"
_order = 'sequence,name'
_rating_satisfaction_days = 7 # include only last 7 days to compute satisfaction and average
def _default_stage_ids(self):
default_stages = self.env['helpdesk.stage']
for xml_id in ['stage_new', 'stage_in_progress', 'stage_solved', 'stage_cancelled']:
stage = self.env.ref('helpdesk.%s' % xml_id, raise_if_not_found=False)
if stage:
default_stages += stage
if not default_stages:
default_stages = self.env['helpdesk.stage'].create({
'name': _("New"),
'sequence': 0,
'template_id': self.env.ref('helpdesk.new_ticket_request_email_template', raise_if_not_found=False).id or None
})
return [Command.set(default_stages.ids)]
name = fields.Char('Helpdesk Team', required=True, translate=True)
description = fields.Html('About Team', translate=True)
active = fields.Boolean(default=True)
company_id = fields.Many2one('res.company', string='Company', required=True, default=lambda self: self.env.company)
sequence = fields.Integer(export_string_translation=False, default=10)
color = fields.Integer('Color Index', default=0)
ticket_properties = fields.PropertiesDefinition('Ticket Properties')
stage_ids = fields.Many2many(
'helpdesk.stage', relation='team_stage_rel', string='Stages',
default=_default_stage_ids,
help="Stages the team will use. This team's tickets will only be able to be in these stages.")
auto_assignment = fields.Boolean("Automatic Assignment")
assign_method = fields.Selection([
('randomly', 'Each user is assigned an equal number of tickets'),
('balanced', 'Each user has an equal number of open tickets')],
string='Assignment Method', default='randomly', required=True,
help="New tickets will automatically be assigned to the team members that are available, according to their working hours and their time off.")
member_ids = fields.Many2many('res.users', string='Team Members', domain=lambda self: [('groups_id', 'in', self.env.ref('helpdesk.group_helpdesk_user').id)],
default=lambda self: self.env.user, required=True)
privacy_visibility = fields.Selection([
('invited_internal', 'Invited internal users (private)'),
('internal', 'All internal users (company)'),
('portal', 'Invited portal users and all internal users (public)')],
string='Visibility', required=True,
default='portal',
help="People to whom this helpdesk team and its tickets will be visible.\n\n"
"- Invited internal users: internal users can access the team and the tickets they are following. "
"This access can be modified on each ticket individually by adding or removing the user as follower.\n"
"A user with the helpdesk > administrator access right level can still access this team and its tickets, even if they are not explicitely part of the followers.\n\n"
"- All internal users: all internal users can access the team and all of its tickets without distinction.\n\n"
"- Invited portal users and all internal users: all internal users can access the team and all of its tickets without distinction.\n"
"Portal users can only access the tickets they are following. "
"This access can be modified on each ticket individually by adding or removing the portal user as follower.")
privacy_visibility_warning = fields.Char(compute='_compute_privacy_visibility_warning', export_string_translation=False)
access_instruction_message = fields.Char(compute='_compute_access_instruction_message', export_string_translation=False)
ticket_ids = fields.One2many('helpdesk.ticket', 'team_id', string='Tickets')
use_alias = fields.Boolean('Use Alias', default=True)
has_external_mail_server = fields.Boolean(compute='_compute_has_external_mail_server', export_string_translation=False)
allow_portal_ticket_closing = fields.Boolean('Closure by Customers')
use_website_helpdesk_form = fields.Boolean('Website Form', compute='_compute_use_website_helpdesk_form', readonly=False, store=True)
use_website_helpdesk_livechat = fields.Boolean('Live Chat')
use_website_helpdesk_forum = fields.Boolean('Community Forum', compute='_compute_use_website_helpdesk_forum', readonly=False, store=True)
use_website_helpdesk_slides = fields.Boolean('eLearning', compute='_compute_use_website_helpdesk_slides', readonly=False, store=True)
use_website_helpdesk_knowledge = fields.Boolean('Knowledge', compute='_compute_use_website_helpdesk_knowledge', readonly=False, store=True)
use_helpdesk_timesheet = fields.Boolean(
'Timesheets', compute='_compute_use_helpdesk_timesheet',
store=True, readonly=False)
show_knowledge_base = fields.Boolean(compute='_compute_show_knowledge_base', export_string_translation=False)
use_helpdesk_sale_timesheet = fields.Boolean(
'Time Billing', compute='_compute_use_helpdesk_sale_timesheet', store=True,
readonly=False)
use_credit_notes = fields.Boolean('Refunds')
use_coupons = fields.Boolean('Coupons')
use_fsm = fields.Boolean('Field Service')
use_product_returns = fields.Boolean('Returns')
use_product_repairs = fields.Boolean('Repairs')
use_twitter = fields.Boolean('X')
use_rating = fields.Boolean('Customer Ratings')
use_sla = fields.Boolean('SLA Policies', default=True)
unassigned_tickets = fields.Integer(string='Unassigned Tickets', compute='_compute_unassigned_tickets')
resource_calendar_id = fields.Many2one('resource.calendar', 'Working Hours',
default=lambda self: self.env.company.resource_calendar_id, domain="['|', ('company_id', '=', False), ('company_id', '=', company_id)]",
help="Working hours used to determine the deadline of SLA Policies.")
open_ticket_count = fields.Integer("# Open Tickets", compute='_compute_open_ticket_count')
sla_policy_count = fields.Integer("# SLA Policy", compute='_compute_sla_policy_count')
ticket_closed = fields.Integer(string='Ticket Closed', compute='_compute_ticket_closed')
success_rate = fields.Float(string='Success Rate', compute='_compute_success_rate', groups="helpdesk.group_use_sla")
urgent_ticket = fields.Integer(string='# Urgent Ticket', compute='_compute_urgent_ticket')
sla_failed = fields.Integer(string='Failed SLA Ticket', compute='_compute_sla_failed')
# auto close ticket
auto_close_ticket = fields.Boolean('Automatic Closing')
auto_close_day = fields.Integer('Inactive Period(days)',
default=7,
help="Period of inactivity after which tickets will be automatically closed.")
from_stage_ids = fields.Many2many('helpdesk.stage', relation='team_stage_auto_close_from_rel',
string='In Stages',
domain="[('id', 'in', stage_ids)]")
to_stage_id = fields.Many2one('helpdesk.stage',
string='Move to Stage',
compute="_compute_assign_stage_id", readonly=False, store=True,
domain="[('id', 'in', stage_ids)]")
alias_email_from = fields.Char(compute='_compute_alias_email_from', export_string_translation=False)
@api.constrains('use_website_helpdesk_form', 'privacy_visibility')
def _check_website_privacy(self):
if any(t.use_website_helpdesk_form and t.privacy_visibility != 'portal' for t in self):
raise ValidationError(_('The visibility of the team needs to be set as "Invited portal users and all internal users" in order to use the website form.'))
@api.depends('auto_close_ticket', 'stage_ids')
def _compute_assign_stage_id(self):
stages_dict = {stage['id']: 1 if stage['fold'] else 2 for stage in self.env['helpdesk.stage'].search_read([('id', 'in', self.stage_ids.ids), ('fold', '=', True)], ['id', 'fold'])}
for team in self:
if not team.stage_ids:
team.to_stage_id = False
continue
stage_ids = sorted([
(val, stage_id) for stage_id, val in stages_dict.items() if stage_id in team.stage_ids.ids
])
team.to_stage_id = stage_ids[0][1] if stage_ids else team.stage_ids and team.stage_ids.ids[-1]
def _compute_alias_email_from(self):
res = self._notify_get_reply_to()
for team in self:
team.alias_email_from = res.get(team.id, False)
def _compute_has_external_mail_server(self):
self.has_external_mail_server = self.env['ir.config_parameter'].sudo().get_param('base_setup.default_external_email_server')
def _compute_unassigned_tickets(self):
ticket_data = self.env['helpdesk.ticket']._read_group([
('user_id', '=', False),
('team_id', 'in', self.ids),
('stage_id.fold', '=', False),
], ['team_id'], ['__count'])
mapped_data = {team.id: count for team, count in ticket_data}
for team in self:
team.unassigned_tickets = mapped_data.get(team.id, 0)
def _compute_ticket_closed(self):
dt = datetime.datetime.combine(datetime.date.today() - relativedelta.relativedelta(days=6), datetime.time.min)
ticket_data = self.env['helpdesk.ticket']._read_group([
('team_id', 'in', self.ids),
('stage_id.fold', '=', True),
('close_date', '>=', dt)],
['team_id'], ['__count'])
mapped_data = {team.id: count for team, count in ticket_data}
for team in self:
team.ticket_closed = mapped_data.get(team.id, 0)
def _compute_success_rate(self):
dt = datetime.datetime.combine(datetime.date.today() - relativedelta.relativedelta(days=6), datetime.time.min)
sla_teams = self.filtered('use_sla')
domain = [
('team_id', 'in', sla_teams.ids),
'&', ('stage_id.fold', '=', True), ('close_date', '>=', dt)
]
sla_tickets_and_failed_tickets_per_team = defaultdict(lambda: [0, 0])
today = fields.Datetime.now()
tickets_sla_count = self.env['helpdesk.ticket']._read_group(domain + [
'|', ('sla_reached', '=', True), ('sla_reached_late', '=', True)],
['team_id'], ['__count']
)
tickets_success_count = self.env['helpdesk.ticket']._read_group(domain + [
'|', ('sla_deadline', '<', today), ('sla_reached_late', '=', True)],
['team_id'], ['__count']
)
for team, team_count in tickets_sla_count:
sla_tickets_and_failed_tickets_per_team[team.id][0] = team_count
for team, team_count in tickets_success_count:
sla_tickets_and_failed_tickets_per_team[team.id][1] = team_count
for team in sla_teams:
if not sla_tickets_and_failed_tickets_per_team.get(team.id):
team.success_rate = -1
continue
total_count = sla_tickets_and_failed_tickets_per_team[team.id][0]
success_count = total_count - sla_tickets_and_failed_tickets_per_team[team.id][1]
team.success_rate = float_round(success_count * 100 / total_count, 2) if total_count else 0.0
(self - sla_teams).success_rate = -1
def _compute_urgent_ticket(self):
ticket_data = self.env['helpdesk.ticket']._read_group([
('team_id', 'in', self.ids),
('stage_id.fold', "=", False),
('priority', '=', 3)],
['team_id'], ['__count'])
mapped_data = {team.id: count for team, count in ticket_data}
for team in self:
team.urgent_ticket = mapped_data.get(team.id, 0)
def _compute_sla_failed(self):
ticket_data = self.env['helpdesk.ticket']._read_group([
('team_id', 'in', self.ids),
('stage_id.fold', '=', False),
('sla_fail', '=', True)],
['team_id'], ['__count'])
mapped_data = {team.id: count for team, count in ticket_data}
for team in self:
team.sla_failed = mapped_data.get(team.id, 0)
def _compute_open_ticket_count(self):
ticket_data = self.env['helpdesk.ticket']._read_group([
('team_id', 'in', self.ids), ('stage_id.fold', '=', False)
], ['team_id'], ['__count'])
mapped_data = {team.id: count for team, count in ticket_data}
for team in self:
team.open_ticket_count = mapped_data.get(team.id, 0)
def _compute_sla_policy_count(self):
sla_data = self.env['helpdesk.sla']._read_group([('team_id', 'in', self.ids)], ['team_id'], ['__count'])
mapped_data = {team.id: count for team, count in sla_data}
for team in self:
team.sla_policy_count = mapped_data.get(team.id, 0)
@api.onchange('use_alias', 'name')
def _onchange_use_alias(self):
if not self.use_alias:
self.alias_name = False
if self._origin.id and self.use_alias and not self.alias_name and self.name:
self.alias_name = self._alias_get_creation_values()['alias_name'].lower()
@api.depends('use_website_helpdesk_knowledge', 'use_website_helpdesk_slides', 'use_website_helpdesk_forum')
def _compute_use_website_helpdesk_form(self):
teams = self.filtered(lambda team: not team.use_website_helpdesk_form and (team.use_website_helpdesk_knowledge or team.use_website_helpdesk_slides or team.use_website_helpdesk_forum))
teams.use_website_helpdesk_form = True
@api.depends('use_website_helpdesk_form')
def _compute_use_website_helpdesk_forum(self):
teams = self.filtered(lambda team: not team.use_website_helpdesk_form and team.use_website_helpdesk_forum)
teams.use_website_helpdesk_forum = False
@api.depends('use_website_helpdesk_form')
def _compute_use_website_helpdesk_slides(self):
teams = self.filtered(lambda team: not team.use_website_helpdesk_form and team.use_website_helpdesk_slides)
teams.use_website_helpdesk_slides = False
@api.depends('use_website_helpdesk_form')
def _compute_use_website_helpdesk_knowledge(self):
teams = self.filtered(lambda team: not team.use_website_helpdesk_form and team.use_website_helpdesk_knowledge)
teams.use_website_helpdesk_knowledge = False
@api.depends('use_helpdesk_sale_timesheet')
def _compute_use_helpdesk_timesheet(self):
sale_timesheet = self.filtered('use_helpdesk_sale_timesheet')
sale_timesheet.update({'use_helpdesk_timesheet': True})
@api.depends('use_helpdesk_timesheet')
def _compute_use_helpdesk_sale_timesheet(self):
without_timesheet = self.filtered(lambda t: not t.use_helpdesk_timesheet)
without_timesheet.update({'use_helpdesk_sale_timesheet': False})
def _compute_show_knowledge_base(self):
self.show_knowledge_base = False
@api.depends('privacy_visibility')
def _compute_privacy_visibility_warning(self):
for team in self:
if not team.ids:
team.privacy_visibility_warning = ''
elif team.privacy_visibility == 'portal' and team._origin.privacy_visibility != 'portal':
team.privacy_visibility_warning = _('Customers will be added to the followers of their tickets.')
elif team.privacy_visibility != 'portal' and team._origin.privacy_visibility == 'portal':
team.privacy_visibility_warning = _('Portal users will be removed from the followers of the team and its tickets.')
else:
team.privacy_visibility_warning = ''
@api.depends('privacy_visibility')
def _compute_access_instruction_message(self):
for team in self:
if team.privacy_visibility == 'portal':
team.access_instruction_message = _('Grant portal users access to your helpdesk team or tickets by adding them as followers. Customers automatically get access to their tickets in their portal.')
elif team.privacy_visibility == 'invited_internal':
team.access_instruction_message = _('Grant employees access to your helpdesk team or tickets by adding them as followers. Employees automatically get access to the tickets they are assigned to.')
else:
team.access_instruction_message = ''
def get_knowledge_base_url(self):
self.ensure_one()
return self.get_portal_url()
@api.onchange('auto_assignment')
def _onchange_assign_method(self):
if not self.member_ids:
self.member_ids = [Command.set(self.env.user.ids)]
# ------------------------------------------------------------
# ORM overrides
# ------------------------------------------------------------
@api.depends('company_id')
@api.depends_context('allowed_company_ids')
def _compute_display_name(self):
super()._compute_display_name()
if len(self.env.context.get('allowed_company_ids', [])) <= 1:
return
team_default_name = _('Customer Care')
for team in self:
if team.name == team_default_name:
team.display_name = f'{team.display_name} - {team.company_id.name}'
@api.model_create_multi
def create(self, vals_list):
teams = super(HelpdeskTeam, self.with_context(mail_create_nosubscribe=True)).create(vals_list)
teams.sudo()._check_sla_group()
teams.sudo()._check_rating_group()
teams.sudo()._check_auto_assignment_group()
teams.sudo()._check_modules_to_install()
if teams.filtered(lambda x: x.auto_close_ticket):
teams._update_cron()
# If you plan to add something after this, use a new environment. The one above is no longer valid after the modules install.
return teams
def write(self, vals):
if vals.get('privacy_visibility'):
self._change_privacy_visibility(vals['privacy_visibility'])
if 'alias_name' in vals and not vals['alias_name'] and (vals['use_alias'] if 'use_alias' in vals else self.use_alias):
default_alias = self.name.replace(' ', '-') if self.name else ''
vals['alias_name'] = self.alias_name or default_alias
result = super(HelpdeskTeam, self).write(vals)
if 'active' in vals:
self.with_context(active_test=False).mapped('ticket_ids').write({'active': vals['active']})
if 'use_sla' in vals:
self.sudo()._check_sla_group()
if 'use_rating' in vals:
self.sudo()._check_rating_group()
if 'auto_assignment' in vals:
self.sudo()._check_auto_assignment_group()
self.sudo()._check_modules_to_install()
if 'auto_close_ticket' in vals:
self._update_cron()
# If you plan to add something after this, use a new environment. The one above is no longer valid after the modules install.
return result
def unlink(self):
stages = self.mapped('stage_ids').filtered(lambda stage: stage.team_ids <= self) # remove stages that only belong to team in self
stages.unlink()
return super(HelpdeskTeam, self).unlink()
def copy_data(self, default=None):
vals_list = super().copy_data(default=default)
return [dict(vals, name=self.env._("%s (copy)", team.name)) for team, vals in zip(self, vals_list)]
def _change_privacy_visibility(self, new_visibility):
"""
Unsubscribe non-internal users from the team and tickets if the team privacy visibility
goes from 'portal' to a different value.
If the privacy visibility is set to 'portal', subscribe back tickets partners.
"""
for team in self:
if team.privacy_visibility == new_visibility:
continue
if new_visibility == 'portal':
for ticket in team.mapped('ticket_ids').filtered('partner_id'):
ticket.message_subscribe(partner_ids=ticket.partner_id.ids)
elif team.privacy_visibility == 'portal':
portal_users = team.message_partner_ids.user_ids.filtered('share')
team.message_unsubscribe(partner_ids=portal_users.partner_id.ids)
team.mapped('ticket_ids')._unsubscribe_portal_users()
@api.model
def _update_cron(self):
cron = self.env.ref('helpdesk.ir_cron_auto_close_ticket', raise_if_not_found=False)
cron and cron.toggle(model=self._name, domain=[
('auto_close_ticket', '=', True),
('auto_close_day', '>', 0),
])
def _get_helpdesk_user_group(self):
return self.env.ref('helpdesk.group_helpdesk_user')
def _get_helpdesk_use_sla_group(self):
return self.env.ref('helpdesk.group_use_sla')
def _get_helpdesk_use_rating_group(self):
return self.env.ref('helpdesk.group_use_rating')
def _check_sla_feature_enabled(self, check_user_has_group=False):
""" Check if the SLA feature is enabled
Check if the user can see at least one helpdesk team with `use_sla=True`
and if the user has the `group_use_sla` group (only done if the `check_user_has_group` parameter is True)
:param check_user_has_group: If True, then check if the user has the `group_use_sla`
:return True if the feature is enabled otherwise False.
"""
user_has_group = self.env.user.has_group('helpdesk.group_use_sla') if check_user_has_group else True
return user_has_group and self.env['helpdesk.team'].search([('use_sla', '=', True)], limit=1)
def _check_rating_feature_enabled(self, check_user_has_group=False):
""" Check if the Customer Rating feature is enabled
Check if the user can see at least one helpdesk team with `use_rating=True`
and if the user has the `group_use_rating` group (only done if the `check_user_has_group` parameter is True)
:param check_user_has_group: If True, then check if the user has the `group_use_rating`
:return True if the feature is enabled otherwise False.
"""
user_has_group = self.env.user.has_group('helpdesk.group_use_rating') if check_user_has_group else True
return user_has_group and self.env['helpdesk.team'].search([('use_rating', '=', True)], limit=1)
def _check_sla_group(self):
sla_teams = self.filtered('use_sla')
non_sla_teams = self - sla_teams
use_sla_group = helpdesk_user_group = None
user_has_use_sla_group = self.env.user.has_group('helpdesk.group_use_sla')
if sla_teams:
if not user_has_use_sla_group:
use_sla_group = self._get_helpdesk_use_sla_group()
helpdesk_user_group = self._get_helpdesk_user_group()
helpdesk_user_group.write({'implied_ids': [Command.link(use_sla_group.id)]})
self.env['helpdesk.sla'].with_context(active_test=False).search([
('team_id', 'in', sla_teams.ids), ('active', '=', False),
]).write({'active': True})
if non_sla_teams:
self.env['helpdesk.sla'].search([('team_id', 'in', non_sla_teams.ids)]).write({'active': False})
if user_has_use_sla_group and not self._check_sla_feature_enabled():
use_sla_group = use_sla_group or self._get_helpdesk_use_sla_group()
helpdesk_user_group = helpdesk_user_group or self._get_helpdesk_user_group()
helpdesk_user_group.write({'implied_ids': [Command.unlink(use_sla_group.id)]})
use_sla_group.write({'users': [Command.clear()]})
def _check_rating_group(self):
rating_teams = self.filtered('use_rating')
user_has_use_rating_group = self.env.user.has_group('helpdesk.group_use_rating')
rating_helpdesk_email_template = self.env.ref('helpdesk.rating_ticket_request_email_template')
if rating_teams and not user_has_use_rating_group:
self._get_helpdesk_user_group()\
.write({'implied_ids': [Command.link(self._get_helpdesk_use_rating_group().id)]})
if not rating_helpdesk_email_template.active:
rating_helpdesk_email_template.active = True
elif self - rating_teams and user_has_use_rating_group and not self._check_rating_feature_enabled():
use_rating_group = self._get_helpdesk_use_rating_group()
self._get_helpdesk_user_group()\
.write({'implied_ids': [Command.unlink(use_rating_group.id)]})
use_rating_group.write({'users': [Command.clear()]})
if rating_helpdesk_email_template.active:
rating_helpdesk_email_template.active = False
self.env['helpdesk.stage'].search([('template_id', '=', self.env.ref('helpdesk.rating_ticket_request_email_template').id)]).template_id = False
def _check_auto_assignment_group(self):
has_auto_assignment_group = self.env.user.has_group('helpdesk.group_auto_assignment')
has_auto_assignment = self.env['helpdesk.team'].search_count([('auto_assignment', '=', True)], limit=1)
group_auto_assignment = self.env.ref('helpdesk.group_auto_assignment')
if has_auto_assignment and not has_auto_assignment_group:
self._get_helpdesk_user_group().write({'implied_ids': [Command.link(group_auto_assignment.id)]})
elif not has_auto_assignment and has_auto_assignment_group:
self._get_helpdesk_user_group().write({'implied_ids': [Command.unlink(group_auto_assignment.id)]})
group_auto_assignment.write({'users': [Command.clear()]})
def _get_field_check_method(self):
# mapping of field names to the function that checks if their feature is enabled
return {
'use_sla': self._check_sla_feature_enabled,
'use_rating': self._check_rating_feature_enabled,
}
@api.model
def _get_field_modules(self):
# mapping of field names to module names
return {
'use_website_helpdesk_form': 'website_helpdesk',
'use_website_helpdesk_livechat': 'website_helpdesk_livechat',
'use_website_helpdesk_forum': 'website_helpdesk_forum',
'use_website_helpdesk_slides': 'website_helpdesk_slides',
'use_website_helpdesk_knowledge': 'website_helpdesk_knowledge',
'use_helpdesk_timesheet': 'helpdesk_timesheet',
'use_helpdesk_sale_timesheet': 'helpdesk_sale_timesheet',
'use_credit_notes': 'helpdesk_account',
'use_product_returns': 'helpdesk_stock',
'use_product_repairs': 'helpdesk_repair',
'use_coupons': 'helpdesk_sale_loyalty',
'use_fsm': 'helpdesk_fsm',
}
@api.model
def check_features_enabled(self, updated_features=None):
if not self.env.user.has_group("helpdesk.group_helpdesk_user"):
return {}
if updated_features is None:
return {key: bool(check_method()) for key, check_method in self._get_field_check_method().items()}
return {key: bool(check_method()) for key, check_method in self._get_field_check_method().items() if updated_features and key in updated_features}
@api.model
def check_modules_to_install(self, enabled_features):
""" check if a module has to be installed according to the fields given in parameter.
:param list enabled_features: list of features enabled in the frontend by the user
to check if a module will be installed when the helpdesk
team is saved.
:return: boolean value, True if at least a module will be installed after saving
the changes in helpdesk team, otherwise False.
"""
if not enabled_features:
return False
module_names = [
module_name
for fname, module_name in self._get_field_modules().items()
if fname in enabled_features
]
if module_names:
return bool(
self.env['ir.module.module'].search_count([
('name', 'in', module_names),
('state', 'not in', ('installed', 'to install', 'to upgrade')),
], limit=1)
)
return False
def _check_modules_to_install(self):
# determine the modules to be installed
expected = [
mname
for fname, mname in self._get_field_modules().items()
if any(team[fname] for team in self)
]
modules = self.env['ir.module.module']
if expected:
STATES = ('installed', 'to install', 'to upgrade')
modules = modules.search([('name', 'in', expected)])
modules = modules.filtered(lambda module: module.state not in STATES)
if modules:
modules.button_immediate_install()
# just in case we want to do something if we install a module. (like a refresh ...)
return bool(modules)
# ------------------------------------------------------------
# Mail Alias Mixin
# ------------------------------------------------------------
def _alias_get_creation_values(self):
values = super(HelpdeskTeam, self)._alias_get_creation_values()
values['alias_model_id'] = self.env['ir.model']._get('helpdesk.ticket').id
if self._origin.id:
values['alias_defaults'] = defaults = ast.literal_eval(self.alias_defaults or "{}")
defaults['team_id'] = self.id
if not self.alias_name:
base_email_alias = self.name.lower().replace(' ', '-')
values['alias_name'] = self._ensure_unique_email_alias(base_email_alias)
return values
def _ensure_unique_email_alias(self, email_alias):
existing_aliases = self._get_existing_email_aliases(email_alias)
modified_email_alias = email_alias
counter = 2
while modified_email_alias in existing_aliases:
modified_email_alias = f"{email_alias}-{counter}"
counter += 1
return self.env['mail.alias']._sanitize_alias_name(modified_email_alias)
def _get_existing_email_aliases(self, email_alias):
existing_aliases = self.env['mail.alias'].search([('alias_name', 'ilike', email_alias)])
return {alias.alias_name for alias in existing_aliases}
# ------------------------------------------------------------
# Business Methods
# ------------------------------------------------------------
@api.model
def retrieve_dashboard(self):
user_uses_sla = self._check_sla_feature_enabled(check_user_has_group=True)
HelpdeskTicket = self.env['helpdesk.ticket']
show_demo = not bool(HelpdeskTicket.search([], limit=1))
result = {
'helpdesk_target_closed': self.env.user.helpdesk_target_closed,
'helpdesk_target_rating': self.env.user.helpdesk_target_rating,
'helpdesk_target_success': self.env.user.helpdesk_target_success,
'today': {'sla_ticket_count': 0, 'count': 0, 'rating': 0, 'success': 0},
'7days': {'sla_ticket_count': 0, 'count': 0, 'rating': 0, 'success': 0},
'my_all': {'count': 0, 'hours': 0, 'failed': 0},
'my_high': {'count': 0, 'hours': 0, 'failed': 0},
'my_urgent': {'count': 0, 'hours': 0, 'failed': 0},
'show_demo': show_demo,
'rating_enable': False,
'success_rate_enable': user_uses_sla
}
if show_demo:
result.update({
'my_all': {'count': 10, 'hours': 30, 'failed': 4},
'my_high': {'count': 3, 'hours': 10, 'failed': 2},
'my_urgent': {'count': 2, 'hours': 15, 'failed': 1},
'today': {'sla_ticket_count': 1, 'count': 1, 'rating': 2.5, 'success': 50},
'7days': {'sla_ticket_count': 1, 'count': 15, 'rating': 3.5, 'success': 80},
'helpdesk_target_rating': 3.5,
'helpdesk_target_success': 85,
'helpdesk_target_closed': 12,
})
return result
def _is_sla_failed(data):
deadline = data.get('sla_deadline')
sla_deadline = fields.Datetime.now() > deadline if deadline else False
return sla_deadline or data.get('sla_reached_late')
def add_to(ticket, key="my_all"):
result[key]['count'] += 1
result[key]['hours'] += ticket['open_hours']
if _is_sla_failed(ticket):
result[key]['failed'] += 1
domain = [('user_id', '=', self.env.uid)]
tickets = HelpdeskTicket.search_read(
expression.AND([
domain,
[('stage_id.fold', '=', False)]
]),
['sla_deadline', 'open_hours', 'sla_reached_late', 'priority']
)
for ticket in tickets:
add_to(ticket, 'my_all')
if ticket['priority'] == '2':
add_to(ticket, 'my_high')
if ticket['priority'] == '3':
add_to(ticket, 'my_urgent')
group_fields = []
if user_uses_sla:
group_fields = ['sla_reached_late', 'sla_reached']
dt = fields.Date.context_today(self)
tickets = HelpdeskTicket._read_group(domain + [('stage_id.fold', '=', True), ('close_date', '>=', dt)], group_fields, ['__count'])
for row in tickets:
if not user_uses_sla:
[count] = row
else:
sla_reached_late, sla_reached, count = row
if sla_reached or sla_reached_late:
result['today']['sla_ticket_count'] += count
if not sla_reached_late:
result['today']['success'] += count
result['today']['count'] += count
dt = fields.Datetime.to_string((datetime.date.today() - relativedelta.relativedelta(days=6)))
tickets = HelpdeskTicket._read_group(domain + [('stage_id.fold', '=', True), ('close_date', '>=', dt)], group_fields, ['__count'])
for row in tickets:
if not user_uses_sla:
[count] = row
else:
sla_reached_late, sla_reached, count = row
if sla_reached or sla_reached_late:
result['7days']['sla_ticket_count'] += count
if not sla_reached_late:
result['7days']['success'] += count
result['7days']['count'] += count
result['today']['success'] = fields.Float.round(result['today']['success'] * 100 / (result['today']['sla_ticket_count'] or 1), 2)
result['7days']['success'] = fields.Float.round(result['7days']['success'] * 100 / (result['7days']['sla_ticket_count'] or 1), 2)
result['my_all']['hours'] = fields.Float.round(result['my_all']['hours'] / (result['my_all']['count'] or 1), 2)
result['my_high']['hours'] = fields.Float.round(result['my_high']['hours'] / (result['my_high']['count'] or 1), 2)
result['my_urgent']['hours'] = fields.Float.round(result['my_urgent']['hours'] / (result['my_urgent']['count'] or 1), 2)
if self._check_rating_feature_enabled(check_user_has_group=True):
result['rating_enable'] = True
# rating of today
domain = [('user_id', '=', self.env.uid)]
today = fields.Date.today()
one_week_before = today - relativedelta.relativedelta(weeks=1)
helpdesk_ratings = self.env['rating.rating'].search([
('res_model', '=', 'helpdesk.ticket'),
('res_id', '!=', False),
('write_date', '>', fields.Datetime.to_string(one_week_before)),
('write_date', '<=', today),
('rating', '>=', RATING_LIMIT_MIN),
('consumed', '=', True),
])
tickets = HelpdeskTicket.search([('id', 'in', helpdesk_ratings.mapped('res_id')), ('user_id', '=', self._uid)])
today_rating_stat = {'count': 0.0, 'score': 0.0}
rating_stat = {**today_rating_stat}
for rating in helpdesk_ratings:
if rating.res_id not in tickets.ids:
continue
if rating.write_date.date() == today:
today_rating_stat['count'] += 1
today_rating_stat['score'] += rating.rating
rating_stat['score'] += rating.rating
rating_stat['count'] += 1
def average_score(d):
return fields.Float.round(d['score'] / d['count'] if d['count'] > 0 else 0.0, 2)
result['today']['rating'] = average_score(today_rating_stat)
result['7days']['rating'] = average_score(rating_stat)
return result
def _action_view_rating(self, period=False, only_closed_tickets=False, user_id=None):
""" return the action to see the rating about the tickets of the Team on the period wished.
:param period: either 'seven_days' or 'today' is defined to add a default filter for the ratings.
:param only_my_closed: True will include only the tickets in a closed stage.
:param user_id: id of the user to get the ratings only in the tickets belongs to the user.
"""
action = self.env["ir.actions.actions"]._for_xml_id("helpdesk.rating_rating_action_helpdesk")
action = clean_action(action, self.env)
domain = [('team_id', 'in', self.ids)]
context = dict(ast.literal_eval(action.get('context', {})), search_default_my_ratings=True)
update_views = {}
if period == 'seven_days':
domain += [('close_date', '>=', fields.Datetime.to_string((datetime.date.today() - relativedelta.relativedelta(days=6))))]
update_views[self.env.ref("helpdesk.rating_rating_view_seven_days_pivot_inherit_helpdesk").id] = 'pivot'
update_views[self.env.ref('helpdesk.rating_rating_view_seven_days_graph_inherit_helpdesk').id] = 'graph'
context['search_default_filter_create_date'] = 'custom_create_date_last_7_days'
elif period == 'today':
context['search_default_filter_create_date'] = 'custom_create_date_today'
if '__count__' in context.get('pivot_measures', {}):
context.get('pivot_measures').remove('__count__')
domain += [('close_date', '>=', fields.Datetime.to_string(datetime.date.today()))]
update_views[self.env.ref("helpdesk.rating_rating_view_today_pivot_inherit_helpdesk").id] = 'pivot'
update_views[self.env.ref('helpdesk.rating_rating_view_today_graph_inherit_helpdesk').id] = 'graph'
action['views'] = [(state, view) for state, view in action['views'] if view not in update_views.values()] + list(update_views.items())
if only_closed_tickets:
domain += [('stage_id.fold', '=', True)]
if user_id:
domain += [('user_id', '=', user_id)]
ticket_ids = self.env['helpdesk.ticket'].search(domain).ids
action.update({
'context': context,
'domain': [('res_id', 'in', ticket_ids), ('rating', '>=', RATING_LIMIT_MIN), ('res_model', '=', 'helpdesk.ticket'), ('consumed', '=', True)],
})
return action
def action_view_ticket(self):
action = self.env["ir.actions.actions"]._for_xml_id("helpdesk.helpdesk_ticket_action_team")
action['display_name'] = self.name
return action
def _get_action_view_ticket_params(self, is_ticket_closed=False):
""" Get common params for the actions
:param is_ticket_closed: Boolean if True, then we want to see the tickets closed in last 7 days
:returns dict containing the params to update into the action.
"""
domain = [('team_id', 'in', self.ids)]
context = {
'search_default_is_open': not is_ticket_closed,
'default_team_id': self.id,
}
view_mode = 'list,kanban,form,activity,pivot,graph'
if is_ticket_closed:
domain = expression.AND([domain, [
('close_date', '>=', datetime.date.today() - datetime.timedelta(days=6)),
]])
context.update(search_default_closed_on='custom_closed_on_last_7_days')
return {
'domain': domain,
'context': context,
'view_mode': view_mode,
}
def action_view_closed_ticket(self):
action = self.action_view_ticket()
action_params = self._get_action_view_ticket_params(True)
action.update({
**action_params,
'domain': expression.AND([action_params['domain'], [('stage_id.fold', '=', True)]]),
})
return action
def action_view_success_rate(self):
action = self.action_view_ticket()
action_params = self._get_action_view_ticket_params(True)
action.update(
domain=expression.AND([
action_params['domain'],
[('sla_fail', "!=", True), ('team_id', 'in', self.ids), ('stage_id.fold', '=', True)],
]),
context={
**action_params['context'],
'search_default_sla_success': True,
},
view_mode=action_params['view_mode'],
views=[(False, view) for view in action_params['view_mode'].split(",")],
)
return action
def action_view_customer_satisfaction(self):
action = self._action_view_rating(period='seven_days')
action['context'] = {**self.env.context, **action['context'], 'search_default_my_ratings': False}
return action
def action_view_open_ticket(self):
action = self.action_view_ticket()
action_params = self._get_action_view_ticket_params()
action.update({
'context': action_params['context'],
'domain': action_params['domain'],
})
return action
def action_view_urgent(self):
action = self.action_view_ticket()
action_params = self._get_action_view_ticket_params()
action.update({
'context': {
**action_params['context'],
'search_default_urgent_priority': True,
},
})
return action
def action_view_sla_failed(self):
action = self.action_view_ticket()
action_params = self._get_action_view_ticket_params()
action.update({
'context': {
**action_params['context'],
'search_default_sla_failed': True,
},
'domain': expression.AND([action_params['domain'], [('sla_fail', '=', True)]]),
})
return action
def action_view_rating_today(self):
# call this method of on click "Customer Rating" button on dashbord for today rating of teams tickets
return self.search([('member_ids', 'in', self._uid)])._action_view_rating(period='today', user_id=self._uid)
def action_view_rating_7days(self):
# call this method of on click "Customer Rating" button on dashbord for last 7days rating of teams tickets
return self.search([('member_ids', 'in', self._uid)])._action_view_rating(period='seven_days', user_id=self._uid)
def action_view_team_rating(self):
self.ensure_one()
action = self._action_view_rating()
# Before this changes if some tickets are archived in the helpdesk team, we count the ratings of them + the active tickets.
# Do we really want to count the ratings of the archived tickets?
ratings = self.env['rating.rating'].search(action['domain'])
if len(ratings) == 1:
action.update({
'view_mode': 'form',
'views': [(False, 'form')],
'res_id': ratings.id
})
else:
action['context'] = {'search_default_filter_create_date': 'custom_create_date_last_30_days'}
return action
def action_view_open_ticket_view(self):
action = self.action_view_ticket()
action.update({
'display_name': _("Tickets"),
'domain': [('team_id', '=', self.id), ('stage_id.fold', '=', False)],
})
return action
def action_view_sla_policy(self):
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id("helpdesk.helpdesk_sla_action")
if self.sla_policy_count == 1:
action.update({
'view_mode': 'form',
'res_id': self.env['helpdesk.sla'].search([('team_id', '=', self.id)], limit=1).id,
'views': [(False, 'form')],
})
action.update({
'context': {'default_team_id': self.id},
'domain': [('team_id', '=', self.id)],
})
return action
@api.model
def _get_working_user_interval(self, start_dt, end_dt, calendar, users, compute_leaves=True):
# This method is intended to be overridden in hr_holidays in order to take non-validated leaves into account
return calendar._work_intervals_batch(
start_dt,
end_dt,
resources=users.resource_ids,
compute_leaves=compute_leaves
)
def _get_working_users_per_first_working_day(self):
tz = timezone(self._context.get('tz') or 'UTC')
start_dt = fields.Datetime.now().astimezone(tz)
end_dt = start_dt + relativedelta.relativedelta(days=7, hour=23, minute=59, second=59)
workers_per_first_working_date = defaultdict(list)
members_per_calendar = defaultdict(lambda: self.env['res.users'])
company_calendar = self.env.company.resource_calendar_id
for member in self.member_ids:
calendar = member.resource_calendar_id or company_calendar
members_per_calendar[calendar] |= member
for calendar, users in members_per_calendar.items():
work_intervals_per_resource = self._get_working_user_interval(start_dt, end_dt, calendar, users)
for user in users:
for resource_id in user.resource_ids.ids:
intervals = work_intervals_per_resource[resource_id]
if intervals:
# select the start_date of the first interval to get the first working day for this user
workers_per_first_working_date[(intervals._items)[0][0].date()].append(user.id)
break
# if the user doesn't linked to any employee then add according to company calendar
if user.id and not user.resource_ids:
intervals = work_intervals_per_resource[False]
if intervals:
workers_per_first_working_date[(intervals._items)[0][0].date()].append(user.id)
return [value for key, value in sorted(workers_per_first_working_date.items())]
def _determine_user_to_assign(self):
""" Get a dict with the user (per team) that should be assign to the nearly created ticket according to the team policy
:returns a mapping of team identifier with the "to assign" user (maybe an empty record).
:rtype : dict (key=team_id, value=record of res.users)
"""
team_without_manually = self.filtered(lambda x: x.assign_method in ['randomly', 'balanced'] and x.auto_assignment)
users_per_working_days = team_without_manually._get_working_users_per_first_working_day()
result = dict.fromkeys(self.ids, self.env['res.users'])
for team in team_without_manually:
if not team.member_ids:
continue
member_ids = team.member_ids.ids # By default, all members of the team
for user_ids in users_per_working_days:
if any(user_id in team.member_ids.ids for user_id in user_ids):
# filter members in team to get the ones working in the nearest date of today.
member_ids = [user_id for user_id in user_ids if user_id in self.member_ids.ids]
break
if team.assign_method == 'randomly': # randomly means new tickets get uniformly distributed
last_assigned_user = self.env['helpdesk.ticket'].search([('team_id', '=', team.id), ('user_id', '!=', False)], order='create_date desc, id desc', limit=1).user_id
index = 0
if last_assigned_user and last_assigned_user.id in member_ids:
previous_index = member_ids.index(last_assigned_user.id)
index = (previous_index + 1) % len(member_ids)
result[team.id] = self.env['res.users'].browse(member_ids[index])
elif team.assign_method == 'balanced': # find the member with the least open ticket
ticket_count_data = self.env['helpdesk.ticket']._read_group([('stage_id.fold', '=', False), ('user_id', 'in', member_ids), ('team_id', '=', team.id)], ['user_id'], ['__count'])
open_ticket_per_user_map = dict.fromkeys(member_ids, 0) # dict: user_id -> open ticket count
open_ticket_per_user_map.update((user.id, count) for user, count in ticket_count_data)
result[team.id] = self.env['res.users'].browse(min(open_ticket_per_user_map, key=open_ticket_per_user_map.get))
return result
def _determine_stage(self):
""" Get a dict with the stage (per team) that should be set as first to a created ticket
:returns a mapping of team identifier with the stage (maybe an empty record).
:rtype : dict (key=team_id, value=record of helpdesk.stage)
"""
result = dict.fromkeys(self.ids, self.env['helpdesk.stage'])
for team in self:
result[team.id] = self.env['helpdesk.stage'].search([('team_ids', 'in', team.id)], order='sequence', limit=1)
return result
def _get_closing_stage(self):
"""
Return the first closing kanban stage or the last stage of the pipe if none
"""
closed_stage = self.stage_ids.filtered(lambda stage: stage.fold)
if not closed_stage:
closed_stage = self.stage_ids[-1]
return closed_stage
def _cron_auto_close_tickets(self):
teams = self.env['helpdesk.team'].search_read(
domain=[
('auto_close_ticket', '=', True),
('auto_close_day', '>', 0),
('to_stage_id', '!=', False)],
fields=[
'id',
'auto_close_day',
'from_stage_ids',
'to_stage_id']
)
teams_dict = defaultdict(dict) # key: team_id, values: the remaining result of the search_group
today = fields.datetime.today()
for team in teams:
# Compute the threshold_date
team['threshold_date'] = today - relativedelta.relativedelta(days=team['auto_close_day'])
teams_dict[team['id']] = team
tickets_domain = [('stage_id.fold', '=', False), ('team_id', 'in', list(teams_dict.keys()))]
tickets = self.env['helpdesk.ticket'].search(tickets_domain)
def is_inactive_ticket(ticket):
team = teams_dict[ticket.team_id.id]
is_write_date_ok = ticket.write_date <= team['threshold_date']
if team['from_stage_ids']:
is_stage_ok = ticket.stage_id.id in team['from_stage_ids']
else:
is_stage_ok = not ticket.stage_id.fold
return is_write_date_ok and is_stage_ok
inactive_tickets = tickets.filtered(is_inactive_ticket)
for ticket in inactive_tickets:
# to_stage_id is mandatory in the view but not in the model so it is better to test it.
if teams_dict[ticket.team_id.id]['to_stage_id']:
ticket.write({'stage_id': teams_dict[ticket.team_id.id]['to_stage_id'][0]})
def action_view_helpdesk_rating(self):
action = self.env['ir.actions.act_window']._for_xml_id('helpdesk.rating_rating_action_helpdesk')
ticket_ids = self.env['helpdesk.ticket']._search([('team_id.company_id', 'in', self._context.get('allowed_company_ids'))])
action['domain'] = expression.AND([
ast.literal_eval(action.get('domain', '[]')),
[('res_id', 'in', list(ticket_ids))],
])
return action