odoo18/addons_extensions/hr_payroll/models/hr_payslip.py

1828 lines
87 KiB
Python

# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
import random
import math
import pytz
from collections import defaultdict, Counter
from datetime import date, datetime, time
from dateutil.relativedelta import relativedelta
from functools import reduce
from odoo import api, Command, fields, models, _
from odoo.exceptions import UserError, ValidationError
from odoo.tools import float_round, date_utils, convert_file, format_amount
from odoo.tools.float_utils import float_compare
from odoo.tools.misc import format_date
from odoo.tools.safe_eval import safe_eval, datetime as safe_eval_datetime, dateutil as safe_eval_dateutil
_logger = logging.getLogger(__name__)
class DefaultDictPayroll(defaultdict):
def get(self, key, default=None):
if key not in self and default is not None:
self[key] = default
return self[key]
class HrPayslip(models.Model):
_name = 'hr.payslip'
_description = 'Pay Slip'
_inherit = ['mail.thread.cc', 'mail.thread.main.attachment', 'mail.activity.mixin']
_order = 'date_to desc'
struct_id = fields.Many2one(
'hr.payroll.structure', string='Structure', precompute=True,
compute='_compute_struct_id', store=True, readonly=False, tracking=True,
help='Defines the rules that have to be applied to this payslip, according '
'to the contract chosen. If the contract is empty, this field isn\'t '
'mandatory anymore and all the valid rules of the structures '
'of the employee\'s contracts will be applied.')
structure_code = fields.Char(related="struct_id.code")
struct_type_id = fields.Many2one('hr.payroll.structure.type', related='struct_id.type_id')
wage_type = fields.Selection(related='contract_id.wage_type')
name = fields.Char(
string='Payslip Name', required=True,
compute='_compute_name', store=True, readonly=False)
number = fields.Char(
string='Reference', copy=False)
employee_id = fields.Many2one(
'hr.employee', string='Employee', required=True,
domain="['|', ('company_id', '=', False), ('company_id', '=', company_id), '|', ('active', '=', True), ('active', '=', False)]")
image_128 = fields.Image(related='employee_id.image_128')
image_1920 = fields.Image(related='employee_id.image_1920')
avatar_128 = fields.Image(related='employee_id.avatar_128')
avatar_1920 = fields.Image(related='employee_id.avatar_1920')
department_id = fields.Many2one('hr.department', string='Department', related='employee_id.department_id', readonly=True, store=True)
job_id = fields.Many2one('hr.job', string='Job Position', related='employee_id.job_id', readonly=True, store=True)
date_from = fields.Date(
string='From', readonly=False, required=True, tracking=True,
compute="_compute_date_from", store=True, precompute=True)
date_to = fields.Date(
string='To', readonly=False, required=True, tracking=True,
compute="_compute_date_to", store=True, precompute=True)
state = fields.Selection([
('draft', 'Draft'),
('verify', 'Waiting'),
('done', 'Done'),
('paid', 'Paid'),
('cancel', 'Canceled')],
string='Status', index=True, readonly=True, copy=False,
default='draft', tracking=True,
help="""* When the payslip is created the status is \'Draft\'
\n* If the payslip is under verification, the status is \'Waiting\'.
\n* If the payslip is confirmed then status is set to \'Done\'.
\n* When the user cancels a payslip, the status is \'Canceled\'.""")
line_ids = fields.One2many(
'hr.payslip.line', 'slip_id', string='Payslip Lines',
compute='_compute_line_ids', store=True, readonly=False, copy=True)
company_id = fields.Many2one(
'res.company', string='Company', copy=False, required=True,
compute='_compute_company_id', store=True, readonly=False,
default=lambda self: self.env.company)
country_id = fields.Many2one(
'res.country', string='Country',
related='company_id.country_id', readonly=True
)
country_code = fields.Char(related='country_id.code', depends=['country_id'], readonly=True)
worked_days_line_ids = fields.One2many(
'hr.payslip.worked_days', 'payslip_id', string='Payslip Worked Days', copy=True,
compute='_compute_worked_days_line_ids', store=True, readonly=False)
input_line_ids = fields.One2many(
'hr.payslip.input', 'payslip_id', string='Payslip Inputs',
compute='_compute_input_line_ids', store=True,
readonly=False)
paid = fields.Boolean(
string='Made Payment Order? ', copy=False)
paid_date = fields.Date(string="Close Date", help="The date on which the payment is made to the employee.")
note = fields.Text(string='Internal Note')
contract_domain_ids = fields.Many2many('hr.contract', compute='_compute_contract_domain_ids')
contract_id = fields.Many2one(
'hr.contract', string='Contract', precompute=True,
domain="[('id', 'in', contract_domain_ids)]", tracking=True,
compute='_compute_contract_id', store=True, readonly=False)
credit_note = fields.Boolean(
string='Credit Note',
help="Indicates this payslip has a refund of another")
has_refund_slip = fields.Boolean(compute='_compute_has_refund_slip')
payslip_run_id = fields.Many2one(
'hr.payslip.run', string='Batch Name',
copy=False, ondelete='cascade', tracking=True,
domain="[('company_id', '=', company_id)]")
sum_worked_hours = fields.Float(compute='_compute_worked_hours', store=True, help='Total hours of attendance and time off (paid or not)')
compute_date = fields.Date('Computed On')
basic_wage = fields.Monetary(compute='_compute_basic_net', store=True)
gross_wage = fields.Monetary(compute='_compute_basic_net', store=True)
net_wage = fields.Monetary(compute='_compute_basic_net', store=True)
currency_id = fields.Many2one(related='contract_id.currency_id')
warning_message = fields.Char(compute='_compute_warning_message', store=True, readonly=True)
is_wrong_duration = fields.Boolean(compute='_compute_is_wrong_duration', compute_sudo=True)
is_regular = fields.Boolean(compute='_compute_is_regular')
has_negative_net_to_report = fields.Boolean()
negative_net_to_report_display = fields.Boolean(compute='_compute_negative_net_to_report_display')
negative_net_to_report_message = fields.Char(compute='_compute_negative_net_to_report_display')
negative_net_to_report_amount = fields.Float(compute='_compute_negative_net_to_report_display')
is_superuser = fields.Boolean(compute="_compute_is_superuser")
edited = fields.Boolean()
queued_for_pdf = fields.Boolean(default=False)
salary_attachment_ids = fields.Many2many(
'hr.salary.attachment',
relation='hr_payslip_hr_salary_attachment_rel',
string='Salary Attachments',
compute='_compute_salary_attachment_ids',
store=True,
readonly=False,
)
salary_attachment_count = fields.Integer('Salary Attachment count', compute='_compute_salary_attachment_count')
use_worked_day_lines = fields.Boolean(related="struct_id.use_worked_day_lines")
payment_report = fields.Binary(
string='Payment Report',
help="Export .csv file related to this payslip",
readonly=True)
payment_report_filename = fields.Char(readonly=True)
payment_report_date = fields.Date(readonly=True)
ytd_computation = fields.Boolean(related='struct_id.ytd_computation')
def _get_schedule_period_start(self):
schedule = self.contract_id.schedule_pay or self.contract_id.structure_type_id.default_schedule_pay
today = date.today()
week_start = self.env["res.lang"]._get_data(code=self.env.user.lang).week_start
date_from = today
if schedule == 'quarterly':
current_year_quarter = math.ceil(today.month / 3)
date_from = today.replace(day=1, month=(current_year_quarter - 1) * 3 + 1)
elif schedule == 'semi-annually':
is_second_half = math.floor((today.month - 1) / 6)
date_from = today.replace(day=1, month=7) if is_second_half else today.replace(day=1, month=1)
elif schedule == 'annually':
date_from = today.replace(day=1, month=1)
elif schedule == 'weekly':
week_day = today.weekday()
date_from = today + relativedelta(days=-week_day)
elif schedule == 'bi-weekly':
week = int(today.strftime("%U") if week_start == '7' else today.strftime("%W"))
week_day = today.weekday()
is_second_week = week % 2 == 0
date_from = today + relativedelta(days=-week_day - 7 * int(is_second_week))
elif schedule == 'bi-monthly':
current_year_slice = math.ceil(today.month / 2)
date_from = today.replace(day=1, month=(current_year_slice - 1) * 2 + 1)
else: # if not handled, put the monthly behaviour
date_from = today.replace(day=1)
if self.contract_id and date_from < self.contract_id.date_start:
date_from = self.contract_id.date_start
return date_from
@api.depends('contract_id', 'struct_id')
def _compute_date_from(self):
for payslip in self:
if self.env.context.get('default_date_from'):
payslip.date_from = self.env.context.get('default_date_from')
else:
payslip.date_from = payslip._get_schedule_period_start()
def _get_schedule_timedelta(self):
self.ensure_one()
schedule = self.contract_id.schedule_pay or self.contract_id.structure_type_id.default_schedule_pay
if schedule == 'quarterly':
timedelta = relativedelta(months=3, days=-1)
elif schedule == 'semi-annually':
timedelta = relativedelta(months=6, days=-1)
elif schedule == 'annually':
timedelta = relativedelta(years=1, days=-1)
elif schedule == 'weekly':
timedelta = relativedelta(days=6)
elif schedule == 'bi-weekly':
timedelta = relativedelta(days=13)
elif schedule == 'semi-monthly':
timedelta = relativedelta(day=15 if self.date_from.day < 15 else 31)
elif schedule == 'bi-monthly':
timedelta = relativedelta(months=2, days=-1)
elif schedule == 'daily':
timedelta = relativedelta(days=0)
else: # if not handled, put the monthly behaviour
timedelta = relativedelta(months=1, days=-1)
return timedelta
@api.depends('date_from', 'contract_id', 'struct_id')
def _compute_date_to(self):
for payslip in self:
if self.env.context.get('default_date_to'):
payslip.date_to = self.env.context.get('default_date_to')
else:
payslip.date_to = payslip.date_from and payslip.date_from + payslip._get_schedule_timedelta()
if payslip.contract_id and payslip.contract_id.date_end\
and payslip.date_from >= payslip.contract_id.date_start\
and payslip.date_from < payslip.contract_id.date_end\
and payslip.date_to > payslip.contract_id.date_end:
payslip.date_to = payslip.contract_id.date_end
@api.depends('company_id', 'employee_id')
def _compute_contract_domain_ids(self):
for payslip in self:
payslip.contract_domain_ids = self.env['hr.contract'].search([
('company_id', '=', payslip.company_id.id),
('employee_id', '=', payslip.employee_id.id),
('state', 'in', ['open', 'close']),
])
@api.depends('employee_id', 'contract_id', 'struct_id', 'date_from', 'date_to', 'struct_id')
def _compute_input_line_ids(self):
attachment_types = self._get_attachment_types()
attachment_type_ids = [f.id for f in attachment_types.values()]
for slip in self:
if not slip.employee_id or not slip.employee_id.salary_attachment_ids or not slip.struct_id:
lines_to_remove = slip.input_line_ids.filtered(lambda x: x.input_type_id.id in attachment_type_ids)
slip.update({'input_line_ids': [Command.unlink(line.id) for line in lines_to_remove]})
if slip.employee_id.salary_attachment_ids and slip.date_to:
lines_to_remove = slip.input_line_ids.filtered(lambda x: x.input_type_id.id in attachment_type_ids)
input_line_vals = [Command.unlink(line.id) for line in lines_to_remove]
valid_attachments = slip.employee_id.salary_attachment_ids.filtered(
lambda a: a.state == 'open'
and a.date_start <= slip.date_to
and (not a.date_end or a.date_end >= slip.date_from)
)
# Only take deduction types present in structure
deduction_types = list(set(valid_attachments.other_input_type_id.mapped('code')))
for deduction_type in deduction_types:
attachments = valid_attachments.filtered(lambda a: a.other_input_type_id.code == deduction_type)
amount = attachments._get_active_amount()
name = ', '.join(attachments.mapped('description'))
input_type_id = attachment_types[deduction_type].id
input_line_vals.append(Command.create({
'name': name,
'amount': amount if not slip.credit_note else -amount,
'input_type_id': input_type_id,
}))
slip.update({'input_line_ids': input_line_vals})
@api.depends('input_line_ids.input_type_id', 'input_line_ids')
def _compute_salary_attachment_ids(self):
attachment_types = self._get_attachment_types()
for slip in self:
if not slip.input_line_ids and not slip.salary_attachment_ids:
continue
attachments = self.env['hr.salary.attachment']
if slip.employee_id and slip.input_line_ids and slip.date_to:
input_line_type_ids = slip.input_line_ids.mapped('input_type_id.id')
deduction_types = [f for f in attachment_types if attachment_types[f].id in input_line_type_ids]
attachments = slip.employee_id.salary_attachment_ids.filtered(
lambda a: (
a.state == 'open'
and a.other_input_type_id.code in deduction_types
and a.date_start <= slip.date_to
)
)
slip.salary_attachment_ids = attachments
@api.depends('salary_attachment_ids')
def _compute_salary_attachment_count(self):
for slip in self:
slip.salary_attachment_count = len(slip.salary_attachment_ids)
@api.depends('employee_id', 'state')
def _compute_negative_net_to_report_display(self):
activity_type = self.env.ref('hr_payroll.mail_activity_data_hr_payslip_negative_net')
for payslip in self:
if payslip.state in ['draft', 'verify']:
payslips_to_report = self.env['hr.payslip'].search([
('has_negative_net_to_report', '=', True),
('employee_id', 'in', payslip.employee_id.ids),
('credit_note', '=', False),
])
payslip.negative_net_to_report_display = payslips_to_report
payslip.negative_net_to_report_amount = payslips_to_report._get_line_values(['NET'], compute_sum=True)['NET']['sum']['total']
payslip.negative_net_to_report_message = _(
'Note: There are previous payslips with a negative amount for a total of %s to report.',
round(payslip.negative_net_to_report_amount, 2))
if payslips_to_report and payslip.state == 'verify' and payslip.contract_id and not payslip.activity_ids.filtered(lambda a: a.activity_type_id == activity_type):
payslip.activity_schedule(
'hr_payroll.mail_activity_data_hr_payslip_negative_net',
summary=_('Previous Negative Payslip to Report'),
note=_('At least one previous negative net could be reported on this payslip for %s',
payslip.employee_id._get_html_link()),
user_id=payslip.contract_id.hr_responsible_id.id or self.env.ref('base.user_admin').id)
else:
payslip.negative_net_to_report_display = False
payslip.negative_net_to_report_amount = False
payslip.negative_net_to_report_message = False
def _get_negative_net_input_type(self):
self.ensure_one()
return self.env.ref('hr_payroll.input_deduction')
def action_report_negative_amount(self):
self.ensure_one()
deduction_input_type = self._get_negative_net_input_type()
deduction_input_line = self.input_line_ids.filtered(lambda l: l.input_type_id == deduction_input_type)
if deduction_input_line:
deduction_input_line.amount += abs(self.negative_net_to_report_amount)
else:
self.write({'input_line_ids': [(0, 0, {
'input_type_id': deduction_input_type.id,
'amount': abs(self.negative_net_to_report_amount),
})]})
self.compute_sheet()
self.env['hr.payslip'].search([
('has_negative_net_to_report', '=', True),
('employee_id', '=', self.employee_id.id),
('credit_note', '=', False),
]).write({'has_negative_net_to_report': False})
self.activity_feedback(['hr_payroll.mail_activity_data_hr_payslip_negative_net'])
def _compute_is_regular(self):
for payslip in self:
payslip.is_regular = payslip.struct_id.type_id.default_struct_id == payslip.struct_id
def _is_invalid(self):
self.ensure_one()
if self.state not in ['done', 'paid']:
return _("This payslip is not validated. This is not a legal document.")
return False
@api.depends('worked_days_line_ids', 'input_line_ids')
def _compute_line_ids(self):
if not self.env.context.get("payslip_no_recompute"):
return
payslips = self.filtered(lambda p: p.line_ids and p.state in ['draft', 'verify'])
payslips.line_ids.unlink()
self.env['hr.payslip.line'].create(payslips._get_payslip_lines())
@api.depends('line_ids.total')
def _compute_basic_net(self):
line_values = (self._origin)._get_line_values(['BASIC', 'GROSS', 'NET'])
for payslip in self:
payslip.basic_wage = line_values['BASIC'][payslip._origin.id]['total']
payslip.gross_wage = line_values['GROSS'][payslip._origin.id]['total']
payslip.net_wage = line_values['NET'][payslip._origin.id]['total']
@api.depends('worked_days_line_ids.number_of_hours', 'worked_days_line_ids.is_paid', 'worked_days_line_ids.is_credit_time')
def _compute_worked_hours(self):
for payslip in self:
payslip.sum_worked_hours = sum([line.number_of_hours for line in payslip.worked_days_line_ids if not line.is_credit_time])
def _compute_is_superuser(self):
self.is_superuser = self.env.user._is_superuser() and self.env.user.has_group('base.group_no_one')
def _compute_has_refund_slip(self):
# This field is only used to know whether we need a confirm on refund or not
# It doesn't have to work in batch and we try not to search if not necessary
for payslip in self:
if not payslip.credit_note and payslip.state in ('done', 'paid') and self.search_count([
('employee_id', '=', payslip.employee_id.id),
('date_from', '=', payslip.date_from),
('date_to', '=', payslip.date_to),
('contract_id', '=', payslip.contract_id.id),
('struct_id', '=', payslip.struct_id.id),
('credit_note', '=', True),
('state', '!=', 'cancel'),
]):
payslip.has_refund_slip = True
else:
payslip.has_refund_slip = False
@api.constrains('date_from', 'date_to')
def _check_dates(self):
if any(payslip.date_from > payslip.date_to for payslip in self):
raise ValidationError(_("Payslip 'Date From' must be earlier than 'Date To'."))
def _record_attachment_payment(self, attachments, slip_lines):
self.ensure_one()
sign = -1 if self.credit_note else 1
amount = sum(sl.total for sl in slip_lines) if not attachments.other_input_type_id.is_quantity else sum(sl.quantity for sl in slip_lines)
attachments.record_payment(sign * abs(amount))
def write(self, vals):
res = super().write(vals)
if 'state' in vals and vals['state'] == 'paid':
# Register payment in Salary Attachments
# NOTE: Since we combine multiple attachments on one input line, it's not possible to compute
# how much per attachment needs to be taken record_payment will consume monthly payments (child_support) before other attachments
attachment_types = self._get_attachment_types()
for slip in self.filtered(lambda r: r.salary_attachment_ids):
for deduction_type, input_type_id in attachment_types.items():
attachments = slip.salary_attachment_ids.filtered(lambda r: r.other_input_type_id.code == deduction_type)
input_lines = slip.input_line_ids.filtered(lambda r: r.input_type_id.id == input_type_id.id)
# Use the amount from the computed value in the payslip lines not the input
salary_lines = slip.line_ids.filtered(lambda r: r.code in input_lines.mapped('code'))
if not attachments or not salary_lines:
continue
slip._record_attachment_payment(attachments, salary_lines)
return res
def action_payslip_draft(self):
return self.write({'state': 'draft'})
def _get_pdf_reports(self):
default_report = self.env.ref('hr_payroll.action_report_payslip')
result = defaultdict(lambda: self.env['hr.payslip'])
for payslip in self:
if not payslip.struct_id or not payslip.struct_id.report_id:
result[default_report] |= payslip
else:
result[payslip.struct_id.report_id] |= payslip
return result
@api.model
def _get_email_template(self):
return self.env.ref(
'hr_payroll.mail_template_new_payslip', raise_if_not_found=False
)
def _generate_pdf(self):
mapped_reports = self._get_pdf_reports()
attachments_vals_list = []
generic_name = _("Payslip")
template = self._get_email_template()
for report, payslips in mapped_reports.items():
for payslip in payslips:
pdf_content, dummy = self.env['ir.actions.report'].sudo().with_context(lang=payslip.employee_id.lang or self.env.lang)._render_qweb_pdf(report, payslip.id)
if report.print_report_name:
pdf_name = safe_eval(report.print_report_name, {'object': payslip})
else:
pdf_name = generic_name
attachments_vals_list.append({
'name': pdf_name,
'type': 'binary',
'raw': pdf_content,
'res_model': payslip._name,
'res_id': payslip.id
})
# Send email to employees
if template:
template.send_mail(payslip.id, email_layout_xmlid='mail.mail_notification_light')
self.env['ir.attachment'].sudo().create(attachments_vals_list)
def _filter_out_of_contracts_payslips(self):
return self.filtered(lambda p: p.contract_id and (p.contract_id.date_start > p.date_to or (p.contract_id.date_end and p.contract_id.date_end < p.date_from)))
def action_payslip_done(self):
invalid_payslips = self._filter_out_of_contracts_payslips()
if invalid_payslips:
raise ValidationError(_('The following employees have a contract outside of the payslip period:\n%s', '\n'.join(invalid_payslips.mapped('employee_id.name'))))
if any(slip.contract_id.state == 'cancel' for slip in self):
raise ValidationError(_('You cannot validate a payslip on which the contract is cancelled'))
if any(slip.state == 'cancel' for slip in self):
raise ValidationError(_("You can't validate a cancelled payslip."))
self.write({'state' : 'done'})
line_values = self._get_line_values(['NET'])
self.filtered(lambda p: not p.credit_note and line_values['NET'][p.id]['total'] < 0).write({'has_negative_net_to_report': True})
self.mapped('payslip_run_id').action_close()
# Validate work entries for regular payslips (exclude end of year bonus, ...)
regular_payslips = self.filtered(lambda p: p.struct_id.type_id.default_struct_id == p.struct_id)
work_entries = self.env['hr.work.entry']
for regular_payslip in regular_payslips:
work_entries |= self.env['hr.work.entry'].search([
('date_start', '<=', regular_payslip.date_to),
('date_stop', '>=', regular_payslip.date_from),
('employee_id', '=', regular_payslip.employee_id.id),
])
if work_entries:
work_entries.action_validate()
if self.env.context.get('payslip_generate_pdf'):
if self.env.context.get('payslip_generate_pdf_direct'):
self._generate_pdf()
else:
self.write({'queued_for_pdf': True})
payslip_cron = self.env.ref('hr_payroll.ir_cron_generate_payslip_pdfs', raise_if_not_found=False)
if payslip_cron:
payslip_cron._trigger()
def action_payslip_cancel(self):
if not self.env.user._is_system() and self.filtered(lambda slip: slip.state == 'done'):
raise UserError(_("Cannot cancel a payslip that is done."))
self.write({'state': 'cancel'})
self.mapped('payslip_run_id').action_close()
def action_payslip_paid(self):
if any(slip.state not in ['done', 'paid'] for slip in self):
raise UserError(_('Cannot mark payslip as paid if not confirmed.'))
self.filtered(lambda p: p.state != 'paid').write({
'state': 'paid',
'paid_date': fields.Date.today(),
})
def action_payslip_payment_report(self, export_format='csv'):
self.ensure_one()
if len(self.payslip_run_id) > 1:
raise UserError(_('The selected payslips should be linked to the same batch'))
self.env['hr.payroll.payment.report.wizard'].create({
'payslip_ids': self.ids,
'payslip_run_id': self.payslip_run_id.id,
'export_format': export_format
}).generate_payment_report()
def action_open_work_entries(self):
self.ensure_one()
return self.employee_id.action_open_work_entries(initial_date=self.date_from)
def action_open_salary_attachments(self):
self.ensure_one()
return {
'type': 'ir.actions.act_window',
'name': _('Salary Attachments'),
'res_model': 'hr.salary.attachment',
'view_mode': 'list,form',
'domain': [('id', 'in', self.salary_attachment_ids.ids)],
}
def refund_sheet(self):
copied_payslips = self.env['hr.payslip']
for payslip in self:
copied_payslip = payslip.copy({
'credit_note': True,
'name': _('Refund: %(payslip)s', payslip=payslip.name),
'edited': True,
'state': 'verify',
})
for wd in copied_payslip.worked_days_line_ids:
wd.number_of_hours = -wd.number_of_hours
wd.number_of_days = -wd.number_of_days
wd.amount = -wd.amount
for line in copied_payslip.line_ids:
line.amount = -line.amount
line.total = -line.total
copied_payslips |= copied_payslip
formview_ref = self.env.ref('hr_payroll.view_hr_payslip_form', False)
treeview_ref = self.env.ref('hr_payroll.view_hr_payslip_tree', False)
return {
'name': ("Refund Payslip"),
'view_mode': 'list, form',
'view_id': False,
'res_model': 'hr.payslip',
'type': 'ir.actions.act_window',
'target': 'current',
'domain': [('id', 'in', copied_payslips.ids)],
'views': [(treeview_ref and treeview_ref.id or False, 'list'), (formview_ref and formview_ref.id or False, 'form')],
'context': {}
}
@api.ondelete(at_uninstall=False)
def _unlink_if_draft_or_cancel(self):
if any(payslip.state not in ('draft', 'cancel') for payslip in self):
raise UserError(_('You cannot delete a payslip which is not draft or cancelled!'))
def compute_sheet(self):
payslips = self.filtered(lambda slip: slip.state in ['draft', 'verify'])
# delete old payslip lines
payslips.line_ids.unlink()
# this guarantees consistent results
self.env.flush_all()
today = fields.Date.today()
for payslip in payslips:
number = payslip.number or self.env['ir.sequence'].next_by_code('salary.slip')
payslip.write({
'number': number,
'state': 'verify',
'compute_date': today
})
self.env['hr.payslip.line'].create(payslips._get_payslip_lines())
if any(payslips.mapped('ytd_computation')):
self._compute_worked_days_ytd()
return True
def action_refresh_from_work_entries(self):
# Refresh the whole payslip in case the HR has modified some work entries
# after the payslip generation
if any(p.state not in ['draft', 'verify'] for p in self):
raise UserError(_('The payslips should be in Draft or Waiting state.'))
payslips = self.filtered(lambda p: not p.edited)
payslips.mapped('worked_days_line_ids').unlink()
payslips.mapped('line_ids').unlink()
payslips._compute_worked_days_line_ids()
payslips.compute_sheet()
def _round_days(self, work_entry_type, days):
if work_entry_type.round_days != 'NO':
precision_rounding = 0.5 if work_entry_type.round_days == "HALF" else 1
day_rounded = float_round(days, precision_rounding=precision_rounding, rounding_method=work_entry_type.round_days_type)
return day_rounded
return days
@api.model
def _get_attachment_types(self):
input_types = self.env['hr.payslip.input.type'].search([('available_in_attachments', '=', True)])
return {input_type.code: input_type for input_type in input_types}
def _get_worked_day_lines_hours_per_day(self):
self.ensure_one()
return self.contract_id.resource_calendar_id.hours_per_day
def _get_out_of_contract_calendar(self):
self.ensure_one()
if self.contract_id.time_credit:
return self.contract_id.standard_calendar_id
return self.contract_id.resource_calendar_id
def _get_worked_day_lines_values(self, domain=None):
self.ensure_one()
res = []
hours_per_day = self._get_worked_day_lines_hours_per_day()
work_hours = self.contract_id.get_work_hours(self.date_from, self.date_to, domain=domain)
work_hours_ordered = sorted(work_hours.items(), key=lambda x: x[1])
biggest_work = work_hours_ordered[-1][0] if work_hours_ordered else 0
add_days_rounding = 0
for work_entry_type_id, hours in work_hours_ordered:
work_entry_type = self.env['hr.work.entry.type'].browse(work_entry_type_id)
days = round(hours / hours_per_day, 5) if hours_per_day else 0
if work_entry_type_id == biggest_work:
days += add_days_rounding
day_rounded = self._round_days(work_entry_type, days)
add_days_rounding += (days - day_rounded)
attendance_line = {
'sequence': work_entry_type.sequence,
'work_entry_type_id': work_entry_type_id,
'number_of_days': day_rounded,
'number_of_hours': hours,
}
res.append(attendance_line)
# Sort by Work Entry Type sequence
work_entry_type = self.env['hr.work.entry.type']
return sorted(res, key=lambda d: work_entry_type.browse(d['work_entry_type_id']).sequence)
def _get_worked_day_lines(self, domain=None, check_out_of_contract=True):
"""
:returns: a list of dict containing the worked days values that should be applied for the given payslip
"""
res = []
# fill only if the contract as a working schedule linked
self.ensure_one()
contract = self.contract_id
if contract.resource_calendar_id:
res = self._get_worked_day_lines_values(domain=domain)
if not check_out_of_contract:
return res
# If the contract doesn't cover the whole month, create
# worked_days lines to adapt the wage accordingly
out_days, out_hours = 0, 0
reference_calendar = self._get_out_of_contract_calendar()
if self.date_from < contract.date_start:
start = fields.Datetime.to_datetime(self.date_from)
stop = fields.Datetime.to_datetime(contract.date_start) + relativedelta(days=-1, hour=23, minute=59)
out_time = reference_calendar.get_work_duration_data(start, stop, compute_leaves=False, domain=['|', ('work_entry_type_id', '=', False), ('work_entry_type_id.is_leave', '=', False)])
out_days += out_time['days']
out_hours += out_time['hours']
if contract.date_end and contract.date_end < self.date_to:
start = fields.Datetime.to_datetime(contract.date_end) + relativedelta(days=1)
stop = fields.Datetime.to_datetime(self.date_to) + relativedelta(hour=23, minute=59)
out_time = reference_calendar.get_work_duration_data(start, stop, compute_leaves=False, domain=['|', ('work_entry_type_id', '=', False), ('work_entry_type_id.is_leave', '=', False)])
out_days += out_time['days']
out_hours += out_time['hours']
if out_days or out_hours:
work_entry_type = self.env.ref('hr_payroll.hr_work_entry_type_out_of_contract')
res.append({
'sequence': work_entry_type.sequence,
'work_entry_type_id': work_entry_type.id,
'number_of_days': out_days,
'number_of_hours': out_hours,
})
return res
@property
def paid_amount(self):
self.ensure_one()
return self._get_paid_amount()
@property
def is_outside_contract(self):
self.ensure_one()
return self._is_outside_contract_dates()
def _rule_parameter(self, code):
self.ensure_one()
return self.env['hr.rule.parameter']._get_parameter_from_code(code, self.date_to)
def _sum(self, code, from_date, to_date=None):
if to_date is None:
to_date = fields.Date.today()
self.env.cr.execute("""
SELECT sum(pl.total)
FROM hr_payslip as hp, hr_payslip_line as pl
WHERE hp.employee_id = %s
AND hp.state in ('done', 'paid')
AND hp.date_from >= %s
AND hp.date_to <= %s
AND hp.id = pl.slip_id
AND pl.code = %s""", (self.employee_id.id, from_date, to_date, code))
res = self.env.cr.fetchone()
return res and res[0] or 0.0
def _sum_category(self, code, from_date, to_date=None):
self.ensure_one()
if to_date is None:
to_date = fields.Date.today()
self.env['hr.payslip'].flush_model(['employee_id', 'state', 'date_from', 'date_to'])
self.env['hr.payslip.line'].flush_model(['total', 'slip_id', 'salary_rule_id'])
self.env['hr.salary.rule.category'].flush_model(['code'])
self.env.cr.execute("""
SELECT sum(pl.total)
FROM
hr_payslip as hp,
hr_payslip_line as pl,
hr_salary_rule_category as rc,
hr_salary_rule as sr
WHERE hp.employee_id = %s
AND hp.state in ('done', 'paid')
AND hp.date_from >= %s
AND hp.date_to <= %s
AND hp.id = pl.slip_id
AND sr.id = pl.salary_rule_id
AND rc.id = sr.category_id
AND rc.code = %s""", (self.employee_id.id, from_date, to_date, code))
res = self.env.cr.fetchone()
return res and res[0] or 0.0
def _sum_worked_days(self, code, from_date, to_date=None):
self.ensure_one()
if to_date is None:
to_date = fields.Date.today()
query = """
SELECT sum(hwd.amount)
FROM hr_payslip hp, hr_payslip_worked_days hwd, hr_work_entry_type hwet
WHERE hp.state in ('done', 'paid')
AND hp.id = hwd.payslip_id
AND hwet.id = hwd.work_entry_type_id
AND hp.employee_id = %(employee)s
AND hp.date_to <= %(stop)s
AND hwet.code = %(code)s
AND hp.date_from >= %(start)s"""
self.env.cr.execute(query, {
'employee': self.employee_id.id,
'code': code,
'start': from_date,
'stop': to_date})
res = self.env.cr.fetchone()
return res[0] if res else 0.0
def _get_base_local_dict(self):
return {
'float_round': float_round,
'float_compare': float_compare,
'relativedelta': safe_eval_dateutil.relativedelta.relativedelta,
'ceil': math.ceil,
'floor': math.floor,
'UserError': UserError,
'date': safe_eval_datetime.date,
'datetime': safe_eval_datetime.datetime,
'defaultdict': defaultdict,
}
def _get_localdict(self):
self.ensure_one()
# Check for multiple inputs of the same type and keep a copy of
# them because otherwise they are lost when building the dict
input_list = [line.code for line in self.input_line_ids if line.code]
cnt = Counter(input_list)
multi_input_lines = [k for k, v in cnt.items() if v > 1]
same_type_input_lines = {line_code: [line for line in self.input_line_ids if line.code == line_code] for line_code in multi_input_lines}
localdict = {
**self._get_base_local_dict(),
**{
'categories': DefaultDictPayroll(lambda: 0),
'rules': DefaultDictPayroll(lambda: dict(total=0, amount=0, quantity=0)),
'payslip': self,
'worked_days': {line.code: line for line in self.worked_days_line_ids if line.code},
'inputs': {line.code: line for line in self.input_line_ids if line.code},
'employee': self.employee_id,
'contract': self.contract_id,
'result_rules': DefaultDictPayroll(lambda: dict(total=0, amount=0, quantity=0, rate=0)),
'same_type_input_lines': same_type_input_lines,
}
}
return localdict
def _get_rule_name(self, localdict, rule, employee_lang):
if localdict['result_name']:
rule_name = localdict['result_name']
elif rule.amount_select == "input" and rule.amount_other_input_id.code in localdict['inputs'] and localdict['inputs'][rule.amount_other_input_id.code].name:
rule_name = localdict['inputs'][rule.amount_other_input_id.code].name
elif rule.code in ['BASIC', 'GROSS', 'NET', 'DEDUCTION',
'REIMBURSEMENT']: # Generated by default_get (no xmlid)
if rule.code == 'BASIC': # Note: Crappy way to code this, but _(foo) is forbidden. Make a method in master to be overridden, using the structure code
if rule.name == "Double Holiday Pay":
rule_name = _("Double Holiday Pay")
if rule.struct_id.name == "CP200: Employees 13th Month":
rule_name = _("Prorated end-of-year bonus")
else:
rule_name = _('Basic Salary')
elif rule.code == "GROSS":
rule_name = _('Taxable Salary')
elif rule.code == "DEDUCTION":
rule_name = _('Deduction')
elif rule.code == "REIMBURSEMENT":
rule_name = _('Reimbursement')
elif rule.code == 'NET':
rule_name = _('Net Salary')
else:
rule_name = rule.with_context(lang=employee_lang).name
return rule_name
def _get_payslip_line_total(self, amount, quantity, rate, rule):
self.ensure_one()
return amount * quantity * rate / 100.0
def _get_last_ytd_payslips(self):
if not self:
return self
earliest_date_to = min(self.mapped('date_to'))
earliest_ytd_date_to = min(
company.get_last_ytd_reset_date(earliest_date_to) for company in self.company_id
)
ytd_payslips_grouped = self.env['hr.payslip']._read_group(
domain=[
('employee_id', 'in', self.employee_id.ids),
('struct_id', 'in', self.struct_id.ids),
('ytd_computation', '=', True),
('date_to', '>=', earliest_ytd_date_to),
('date_to', '<=', max(self.mapped('date_to'))),
('state', 'in', ['done', 'paid']),
],
groupby=['employee_id', 'struct_id'],
aggregates=['id:recordset']
)
ytd_payslips_sorted = defaultdict(lambda: self.env['hr.payslip'])
for employee_id, struct_id, payslips in ytd_payslips_grouped:
ytd_payslips_sorted[(employee_id, struct_id)] = payslips.sorted(
key=lambda p: p.date_to, reverse=True
)
last_ytd_payslips = defaultdict(lambda: self.env['hr.payslip'])
for payslip in self:
last_payslips = ytd_payslips_sorted[(payslip.employee_id, payslip.struct_id)].filtered(
lambda p: p.date_to <= payslip.date_to
)
if last_payslips and last_payslips[0].date_to >=\
payslip.company_id.get_last_ytd_reset_date(payslip.date_to):
last_ytd_payslips[payslip] = last_payslips[0]
return last_ytd_payslips
def _get_payslip_lines(self):
line_vals = []
if any(self.mapped('ytd_computation')):
last_ytd_payslips = self._get_last_ytd_payslips()
code_set = set(self.struct_id.rule_ids.mapped('code'))
else:
last_ytd_payslips = defaultdict(lambda: self.env['hr.payslip'])
code_set = set()
ytd_payslips = reduce(
lambda ytd_payslips, payslip: ytd_payslips | payslip, last_ytd_payslips.values(),
self.env['hr.payslip']
)
line_values = ytd_payslips._get_line_values(code_set, ['ytd'])
for payslip in self:
if not payslip.contract_id:
raise UserError(_("There's no contract set on payslip %(payslip)s for %(employee)s. Check that there is at least a contract set on the employee form.", payslip=payslip.name, employee=payslip.employee_id.name))
localdict = self.env.context.get('force_payslip_localdict', None)
if localdict is None:
localdict = payslip._get_localdict()
rules_dict = localdict['rules']
result_rules_dict = localdict['result_rules']
blacklisted_rule_ids = self.env.context.get('prevent_payslip_computation_line_ids', [])
result = {}
for rule in sorted(payslip.struct_id.rule_ids, key=lambda x: x.sequence):
if rule.id in blacklisted_rule_ids:
continue
localdict.update({
'result': None,
'result_qty': 1.0,
'result_rate': 100,
'result_name': False
})
if rule._satisfy_condition(localdict):
# Retrieve the line name in the employee's lang
employee_lang = payslip.employee_id.lang or self.env.lang
# This actually has an impact, don't remove this line
if rule.code in localdict['same_type_input_lines']:
for multi_line_rule in localdict['same_type_input_lines'][rule.code]:
localdict['inputs'][rule.code] = multi_line_rule
amount, qty, rate = rule._compute_rule(localdict)
tot_rule = payslip._get_payslip_line_total(amount, qty, rate, rule)
localdict = rule.category_id._sum_salary_rule_category(localdict,
tot_rule)
rule_name = payslip._get_rule_name(localdict, rule, employee_lang)
line_vals.append({
'sequence': rule.sequence,
'code': rule.code,
'name': rule_name,
'salary_rule_id': rule.id,
'contract_id': localdict['contract'].id,
'employee_id': localdict['employee'].id,
'amount': amount,
'quantity': qty,
'rate': rate,
'total': tot_rule,
'slip_id': payslip.id,
'ytd': line_values[rule.code][last_ytd_payslips[payslip].id]
['ytd'] + tot_rule,
})
else:
amount, qty, rate = rule._compute_rule(localdict)
#check if there is already a rule computed with that code
previous_amount = localdict.get(rule.code, 0.0)
#set/overwrite the amount computed for this rule in the localdict
tot_rule = payslip._get_payslip_line_total(amount, qty, rate, rule)
localdict[rule.code] = tot_rule
result_rules_dict[rule.code] = {'total': tot_rule, 'amount': amount, 'quantity': qty, 'rate': rate}
rules_dict[rule.code] = rule
# sum the amount for its salary category
localdict = rule.category_id._sum_salary_rule_category(localdict, tot_rule - previous_amount)
rule_name = payslip._get_rule_name(localdict, rule, employee_lang)
# create/overwrite the rule in the temporary results
result[rule.code] = {
'sequence': rule.sequence,
'code': rule.code,
'name': rule_name,
'salary_rule_id': rule.id,
'contract_id': localdict['contract'].id,
'employee_id': localdict['employee'].id,
'amount': amount,
'quantity': qty,
'rate': rate,
'total': tot_rule,
'slip_id': payslip.id,
'ytd': line_values[rule.code][last_ytd_payslips[payslip].id]
['ytd'] + tot_rule,
}
line_vals += list(result.values())
return line_vals
def _compute_worked_days_ytd(self):
last_ytd_payslips = self._get_last_ytd_payslips()
ytd_payslips = reduce(
lambda ytd_payslips, payslip: ytd_payslips | payslip, last_ytd_payslips.values(),
self.env['hr.payslip']
)
code_set = set(self.worked_days_line_ids.mapped('code'))
worked_days_line_values = ytd_payslips._get_worked_days_line_values(code_set, ['ytd'])
for payslip in self:
for worked_days in payslip.worked_days_line_ids:
worked_days.ytd = worked_days_line_values[worked_days.code][
last_ytd_payslips[payslip].id
]['ytd'] + worked_days.amount
@api.depends('employee_id')
def _compute_company_id(self):
for slip in self.filtered(lambda p: p.employee_id):
slip.company_id = slip.employee_id.company_id
@api.depends('employee_id', 'contract_domain_ids')
def _compute_contract_id(self):
for slip in self:
if slip.contract_id and slip.employee_id == slip.contract_id.employee_id:
continue
slip.contract_id = False
if not slip.employee_id or not slip.contract_domain_ids:
continue
# Add a default contract if not already defined or invalid
contracts = slip.contract_domain_ids.filtered(lambda c: c.state == 'open')
if not contracts:
continue
slip.contract_id = contracts[0]._origin
@api.depends('contract_id')
def _compute_struct_id(self):
for slip in self.filtered(lambda p: not p.struct_id):
slip.struct_id = slip.contract_id.structure_type_id.default_struct_id\
or slip.employee_id.contract_id.structure_type_id.default_struct_id
def _get_period_name(self, cache):
self.ensure_one()
period_name = '%s - %s' % (
self._format_date_cached(cache, self.date_from),
self._format_date_cached(cache, self.date_to))
if self.is_wrong_duration:
return period_name
start_date = self.date_from
end_date = self.date_to
lang = self.employee_id.lang or self.env.user.lang
week_start = self.env["res.lang"]._get_data(code=lang).week_start
schedule = self.contract_id.schedule_pay or self.contract_id.structure_type_id.default_schedule_pay
if schedule == 'monthly':
period_name = self._format_date_cached(cache, start_date, "MMMM Y")
elif schedule == 'quarterly':
current_year_quarter = math.ceil(start_date.month / 3)
period_name = _("Quarter %(quarter)s of %(year)s", quarter=current_year_quarter, year=start_date.year)
elif schedule == 'semi-annually':
year_half = start_date.replace(day=1, month=6)
is_first_half = start_date < year_half
period_name = _("1st semester of %s", start_date.year)\
if is_first_half\
else _("2nd semester of %s", start_date.year)
elif schedule == 'annually':
period_name = start_date.year
elif schedule == 'weekly':
wk_num = start_date.strftime('%U') if week_start == '7' else start_date.strftime('%W')
period_name = _('Week %(week_number)s of %(year)s', week_number=wk_num, year=start_date.year)
elif schedule == 'bi-weekly':
week = int(start_date.strftime("%U") if week_start == '7' else start_date.strftime("%W"))
first_week = week - 1 + week % 2
period_name = _("Weeks %(week)s and %(week1)s of %(year)s",
week=first_week, week1=first_week + 1, year=start_date.year)
elif schedule == 'bi-monthly':
start_date_string = self._format_date_cached(cache, start_date, "MMMM Y")
end_date_string = self._format_date_cached(cache, end_date, "MMMM Y")
period_name = _("%(start_date_string)s and %(end_date_string)s", start_date_string=start_date_string, end_date_string=end_date_string)
return period_name
def _format_date_cached(self, cache, date, date_format=False):
key = (date, date_format)
if key not in cache:
lang = self.employee_id.lang or self.env.user.lang
cache[key] = format_date(env=self.env, value=date, lang_code=lang, date_format=date_format)
return cache[key]
@api.depends('employee_id', 'struct_id', 'date_from', 'date_to')
def _compute_name(self):
formated_date_cache = {}
for slip in self.filtered(lambda p: p.employee_id and p.date_from and p.date_to):
lang = slip.employee_id.lang or self.env.user.lang
context = {'lang': lang}
payslip_name = slip.struct_id.payslip_name or _('Salary Slip')
del context
slip.name = '%(payslip_name)s - %(employee_name)s - %(dates)s' % {
'payslip_name': payslip_name,
'employee_name': slip.employee_id.legal_name,
'dates': slip._get_period_name(formated_date_cache),
}
@api.depends('date_from', 'date_to', 'struct_id')
def _compute_warning_message(self):
for slip in self.filtered(lambda p: p.date_to):
slip.warning_message = False
warnings = []
if slip.contract_id and (slip.date_from < slip.contract_id.date_start
or (slip.contract_id.date_end and slip.date_to > slip.contract_id.date_end)):
warnings.append(_("The period selected does not match the contract validity period."))
if slip.date_to > date_utils.end_of(fields.Date.today(), 'month'):
warnings.append(_(
"Work entries may not be generated for the period from %(start)s to %(end)s.",
start=date_utils.add(date_utils.end_of(fields.Date.today(), 'month'), days=1),
end=slip.date_to,
))
if (slip.contract_id.schedule_pay or slip.contract_id.structure_type_id.default_schedule_pay)\
and slip.date_from + slip._get_schedule_timedelta() != slip.date_to:
warnings.append(_("The duration of the payslip is not accurate according to the structure type."))
if warnings:
warnings = [_("This payslip can be erroneous :")] + warnings
slip.warning_message = "\n".join(warnings)
@api.depends('date_from', 'date_to', 'struct_id')
def _compute_is_wrong_duration(self):
for slip in self:
slip.is_wrong_duration = slip.date_to and (
slip.contract_id.schedule_pay
or slip.contract_id.structure_type_id.default_schedule_pay
) and (
slip.date_from + slip._get_schedule_timedelta() != slip.date_to
)
@api.depends('employee_id', 'contract_id', 'struct_id', 'date_from', 'date_to')
def _compute_worked_days_line_ids(self):
if not self or self.env.context.get('salary_simulation'):
return
valid_slips = self.filtered(lambda p: p.employee_id and p.date_from and p.date_to and p.contract_id and p.struct_id)
if not valid_slips:
return
# Make sure to reset invalid payslip's worked days line
self.update({'worked_days_line_ids': [(5, 0, 0)]})
# Ensure work entries are generated for all contracts
generate_from = min(p.date_from for p in valid_slips) + relativedelta(days=-1)
generate_to = max(p.date_to for p in valid_slips) + relativedelta(days=1)
self.contract_id.generate_work_entries(generate_from, generate_to)
work_entries = self.env['hr.work.entry'].search([
('date_stop', '<=', generate_to),
('date_start', '>=', generate_from),
('contract_id', 'in', self.contract_id.ids),
])
work_entries_by_contract = defaultdict(lambda: self.env['hr.work.entry'])
for work_entry in work_entries:
work_entries_by_contract[work_entry.contract_id.id] += work_entry
for slip in valid_slips:
if not slip.struct_id.use_worked_day_lines:
continue
# convert slip.date_to to a datetime with max time to compare correctly in filtered_domain.
slip_tz = pytz.timezone(slip.contract_id.resource_calendar_id.tz)
utc = pytz.timezone('UTC')
date_from = slip_tz.localize(datetime.combine(slip.date_from, time.min)).astimezone(utc).replace(tzinfo=None)
date_to = slip_tz.localize(datetime.combine(slip.date_to, time.max)).astimezone(utc).replace(tzinfo=None)
payslip_work_entries = work_entries_by_contract[slip.contract_id].filtered_domain([
('date_stop', '<=', date_to),
('date_start', '>=', date_from),
])
payslip_work_entries._check_undefined_slots(slip.date_from, slip.date_to)
# YTI Note: We can't use a batched create here as the payslip may not exist
slip.update({'worked_days_line_ids': slip._get_new_worked_days_lines()})
def _get_credit_time_lines(self):
lines_vals = self._get_worked_day_lines(domain=[('is_credit_time', '=', True)], check_out_of_contract=False)
for line_vals in lines_vals:
line_vals['is_credit_time'] = True
return lines_vals
def _get_new_worked_days_lines(self):
if self.struct_id.use_worked_day_lines:
if not self.contract_id.time_credit:
return [(0, 0, vals) for vals in self._get_worked_day_lines()]
worked_days_line_values = self._get_worked_day_lines(domain=[('is_credit_time', '=', False)])
for vals in worked_days_line_values:
vals['is_credit_time'] = False
credit_time_line_values = self._get_credit_time_lines()
return [(0, 0, vals) for vals in worked_days_line_values + credit_time_line_values]
return []
def _get_salary_line_total(self, code):
_logger.warning('The method _get_salary_line_total is deprecated in favor of _get_line_values')
lines = self.line_ids.filtered(lambda line: line.code == code)
return sum([line.total for line in lines])
def _get_salary_line_quantity(self, code):
_logger.warning('The method _get_salary_line_quantity is deprecated in favor of _get_line_values')
lines = self.line_ids.filtered(lambda line: line.code == code)
return sum([line.quantity for line in lines])
def _get_line_values(self, code_list, vals_list=None, compute_sum=False):
if vals_list is None:
vals_list = ['total']
valid_values = {'quantity', 'amount', 'total', 'ytd'}
if set(vals_list) - valid_values:
raise UserError(_('The following values are not valid:\n%s', '\n'.join(list(set(vals_list) - valid_values))))
result = defaultdict(lambda: defaultdict(lambda: dict.fromkeys(vals_list, 0.0)))
if not self or not code_list:
return result
self.env.flush_all()
selected_fields = ','.join('SUM(%s) AS %s' % (vals, vals) for vals in vals_list)
self.env.cr.execute("""
SELECT
p.id,
pl.code,
%s
FROM hr_payslip_line pl
JOIN hr_payslip p
ON p.id IN %s
AND pl.slip_id = p.id
AND pl.code IN %s
GROUP BY p.id, pl.code
""" % (selected_fields, '%s', '%s'), (tuple(self.ids), tuple(code_list)))
# self = hr.payslip(1, 2)
# request_rows = [
# {'id': 1, 'code': 'IP', 'total': 100, 'quantity': 1},
# {'id': 1, 'code': 'IP.DED', 'total': 200, 'quantity': 1},
# {'id': 2, 'code': 'IP', 'total': -2, 'quantity': 1},
# {'id': 2, 'code': 'IP.DED', 'total': -3, 'quantity': 1}
# ]
request_rows = self.env.cr.dictfetchall()
# result = {
# 'IP': {
# 'sum': {'quantity': 2, 'total': 300},
# 1: {'quantity': 1, 'total': 100},
# 2: {'quantity': 1, 'total': 200},
# },
# 'IP.DED': {
# 'sum': {'quantity': 2, 'total': -5},
# 1: {'quantity': 1, 'total': -2},
# 2: {'quantity': 1, 'total': -3},
# },
# }
for row in request_rows:
code = row['code']
payslip_id = row['id']
for vals in vals_list:
if compute_sum:
result[code]['sum'][vals] += row[vals] or 0.0
result[code][payslip_id][vals] += row[vals] or 0.0
return result
def _get_worked_days_line_values(self, code_list, vals_list=None, compute_sum=False):
if vals_list is None:
vals_list = ['amount']
valid_values = {'number_of_hours', 'number_of_days', 'amount', 'ytd'}
if set(vals_list) - valid_values:
raise UserError(_('The following values are not valid:\n%s', '\n'.join(list(set(vals_list) - valid_values))))
result = defaultdict(lambda: defaultdict(lambda: dict.fromkeys(vals_list, 0.0)))
if not self or not code_list:
return result
self.env.flush_all()
selected_fields = ','.join('SUM(%s) AS %s' % (vals, vals) for vals in vals_list)
self.env.cr.execute("""
SELECT
p.id,
wet.code,
%s
FROM hr_payslip_worked_days wd
JOIN hr_work_entry_type wet ON wet.id = wd.work_entry_type_id
JOIN hr_payslip p ON p.id IN %s
AND wd.payslip_id = p.id
AND wet.code IN %s
GROUP BY p.id, wet.code
""" % (selected_fields, '%s', '%s'), (tuple(self.ids), tuple(code_list)))
# self = hr.payslip(1, 2)
# request_rows = [
# {'id': 1, 'code': 'WORK100', 'amount': 100, 'number_of_days': 1},
# {'id': 1, 'code': 'LEAVE100', 'amount': 200, 'number_of_days': 1},
# {'id': 2, 'code': 'WORK100', 'amount': -2, 'number_of_days': 1},
# {'id': 2, 'code': 'LEAVE100', 'amount': -3, 'number_of_days': 1}
# ]
request_rows = self.env.cr.dictfetchall()
# result = {
# 'IP': {
# 'sum': {'number_of_days': 2, 'amount': 300},
# 1: {'number_of_days': 1, 'amount': 100},
# 2: {'number_of_days': 1, 'amount': 200},
# },
# 'LEAVE100': {
# 'sum': {'number_of_days': 2, 'amount': -5},
# 1: {'number_of_days': 1, 'amount': -2},
# 2: {'number_of_days': 1, 'amount': -3},
# },
# }
for row in request_rows:
code = row['code']
payslip_id = row['id']
for vals in vals_list:
if compute_sum:
result[code]['sum'][vals] += row[vals] or 0.0
result[code][payslip_id][vals] += row[vals] or 0.0
return result
# YTI TODO: Convert in a single SQL request + Handle children
def _get_category_data(self, category_code):
category_data = {'quantity': 0.0, 'total': 0.0}
for line in self.line_ids:
if line.category_id.code == category_code:
category_data['quantity'] += line.quantity
category_data['total'] += line.total
return category_data
def _get_worked_days_line_amount(self, code):
wds = self.worked_days_line_ids.filtered(lambda wd: wd.code == code)
return sum([wd.amount for wd in wds])
def _get_paid_worked_days_line_amount(self):
wds = self.worked_days_line_ids.filtered(lambda wd: wd.work_entry_type_id.id not in self.struct_id.unpaid_work_entry_type_ids.ids)
return sum(wd.amount for wd in wds)
def _get_worked_days_line_number_of_hours(self, code):
wds = self.worked_days_line_ids.filtered(lambda wd: wd.code == code)
return sum([wd.number_of_hours for wd in wds])
def _get_paid_worked_days_line_number_of_hours(self):
wds = self.worked_days_line_ids.filtered(lambda wd: wd.work_entry_type_id.id not in self.struct_id.unpaid_work_entry_type_ids.ids)
return sum(wd.number_of_hours for wd in wds)
def _get_worked_days_line_number_of_days(self, code):
wds = self.worked_days_line_ids.filtered(lambda wd: wd.code == code)
return sum([wd.number_of_days for wd in wds])
def _get_paid_worked_days_line_number_of_days(self):
wds = self.worked_days_line_ids.filtered(lambda wd: wd.work_entry_type_id.id not in self.struct_id.unpaid_work_entry_type_ids.ids)
return sum(wd.number_of_days for wd in wds)
def _get_input_line_amount(self, code):
lines = self.input_line_ids.filtered(lambda line: line.code == code)
return sum([line.amount for line in lines])
@api.model
def get_views(self, views, options=None):
res = super().get_views(views, options)
if options and options.get('toolbar'):
for view_type in res['views']:
res['views'][view_type]['toolbar'].pop('print', None)
return res
def action_print_payslip(self):
return {
'name': 'Payslip',
'type': 'ir.actions.act_url',
'url': '/print/payslips?list_ids=%(list_ids)s' % {'list_ids': ','.join(str(x) for x in self.ids)},
}
def action_export_payslip(self):
self.ensure_one()
return {
"name": "Debug Payslip",
"type": "ir.actions.act_url",
"url": "/debug/payslip/%s" % self.id,
}
def _get_contract_wage(self):
self.ensure_one()
return self.contract_id._get_contract_wage()
def _get_paid_amount(self):
self.ensure_one()
if self.env.context.get('no_paid_amount'):
return 0.0
if self.env.context.get('salary_simulation') or not self.struct_id.use_worked_day_lines:
return self._get_contract_wage()
total_amount = 0
for line in self.worked_days_line_ids:
total_amount += line.amount
return total_amount
def _get_unpaid_amount(self):
self.ensure_one()
return self._get_contract_wage() - self._get_paid_amount()
def _is_outside_contract_dates(self):
self.ensure_one()
payslip = self
contract = self.contract_id
return contract.date_start > payslip.date_to or (contract.date_end and contract.date_end < payslip.date_from)
def _get_data_files_to_update(self):
# Note: Use lists as modules/files order should be maintained
return []
def _update_payroll_data(self):
data_to_update = self._get_data_files_to_update()
_logger.info("Update payroll static data")
idref = {}
for module_name, files_to_update in data_to_update:
for file_to_update in files_to_update:
convert_file(self.env, module_name, file_to_update, idref)
def action_edit_payslip_lines(self):
self.ensure_one()
if not self.env.user.has_group('hr_payroll.group_hr_payroll_manager'):
raise UserError(_('This action is restricted to payroll managers only.'))
if self.state == 'done':
raise UserError(_('This action is forbidden on validated payslips.'))
wizard = self.env['hr.payroll.edit.payslip.lines.wizard'].create({
'payslip_id': self.id,
'line_ids': [(0, 0, {
'sequence': line.sequence,
'code': line.code,
'name': line.name,
'salary_rule_id': line.salary_rule_id.id,
'contract_id': line.contract_id.id,
'employee_id': line.employee_id.id,
'amount': line.amount,
'quantity': line.quantity,
'rate': line.rate,
'ytd': line.ytd,
'slip_id': self.id}) for line in self.line_ids],
'worked_days_line_ids': [(0, 0, {
'name': line.name,
'sequence': line.sequence,
'code': line.code,
'work_entry_type_id': line.work_entry_type_id.id,
'number_of_days': line.number_of_days,
'number_of_hours': line.number_of_hours,
'amount': line.amount,
'ytd': line.ytd,
'slip_id': self.id}) for line in self.worked_days_line_ids]
})
return {
'type': 'ir.actions.act_window',
'name': _('Edit Payslip Lines'),
'res_model': 'hr.payroll.edit.payslip.lines.wizard',
'view_mode': 'form',
'target': 'new',
'binding_model_id': self.env['ir.model.data']._xmlid_to_res_id('hr_payroll.model_hr_payslip'),
'binding_view_types': 'form',
'res_id': wizard.id
}
@api.model
def _cron_generate_pdf(self, batch_size=False):
payslips = self.search([
('state', 'in', ['done', 'paid']),
('queued_for_pdf', '=', True),
])
if payslips:
BATCH_SIZE = batch_size or 50
payslips_batch = payslips[:BATCH_SIZE]
payslips_batch._generate_pdf()
payslips_batch.write({'queued_for_pdf': False})
# if necessary, retrigger the cron to generate more pdfs
if len(payslips) > BATCH_SIZE:
self.env.ref('hr_payroll.ir_cron_generate_payslip_pdfs')._trigger()
return True
lines = self.env['hr.payroll.employee.declaration'].search([('pdf_to_generate', '=', True)])
if lines:
BATCH_SIZE = batch_size or 30
lines_batch = lines[:BATCH_SIZE]
lines_batch._generate_pdf()
lines_batch.write({'pdf_to_generate': False})
# if necessary, retrigger the cron to generate more pdfs
if len(lines) > BATCH_SIZE:
self.env.ref('hr_payroll.ir_cron_generate_payslip_pdfs')._trigger()
return True
return False
# Payroll Dashboard
@api.model
def _dashboard_default_action(self, name, res_model, res_ids, additional_context=None):
if additional_context is None:
additional_context = {}
return {
'type': 'ir.actions.act_window',
'name': name,
'res_model': res_model,
'context': {**self.env.context, **additional_context},
'domain': [('id', 'in', res_ids)],
'views': [[False, 'list'], [False, 'kanban'], [False, 'form']],
'view_mode': 'list,kanban,form',
}
@api.model
def get_dashboard_warnings(self):
# Retrieve the different warnings to display on the actions section (box on the left)
result = []
# Retrieves last batches (this month, or last month)
batch_limit_date = fields.Date.today() - relativedelta(months=1, day=1)
batch_group_read = self.env['hr.payslip.run'].with_context(lang='en_US')._read_group(
[('date_start', '>=', fields.Date.today() - relativedelta(months=1, day=1))],
groupby=['date_start:month'],
order='date_start:month desc')
# Keep only the last month
batch_group_read = batch_group_read[:1]
if batch_group_read:
min_date = batch_group_read[-1][0] or batch_limit_date
last_batches = self.env['hr.payslip.run'].search([('date_start', '>=', min_date)])
else:
last_batches = self.env['hr.payslip.run']
for warning in self.env['hr.payroll.dashboard.warning'].search([]):
localdict = {
'date': safe_eval_datetime.date,
'datetime': safe_eval_datetime.datetime,
'relativedelta': safe_eval_dateutil.relativedelta.relativedelta,
'warning_count': 0,
'warning_records': self.env['base'],
'warning_action': False,
'additional_context': {},
}
globaldict = {
'self': self.env['hr.payslip'],
'last_batches': last_batches,
'defaultdict': defaultdict,
}
try:
safe_eval(warning.evaluation_code, locals_dict=localdict, globals_dict=globaldict, mode='exec', nocopy=True)
except Exception as e:
raise UserError(_("Wrong warning computation code defined for:\n- Warning: %(warning)s\n- Error: %(error)s", warning=warning.name, error=e))
if localdict['warning_count']:
result.append({
'string': warning.name,
'color': warning.color,
'count': localdict['warning_count'],
'action': localdict['warning_action'] or self._dashboard_default_action(
warning.name,
localdict['warning_records']._name,
localdict['warning_records'].ids,
additional_context=localdict['additional_context'],
),
})
return result
def _get_employee_stats_actions(self):
result = []
today = fields.Date.today()
HRContract = self.env['hr.contract']
new_contracts = HRContract.search([
('state', '=', 'open'),
('kanban_state', '=', 'normal'),
('date_start', '>=', today + relativedelta(months=-3, day=1))])
past_contracts_grouped_by_employee_id = {
employee.id
for [employee] in HRContract._read_group([
('employee_id', 'in', new_contracts.employee_id.ids),
('date_end', '<', today),
('state', 'in', ['open', 'close']),
('id', 'not in', new_contracts.ids)
], groupby=['employee_id'])
}
new_contracts_without_past_contract = HRContract
for new_contract in new_contracts:
if new_contract.employee_id.id not in past_contracts_grouped_by_employee_id:
new_contracts_without_past_contract |= new_contract
if new_contracts_without_past_contract:
new_contracts_str = _('New Employees')
employees_from_new_contracts = new_contracts_without_past_contract.mapped('employee_id')
new_employees = {
'string': new_contracts_str,
'count': len(employees_from_new_contracts),
'action': self._dashboard_default_action(new_contracts_str, 'hr.employee', employees_from_new_contracts.ids),
}
new_employees['action']['views'][0] = [self.env.ref('hr_payroll.payroll_hr_employee_view_tree_employee_trends').id, 'list']
result.append(new_employees)
gone_employees = self.env['hr.employee'].with_context(active_test=False).search([
('departure_date', '>=', today + relativedelta(months=-1, day=1)),
('company_id', 'in', self.env.companies.ids),
])
if gone_employees:
gone_employees_str = _('Last Departures')
result.append({
'string': gone_employees_str,
'count': len(gone_employees),
'action': self.with_context(active_test=False)._dashboard_default_action(
gone_employees_str, 'hr.employee', gone_employees.ids),
})
return result
@api.model
def _get_dashboard_stat_employer_cost_codes(self):
costs = self.env['hr.salary.rule'].search_read([
('appears_on_employee_cost_dashboard', '=', True)],
fields=['code', 'name'])
cost_codes = {}
for cost in costs:
cost_codes[cost['code']] = cost['name']
return cost_codes
@api.model
def _get_dashboard_stats_employer_cost(self):
today = fields.Date.context_today(self)
date_formats = {
'monthly': 'MMMM y',
'yearly': 'y',
}
employer_cost = {
'type': 'stacked_bar',
'title': _('Employer Cost'),
'label': _('Employer Cost'),
'id': 'employer_cost',
'is_sample': False,
'actions': [],
'data': {
'monthly': defaultdict(lambda: [{}, {}, {}]),
'yearly': defaultdict(lambda: [{}, {}, {}]),
},
}
# Retrieve employer costs over the last 3 months
last_payslips = self.env['hr.payslip'].search([
('state', '!=', 'cancel'),
('date_from', '>=', fields.Date.today() + relativedelta(months=-2, day=1)),
('date_to', '<=', fields.Date.today() + relativedelta(day=31))
])
if not last_payslips:
employer_cost['is_sample'] = True
cost_codes = self._get_dashboard_stat_employer_cost_codes()
line_values = last_payslips._get_line_values(cost_codes.keys())
for slip in last_payslips:
for code, code_desc in cost_codes.items():
start = slip.date_from
end = today
idx = -((end.year - start.year) * 12 + (end.month - start.month) - 2)
amount = employer_cost['data']['monthly'][code_desc][idx].get('value', 0.0)
amount += line_values[code][slip.id]['total']
employer_cost['data']['monthly'][code_desc][idx]['value'] = amount
if not employer_cost['data']['monthly'][code_desc][idx].get('label'):
period_str = format_date(self.env, start, date_format=date_formats['monthly'])
employer_cost['data']['monthly'][code_desc][idx]['label'] = period_str
# Retrieve employer costs over the last 3 years
last_payslips = self.env['hr.payslip'].search([
('state', '!=', 'cancel'),
('date_from', '>=', fields.Date.today() + relativedelta(years=-2, day=1)),
('date_to', '<=', fields.Date.today() + relativedelta(month=12, day=31))
])
line_values = last_payslips._get_line_values(cost_codes.keys())
for slip in last_payslips:
for code, code_desc in cost_codes.items():
start = slip.date_from
end = today
idx = -(end.year - start.year - 2)
amount = employer_cost['data']['yearly'][code_desc][idx].get('value', 0.0)
amount += line_values[code][slip.id]['total']
employer_cost['data']['yearly'][code_desc][idx]['value'] = amount
if not employer_cost['data']['yearly'][code_desc][idx].get('label'):
period_str = format_date(self.env, start, date_format=date_formats['yearly'])
employer_cost['data']['yearly'][code_desc][idx]['label'] = period_str
# Nullify empty sections
for i in range(3):
for code, code_desc in cost_codes.items():
if not employer_cost['data']['monthly'][code_desc][i]:
value = 0 if not employer_cost['is_sample'] else random.randint(1000, 1500)
employer_cost['data']['monthly'][code_desc][i]['value'] = value
if not employer_cost['data']['monthly'][code_desc][i].get('label'):
label = format_date(self.env, today + relativedelta(months=i-2), date_format=date_formats['monthly'])
employer_cost['data']['monthly'][code_desc][i]['label'] = label
if not employer_cost['data']['yearly'][code_desc][i]:
value = 0 if not employer_cost['is_sample'] else random.randint(10000, 15000)
employer_cost['data']['yearly'][code_desc][i]['value'] = value
if not employer_cost['data']['yearly'][code_desc][i].get('label'):
label = format_date(self.env, today + relativedelta(years=i-2), date_format=date_formats['yearly'])
employer_cost['data']['yearly'][code_desc][i]['label'] = label
# Format/Round at the end as the method cost is heavy
for dummy, data_by_code in employer_cost['data'].items():
for code, data_by_type in data_by_code.items():
for data_dict in data_by_type:
value = round(data_dict['value'], 2)
data_dict['value'] = value
data_dict['formatted_value'] = format_amount(self.env, value, self.env.company.currency_id)
return employer_cost
@api.model
def _get_dashboard_stat_employee_trends(self):
today = fields.Date.context_today(self)
employees_trends = {
'type': 'bar',
'title': _('Employee Trends'),
'label': _('Employee Count'),
'id': 'employees',
'is_sample': False,
'actions': self._get_employee_stats_actions(),
'data': {
'monthly': [{}, {}, {}],
'yearly': [{}, {}, {}],
},
}
# These are all the periods for which we need data
periods = [
# Last month
(today - relativedelta(months=1, day=1), today - relativedelta(day=1, days=1), 'monthly,past'),
# This month
(today - relativedelta(day=1), today + relativedelta(months=1, day=1, days=-1), 'monthly,present'),
# Next month
(today + relativedelta(months=1, day=1), today + relativedelta(months=2, day=1, days=-1), 'monthly,future'),
# Last year
(today - relativedelta(years=1, month=1, day=1), today - relativedelta(years=1, month=12, day=31), 'yearly,past'),
# This year
(today - relativedelta(month=1, day=1), today + relativedelta(month=12, day=31), 'yearly,present'),
# Next year
(today + relativedelta(years=1, month=1, day=1), today + relativedelta(years=1, month=12, day=31), 'yearly,future'),
]
periods_str = ', '.join(
"(DATE '%(date_from)s', DATE '%(date_to)s', '%(date_type)s')" % {
'date_from': p[0].strftime('%Y-%m-%d'),
'date_to': p[1].strftime('%Y-%m-%d'),
'date_type': p[2],
} for p in periods)
# Fetch our statistics
# Contracts are joined by our period using the usual state/date conditions
# and aggregates are used to collect data directly from our database
# avoiding unnecessary orm overhead
self.env.cr.execute("""
WITH periods AS (
SELECT *
FROM (VALUES %s
) x(start, _end, _type)
)
-- fetch all contracts matching periods from `periods`
SELECT p.start, p._end, p._type, ARRAY_AGG(c.id),
COUNT (DISTINCT c.employee_id) as employee_count
FROM periods p
JOIN hr_contract c
ON (c.date_end >= p.start OR c.date_end IS NULL)
AND c.date_start <= p._end
AND (c.state IN ('open', 'close')
OR (c.state = 'done' AND c.kanban_state='normal'))
AND c.employee_id IS NOT NULL
AND c.active = TRUE
AND c.company_id IN %%s
GROUP BY p.start, p._end, p._type
""" % (periods_str), (tuple(self.env.companies.ids),))
period_indexes = {
'past': 0,
'present': 1,
'future': 2,
}
date_formats = {
'monthly': 'MMMM y',
'yearly': 'y',
}
# Collect data in our result
for res in self.env.cr.dictfetchall():
period_type, _type = res['_type'].split(',') # Ex: yearly,past
start = res['start']
period_idx = period_indexes[_type]
period_str = format_date(self.env, start, date_format=date_formats[period_type])
# The data is formatted for the chart module
employees_trends['data'][period_type][period_idx] = {
'label': period_str,
'value': res['employee_count'],
'name': period_str,
}
# Generates a point as sample data
def make_sample_data(period_str, period_type, chart_type):
if chart_type == 'line':
return {'x': period_str, 'name': period_str, 'y': random.randint(1000, 1500)}
return {'value': random.randint(1000, 1500), 'label': period_str, 'type': period_type}
# Generates empty data in case a column is missing
def make_null_data(period_str, period_type, chart_type):
if chart_type == 'line':
return {'x': period_str, 'name': period_str, 'y': 0}
return {'value': 0, 'label': period_str, 'type': period_type}
make_data = make_null_data
period_types = ['monthly', 'yearly']
if all(not data for data in employees_trends['data']['monthly']):
employees_trends['is_sample'] = True
make_data = make_sample_data
# Go through all the data and create null or sample values where necessary
for start, dummy, p_types in periods:
_type, _time = p_types.split(',')
i = period_indexes[_time]
for period in period_types:
period_str = format_date(self.env, start, date_format=date_formats[period])
if not employees_trends['data'][_type][i]:
employees_trends['data'][_type][i] = make_data(
period_str, _type, employees_trends['type'])
return employees_trends
@api.model
def _get_dashboard_stats(self):
# Retrieve the different stats to display on the stats sections
# This function fills in employees and employer costs statistics
# Default data, replaced by sample data if empty after query
employees_trends = self._get_dashboard_stat_employee_trends()
employer_cost = self._get_dashboard_stats_employer_cost()
return [employer_cost, employees_trends]
@api.model
def _get_dashboard_default_sections(self):
return ['batches', 'stats']
@api.model
def _get_dashboard_batch_fields(self):
return ['id', 'date_start', 'name', 'state', 'payslip_count']
@api.model
def get_payroll_dashboard_data(self, sections=None):
# Entry point for getting the dashboard data
# `sections` defines which part of the data we want to include/exclude
if sections is None:
sections = self._get_dashboard_default_sections()
result = {}
if 'batches' in sections:
# Batches are loaded for the last 3 months with batches, for example if there are no batches for
# the summer and september is loaded, we want to get september, june, may.
# Limit to max - 1 year
batch_limit_date = fields.Date.today() - relativedelta(years=1, day=1)
batch_group_read = self.env['hr.payslip.run'].with_context(lang='en_US')._read_group(
[('date_start', '>=', batch_limit_date)],
groupby=['date_start:month'],
limit=20,
order='date_start:month desc')
# Keep only the last 3 months
batch_group_read = batch_group_read[:3]
if batch_group_read:
min_date = batch_group_read[-1][0] or fields.Date.today() - relativedelta(months=1, day=1)
batches_read_result = self.env['hr.payslip.run'].search_read(
[('date_start', '>=', min_date)],
fields=self._get_dashboard_batch_fields())
else:
batches_read_result = []
translated_states = dict(self.env['hr.payslip.run']._fields['state']._description_selection(self.env))
for batch_read in batches_read_result:
batch_read.update({
'name': f"{batch_read['name']} ({format_date(self.env, batch_read['date_start'], date_format='MM/y')})",
'payslip_count': _('(%s Payslips)', batch_read['payslip_count']),
'state': translated_states.get(batch_read['state'], _('Unknown State')),
})
result['batches'] = batches_read_result
if 'stats' in sections:
result['stats'] = self._get_dashboard_stats()
return result