odoo18/addons_extensions/hr_timeoff_extended/models/hr_timeoff.py

480 lines
24 KiB
Python

from asyncore import write
from calendar import month
from dateutil.utils import today
from odoo import models, fields, api, _
from datetime import datetime, date, time, timedelta
from dateutil.relativedelta import relativedelta
from odoo.exceptions import ValidationError, UserError
from odoo.addons.hr_holidays.models.hr_leave import HolidaysRequest
class hrLeaveAccrualLevel(models.Model):
_inherit = 'hr.leave.accrual.level'
level_frequency = fields.Selection([
('daily', 'Daily'),
('weekly', 'Weekly'),
('monthly', 'Monthly'),
('yearly', 'Yearly'),
], default='daily', required=True, string="Frequency")
emp_type = fields.Many2many('hr.contract.type', string="Employee Type", tracking=True)
experience_required = fields.Boolean(string="Required Employee Exp")
max_start_count = fields.Integer(
"Start after",
help="The accrual will not proceed if the employee experience is grater that the count and type ",
default="1")
max_start_type = fields.Selection(
[('day', 'Days'),
('month', 'Months'),
('year', 'Years')],
default='day', string=" ", required=True,
help="This accrual will not proceed if the employee experience is grater that the count and type")
@api.constrains('start_count', 'start_type', 'max_start_count', 'max_start_type')
def _check_start_count_and_max(self):
for record in self:
# Define a function to convert the start_count and max_start_count to a base unit (e.g., days)
def convert_to_days(count, type):
if type == 'day':
return count
elif type == 'month':
return count * 30 # Approximate value for months (30 days)
elif type == 'year':
return count * 365 # Approximate value for years (365 days)
start_days = convert_to_days(record.start_count, record.start_type)
max_days = convert_to_days(record.max_start_count, record.max_start_type)
if start_days >= max_days:
raise ValidationError("Start count and type must be smaller than the max start count and type.")
class hrLeaveAccrualPlan(models.Model):
_inherit = 'hr.leave.accrual.plan'
accrual_start_count = fields.Integer(
"Start after",
help="The accrual starts after a defined period from the employee joining date. This field defines the number of days, months or years after which accrual is used.",
default="1")
accrual_start_type = fields.Selection(
[('day', 'Days'),
('month', 'Months'),
('year', 'Years')],
default='day', string=" ", required=True,
help="This field defines the unit of time after which the employee joining date.")
time_off_type_id = fields.Many2one('hr.leave.type', domain=[('requires_allocation','=','yes')])
_sql_constraints = [
('unique_time_off_type_id', 'unique(time_off_type_id)', 'You can not create multiple plans with same leave type.')
]
class hrTimeoffAllocation(models.Model):
_inherit = "hr.leave.allocation"
allocation_type = fields.Selection(selection_add =[('auto_allocation','Auto Allocation')],ondelete={
'auto_allocation': 'cascade',
},)
def _process_accrual_plans(self, date_to=False, force_period=False, log=True):
pass
def activity_update(self):
for allocation in self:
if allocation.allocation_type != 'auto_allocation':
super().activity_update()
@api.model
def _update_accrual(self):
"""
Method called by the cron task in order to increment the number_of_days when
necessary, based on the frequency selected in the accrual level.
"""
accrual_plans = self.env['hr.leave.accrual.plan'].sudo().search([])
for accrual in accrual_plans:
employees = self.env['hr.employee'].sudo().search([('run_auto_allocations', '=', True)])
if accrual.accrual_start_count and accrual.accrual_start_type:
if accrual.accrual_start_count > 0:
employees = employees.filtered(lambda emp: self._is_accrual_applicable(emp, accrual))
level_ids = accrual.level_ids.sorted('sequence')
if not level_ids:
continue
for level in level_ids:
# Calculate the current frequency
if level.emp_type:
level_filtered_employees = employees.filtered(lambda emp: emp.emp_type.id in level.emp_type.ids)
else:
level_filtered_employees = employees
if level.experience_required:
qualified_employees = level_filtered_employees.filtered(lambda emp: self._emp_filter_by_level(emp, level))
else:
qualified_employees = level_filtered_employees
# After filtering, we create the leave allocation for each employee
for emp in qualified_employees:
run_allocation = self._handel_weekly_frequency(level,emp)
if run_allocation:
allocations = self.env['hr.leave.allocation'].sudo().search([('employee_id','=',emp.id),('holiday_status_id','=',accrual.time_off_type_id.id),('state','=','validate')])
leaves = self.env['hr.leave'].sudo().search([('employee_id','=',emp.id),('holiday_status_id','=',accrual.time_off_type_id.id),('state','not in',['draft','refuse','cancel'])])
emp_leave_balance = sum(allocation.number_of_days for allocation in allocations) - sum(leave.number_of_days for leave in leaves)
if level.cap_accrued_time and level.maximum_leave < emp_leave_balance:
continue
self._create_leave_allocation(emp, level, accrual)
def _handel_weekly_frequency(self,level,emp):
today_date = datetime.today().date()
if level.level_frequency == 'weekly':
weekday_map = {
'mon': 0, 'tue': 1, 'wed': 2, 'thu': 3, 'fri': 4, 'sat': 5, 'sun': 6
}
return True if today_date.weekday() == weekday_map.get(level.week_day) else False
elif level.level_frequency == 'daily':
return True
elif level.level_frequency == 'monthly':
return True if level.first_day_display == str(today_date.day) or (emp.doj and ((emp.doj + timedelta(days=2)) == today_date)) else False
elif level.level_frequency == 'yearly':
month_map = {
'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6,
'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12
}
return True if (level.first_day_display == str(today_date.day) and today_date.month == month_map.get(level.yearly_month)) or (emp.doj and ((emp.doj + timedelta(days=2)) == today_date)) else False
else:
return True
def _create_leave_allocation(self, employee, level, accrual):
"""
Create leave allocation for a qualified employee based on the accrual level and added value.
"""
today_date = datetime.today().date()
number_of_days = level.added_value
if employee.doj and ((employee.doj + timedelta(days=2)) == today_date):
if level.level_frequency == 'monthly':
if employee.doj.day <= 10:
number_of_days = level.added_value
else:
number_of_days = level.added_value/2
elif level.level_frequency == 'yearly':
start_month = int(level.yearly_month)
joining_month = employee.doj.month
# Compute remaining months in the allocation cycle
remaining_months = (start_month - joining_month) % 12 or 12
# Calculate proportional leaves
number_of_days = (level.added_value / 12) * remaining_months
if employee.doj.day > 10:
number_of_days = number_of_days - ((level.added_value / 12)/2)
self.env['hr.leave.allocation'].sudo().create({
'employee_id': employee.id,
'holiday_status_id': accrual.time_off_type_id.id,
'date_from': fields.Date.today(),
'number_of_days': number_of_days,
'allocation_type': 'auto_allocation'
}).action_approve()
def _is_accrual_applicable(self, employee, accrual):
"""
Helper method to check if the accrual is applicable to the employee
based on joining date and the accrual plan.
"""
if employee.doj:
doj = employee.doj
start_date = False
# Calculate the start date based on the accrual type and count
if accrual.accrual_start_type == 'day':
start_date = doj + relativedelta(days=accrual.accrual_start_count)
elif accrual.accrual_start_type == 'month':
start_date = doj + relativedelta(months=accrual.accrual_start_count)
elif accrual.accrual_start_type == 'year':
start_date = doj + relativedelta(years=accrual.accrual_start_count)
# Return if the employee's today date is greater than or equal to start_date
return fields.date.today() >= start_date
return False
def _emp_filter_by_level(self, employee, level):
"""
Helper method to check if the employee's total experience (including previous experience) is within the
range defined by the accrual's start and max start counts and types.
"""
if employee.doj:
# Calculate the employee's total experience based on DOJ and previous_exp
total_experience = self._calculate_total_experience(employee.doj, employee.previous_exp)
# Calculate the minimum experience based on accrual_start_count and accrual_start_type
min_experience = self._calculate_experience_threshold(level.start_count,
level.start_type)
# Calculate the maximum experience based on max_start_count and max_start_type
max_experience = self._calculate_experience_threshold(level.max_start_count, level.max_start_type)
# Return whether the total experience is within the min and max range
return min_experience <= total_experience <= max_experience
return False
def _calculate_total_experience(self, doj, previous_exp):
"""
Helper method to calculate total experience of an employee in months, combining the previous experience
and experience since the DOJ.
"""
today = datetime.today()
# Calculate current experience from the date of joining
delta = relativedelta(today, doj)
current_experience_months = delta.years * 12 + delta.months
# Add previous experience (already in months)
total_experience_months = current_experience_months + previous_exp
return total_experience_months
def _calculate_experience_threshold(self, count, experience_type):
"""
Helper method to calculate the experience threshold in months based on a given count and type.
"""
today = datetime.today()
start_date = today
if experience_type == 'day':
start_date = today - relativedelta(days=count)
elif experience_type == 'month':
start_date = today - relativedelta(months=count)
elif experience_type == 'year':
start_date = today - relativedelta(years=count)
# Calculate the experience in months for the threshold
return self._calculate_total_experience(start_date, 0)
class HRLeave(models.Model):
_inherit = 'hr.leave'
state = fields.Selection([
('draft', 'Draft'),
('confirm', 'To Approve'),
('refuse', 'Refused'),
('validate1', 'Second Approval'),
('validate', 'Approved'),
('cancel', 'Cancelled'),
], string='Status', store=True, tracking=True, copy=False, readonly=False, default='draft',
help="The status is set to 'Draft' when the leave request is created." +
"\nThe status is 'To Approve', when time off request is confirmed by user." +
"\nThe status is 'Refused', when time off request is refused by manager." +
"\nThe status is 'Second Approval', when time off request is awaiting further validation." +
"\nThe status is 'Approved', when time off request is approved by manager." +
"\nThe status is 'Cancelled', when time off request is cancelled.")
submitted_date = fields.Datetime(string="Submit Date")
def write(self, values):
is_officer = self.env.user.has_group('hr_holidays.group_hr_holidays_user') or self.env.is_superuser()
if not is_officer and values.keys() - {'attachment_ids', 'supported_attachment_ids', 'message_main_attachment_id'}:
# if any(hol.date_from.date() < fields.Date.today() and hol.employee_id.leave_manager_id != self.env.user for hol in self):
# raise UserError(_('You must have manager rights to modify/validate a time off that already begun'))
if any(leave.state == 'cancel' for leave in self):
# raise UserError(_('Only a manager can modify a canceled leave.'))
pass
# Unlink existing resource.calendar.leaves for validated time off
if 'state' in values and values['state'] != 'validate':
validated_leaves = self.filtered(lambda l: l.state == 'validate')
validated_leaves._remove_resource_leave()
employee_id = values.get('employee_id', False)
if not self.env.context.get('leave_fast_create'):
if values.get('state'):
self._check_approval_update(values['state'])
if any(holiday.validation_type == 'both' for holiday in self):
if values.get('employee_id'):
employees = self.env['hr.employee'].browse(values.get('employee_id'))
else:
employees = self.mapped('employee_id')
self._check_double_validation_rules(employees, values['state'])
if 'date_from' in values:
values['request_date_from'] = values['date_from']
if 'date_to' in values:
values['request_date_to'] = values['date_to']
result = super(HolidaysRequest, self).write(values)
if any(field in values for field in ['request_date_from', 'date_from', 'request_date_from', 'date_to', 'holiday_status_id', 'employee_id', 'state']):
self._check_validity()
if not self.env.context.get('leave_fast_create'):
for holiday in self:
if employee_id:
holiday.add_follower(employee_id)
return result
def _check_validity(self):
for rec in self:
if rec.holiday_status_id.limit_leave_requests:
if rec.holiday_status_id.limit_request_type and rec.holiday_status_id.limit_emp_type and rec.holiday_status_id.limit_request_count >= 0:
if rec.employee_id.sudo().emp_type and rec.employee_id.sudo().emp_type.id in rec.holiday_status_id.limit_emp_type.ids:
time_frame = {
'week': timedelta(weeks=1),
'month': timedelta(days=30),
'year': timedelta(days=365),
}.get(rec.holiday_status_id.limit_request_type, timedelta(weeks=1)) # Default to 1 week
restriction_start_date = datetime.now() - time_frame
# Count the leave requests made by the employee within the restriction period
leave_count = self.env['hr.leave'].sudo().search_count([
('employee_id', '=', rec.employee_id.id),
('state', 'not in', ['cancel', 'refuse', 'draft']), # Adjust states if needed
('holiday_status_id', '=', rec.holiday_status_id.id),
('request_date_from', '>=', restriction_start_date),
('id','!=',rec.id)
])
if leave_count >= rec.holiday_status_id.limit_request_count:
if rec.holiday_status_id.limit_request_count == 0:
raise ValidationError(_("This time off type is not applicable for %s employee. Please contact your admin if required"%(rec.employee_id.sudo().emp_type.name)))
else:
raise ValidationError(_(
"You have exceeded the maximum allowed leave requests (%s) for the selected period (%s)."
) % (rec.holiday_status_id.limit_request_count, rec.holiday_status_id.limit_request_type))
return super(HRLeave, self)._check_validity()
def action_draft(self):
for rec in self:
is_officer = self.env.user.has_group('hr_holidays.group_hr_holidays_user') or self.env.is_superuser()
if rec.sudo().employee_id.user_id.id != self.env.user.id and not is_officer:
raise ValidationError(_("Only employee can submit his own leave"))
rec.submitted_date = fields.Datetime.now()
self._check_validity()
rec.state = 'confirm'
action = self.sudo().env.ref('hr_holidays.hr_leave_action_my')
return {
'type': 'ir.actions.act_window',
'name': action.name,
'res_model': action.res_model,
'view_mode': action.view_mode,
'target': 'current',
}
def action_reset_confirm(self):
if any(holiday.state not in ['cancel', 'refuse'] for holiday in self):
raise UserError(_('Time off request state must be "Refused" or "Cancelled" in order to be reset to "Confirmed".'))
self.write({
'state': 'draft',
'first_approver_id': False,
'second_approver_id': False,
})
self.activity_update()
return True
def action_approve(self):
for rec in self:
if rec.employee_id.leave_manager_id.id != self.env.user.id :
raise ValidationError(_("Only Employees Time Off Manager can approve this Leave request"))
return super(HRLeave, self).action_approve()
def action_refuse(self):
for rec in self:
if (rec.employee_id.leave_manager_id.id != self.env.user.id) and (self.env.user.id not in rec.holiday_status_id.responsible_ids.ids):
raise ValidationError(_("only Employee / Leave type Time off Manager's can refuse this Leave request"))
return super(HRLeave, self).action_refuse()
def action_validate(self, check_state=True):
current_employee = self.env.user.employee_id
for holiday in self:
if check_state and holiday.state in ['validate1'] and holiday.validation_type == 'both' and (holiday.holiday_status_id.responsible_ids and (current_employee.user_id.id not in holiday.holiday_status_id.responsible_ids.ids)):
raise UserError(_('Only Timeoff officers for the %s can validate this leave'%(holiday.holiday_status_id.name)))
return super(HRLeave, self).action_validate(check_state)
@api.depends_context('uid')
@api.depends('state', 'employee_id')
def _compute_can_cancel(self):
for leave in self:
leave.can_cancel = leave.id and leave.employee_id.user_id == self.env.user and leave.state in ['confirm']
@api.ondelete(at_uninstall=False)
def _unlink_if_correct_states(self):
error_message = _('You cannot delete a time off which is in %s state')
state_description_values = {elem[0]: elem[1] for elem in self._fields['state']._description_selection(self.env)}
now = fields.Datetime.now().date()
if not self.env.user.has_group('hr_holidays.group_hr_holidays_user'):
for hol in self:
if hol.state not in ['draft', 'cancel']:
raise UserError(error_message % state_description_values.get(self[:1].state))
else:
for holiday in self.filtered(lambda holiday: holiday.state not in ['cancel', 'draft']):
raise UserError(error_message % (state_description_values.get(holiday.state),))
def _check_approval_update(self, state):
""" Check if target state is achievable. """
if self.env.is_superuser():
return
current_employee = self.env.user.employee_id
is_officer = self.env.user.has_group('hr_holidays.group_hr_holidays_user')
is_manager = self.env.user.has_group('hr_holidays.group_hr_holidays_manager')
for holiday in self:
val_type = holiday.validation_type
if not is_manager:
if holiday.state == 'cancel' and state != 'confirm':
raise UserError(_('A cancelled leave cannot be modified.'))
if state == 'confirm':
if holiday.state == 'refuse':
raise UserError(_('Only a Time Off Manager can reset a refused leave.'))
# if holiday.date_from and holiday.date_from.date() <= fields.Date.today():
# raise UserError(_('Only a Time Off Manager can reset a started leave.'))
if holiday.employee_id != current_employee:
raise UserError(_('Only a Time Off Manager can reset other people leaves.'))
else:
if val_type == 'no_validation' and current_employee == holiday.employee_id and (is_officer or is_manager):
continue
# use ir.rule based first access check: department, members, ... (see security.xml)
holiday.check_access('write')
# This handles states validate1 validate and refuse
if holiday.employee_id == current_employee\
and self.env.user != holiday.employee_id.leave_manager_id\
and not is_officer:
raise UserError(_('Only a Time Off Officer or Manager can approve/refuse its own requests.'))
if (state == 'validate1' and val_type == 'both'):
if not is_officer and self.env.user != holiday.employee_id.leave_manager_id:
raise UserError(_('You must be either %s\'s manager or Time off Manager to approve this leave') % (holiday.employee_id.name))
if (state == 'validate' and val_type == 'manager')\
and self.env.user != holiday.employee_id.leave_manager_id\
and not is_officer:
raise UserError(_("You must be %s's Manager to approve this leave", holiday.employee_id.name))
if not is_officer and (state == 'validate' and val_type == 'hr'):
raise UserError(_('You must either be a Time off Officer or Time off Manager to approve this leave'))
HolidaysRequest.write = HRLeave.write
class HRLeaveType(models.Model):
_inherit='hr.leave.type'
request_unit_type = fields.Selection([
('day', 'Day'),
('half_day', 'Half Day')], default='day', string='Take Time Off in', required=True)
request_unit = fields.Selection(related="request_unit_type",store=True)
limit_leave_requests = fields.Boolean(string='Limit Leave Requests', default=False)
limit_request_count = fields.Integer(
"limit Count",
help="Defines the minimum number of leave requests after which the restriction will apply. For example, set 1 to start restrictions after the first request.",
default="1")
limit_request_type = fields.Selection(
[('week', 'Week'),
('month', 'Month'),
('year', 'Year')],
default='month', string="Limit Type", required=True,
help="Specifies the type of time period (days, months, or years) for applying the leave request")
limit_emp_type = fields.Many2many('hr.contract.type', string="Employee Type")