odoo18/addons_extensions/hr_timeoff_extended/models/hr_timeoff.py

287 lines
13 KiB
Python

from calendar import month
from dateutil.utils import today
from odoo import models, fields, api, _
from datetime import datetime, date, time, timedelta
from dateutil.relativedelta import relativedelta
from odoo.exceptions import ValidationError, UserError
class hrLeaveAccrualLevel(models.Model):
    """Add a run frequency, an employee-type filter and an upper experience
    bound to accrual levels.

    The lower bound is the inherited ``start_count`` / ``start_type`` pair;
    this extension adds ``max_start_count`` / ``max_start_type`` and checks
    that the lower bound stays strictly below the upper one.
    """

    _inherit = 'hr.leave.accrual.level'

    # How often the level is evaluated by the auto-allocation job.
    level_frequency = fields.Selection([
        ('daily', 'Daily'),
        ('weekly', 'Weekly'),
        ('monthly', 'Monthly'),
        ('yearly', 'Yearly'),
    ], default='daily', required=True, string="Frequency")

    # Optional contract type used to restrict the level to some employees.
    emp_type = fields.Many2one('hr.contract.type', "Employee Type", tracking=True)

    # Upper experience bound (count + unit) of the level.
    max_start_count = fields.Integer(
        "Start after",
        help="The accrual will not proceed if the employee experience is grater that the count and type ",
        default=1)
    max_start_type = fields.Selection(
        [('day', 'Days'),
         ('month', 'Months'),
         ('year', 'Years')],
        default='day', string=" ", required=True,
        help="This accrual will not proceed if the employee experience is grater that the count and type")

    @api.constrains('start_count', 'start_type', 'max_start_count', 'max_start_type')
    def _check_start_count_and_max(self):
        """Ensure the lower experience bound is strictly below the upper one.

        Both bounds are converted to an approximate number of days (30 per
        month, 365 per year) so that different units can be compared.

        :raises ValidationError: when the start bound is greater than or
            equal to the max bound.
        """
        # Approximate conversion factors; only used for this comparison.
        days_per_unit = {'day': 1, 'month': 30, 'year': 365}

        for record in self:
            start_factor = days_per_unit.get(record.start_type)
            max_factor = days_per_unit.get(record.max_start_type)
            if start_factor is None or max_factor is None:
                # A bound without a known unit cannot be compared; skip it
                # instead of failing on a None comparison.
                continue
            start_days = record.start_count * start_factor
            max_days = record.max_start_count * max_factor
            if start_days >= max_days:
                raise ValidationError(
                    _("Start count and type must be smaller than the max start count and type."))
class hrLeaveAccrualPlan(models.Model):
    """Add a waiting period after joining and a single target leave type to
    accrual plans."""

    _inherit = 'hr.leave.accrual.plan'

    # Waiting period (count + unit) counted from the employee joining date.
    accrual_start_count = fields.Integer(
        "Start after",
        help="The accrual starts after a defined period from the employee joining date. This field defines the number of days, months or years after which accrual is used.",
        default=1)
    accrual_start_type = fields.Selection(
        [('day', 'Days'),
         ('month', 'Months'),
         ('year', 'Years')],
        default='day', string=" ", required=True,
        help="This field defines the unit of time after which the employee joining date.")

    # Leave type credited by the plan; limited to types that require an allocation.
    time_off_type_id = fields.Many2one(
        'hr.leave.type', domain=[('requires_allocation', '=', 'yes')])

    # At most one plan per leave type.
    _sql_constraints = [
        ('unique_time_off_type_id', 'unique(time_off_type_id)',
         'You can not create multiple plans with same leave type.')
    ]
class hrTimeoffAllocation(models.Model):
    """Create "auto allocation" leave allocations from accrual plans.

    ``_update_accrual`` walks every accrual plan and, for each level that is
    due today, creates and approves one allocation per qualifying employee.
    """

    _inherit = "hr.leave.allocation"

    allocation_type = fields.Selection(
        selection_add=[('auto_allocation', 'Auto Allocation')],
        ondelete={'auto_allocation': 'cascade'},
    )

    def _process_accrual_plans(self, date_to=False, force_period=False, log=True):
        """Do nothing and return ``None``.

        NOTE(review): presumably overridden to switch off the inherited
        accrual processing in favour of ``_update_accrual`` - confirm.
        """

    @api.model
    def _update_accrual(self):
        """Create the allocations that are due today for every accrual plan.

        Meant to be called by a scheduled action. For each plan with a leave
        type and at least one level, employees flagged with
        ``run_auto_allocations`` are filtered by the plan waiting period, by
        the level employee type and by the level experience range, then one
        allocation of ``level.added_value`` days is created per employee.
        """
        # The employee set does not depend on the plan: search it once.
        all_employees = self.env['hr.employee'].sudo().search(
            [('run_auto_allocations', '=', True)])
        accrual_plans = self.env['hr.leave.accrual.plan'].sudo().search([])

        for accrual in accrual_plans:
            if not accrual.time_off_type_id:
                # Without a leave type there is nothing to allocate; skip
                # instead of attempting to create an invalid allocation.
                continue
            level_ids = accrual.level_ids.sorted('sequence')
            if not level_ids:
                continue

            employees = all_employees
            if accrual.accrual_start_type and accrual.accrual_start_count > 0:
                employees = employees.filtered(
                    lambda emp: self._is_accrual_applicable(emp, accrual))

            for level in level_ids:
                if not self._handel_weekly_frequency(level):
                    continue
                candidates = employees
                if level.emp_type:
                    candidates = candidates.filtered(
                        lambda emp: emp.emp_type == level.emp_type)
                candidates = candidates.filtered(
                    lambda emp: self._emp_filter_by_level(emp, level))

                for emp in candidates:
                    # The balance is only needed when the level is capped.
                    # NOTE(review): the comparison is strict, so a balance
                    # equal to the cap still receives a new allocation and
                    # the balance may end above the cap - confirm intent.
                    if level.cap_accrued_time and level.maximum_leave < self._auto_allocation_balance(
                            emp, accrual.time_off_type_id):
                        continue
                    self._create_leave_allocation(emp, level, accrual)

    def _auto_allocation_balance(self, employee, leave_type):
        """Return validated allocated days minus requested days for one
        employee and leave type (draft, refused and cancelled requests are
        ignored)."""
        allocations = self.env['hr.leave.allocation'].sudo().search([
            ('employee_id', '=', employee.id),
            ('holiday_status_id', '=', leave_type.id),
            ('state', '=', 'validate'),
        ])
        leaves = self.env['hr.leave'].sudo().search([
            ('employee_id', '=', employee.id),
            ('holiday_status_id', '=', leave_type.id),
            ('state', 'not in', ['draft', 'refuse', 'cancel']),
        ])
        return sum(allocations.mapped('number_of_days')) - sum(leaves.mapped('number_of_days'))

    def _selection_number(self, value, names):
        """Translate a selection key into its number.

        ``names`` maps symbolic keys (``'mon'``, ``'jan'`` ...) to numbers.
        A digit string is also accepted and converted as is.
        NOTE(review): digit keys are accepted because the key format of the
        inherited selections may differ between Odoo versions - confirm.

        :return: the number, or ``None`` when the key is not recognised.
        """
        if value in names:
            return names[value]
        if isinstance(value, str) and value.isdigit():
            return int(value)
        return None

    def _is_scheduled_day(self, day_display, today_date):
        """Tell whether ``today_date`` is the configured day of the month.

        A configured day that does not exist in the current month (for
        instance 31 in a 30-day month) is clamped to the last day of the
        month, so the level still runs once that month.
        NOTE(review): ``'last'`` is treated as "last day of the month";
        confirm it is a valid value of ``first_day_display``.
        """
        if not day_display:
            return False
        # relativedelta(day=31) clamps to the last existing day of the month.
        last_day = (today_date + relativedelta(day=31)).day
        if day_display == 'last':
            target_day = last_day
        else:
            try:
                target_day = min(int(day_display), last_day)
            except (TypeError, ValueError):
                return False
        return today_date.day == target_day

    def _handel_weekly_frequency(self, level):
        """Return ``True`` when ``level`` must run today.

        Despite its name the method handles every ``level_frequency``:
        daily always runs, weekly runs on ``week_day``, monthly runs on
        ``first_day_display`` and yearly runs on ``first_day_display`` of
        ``yearly_month``. Any other frequency value runs every day.
        """
        today_date = fields.Date.today()
        frequency = level.level_frequency

        if frequency == 'weekly':
            weekday_map = {
                'mon': 0, 'tue': 1, 'wed': 2, 'thu': 3, 'fri': 4, 'sat': 5, 'sun': 6
            }
            return today_date.weekday() == self._selection_number(level.week_day, weekday_map)

        if frequency == 'monthly':
            return self._is_scheduled_day(level.first_day_display, today_date)

        if frequency == 'yearly':
            month_map = {
                'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6,
                'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12
            }
            return (
                today_date.month == self._selection_number(level.yearly_month, month_map)
                and self._is_scheduled_day(level.first_day_display, today_date)
            )

        # 'daily' and any unexpected value: run every day.
        return True

    def _create_leave_allocation(self, employee, level, accrual):
        """Create and approve an auto allocation of ``level.added_value``
        days of the plan leave type for ``employee``, starting today."""
        self.env['hr.leave.allocation'].sudo().create({
            'employee_id': employee.id,
            'holiday_status_id': accrual.time_off_type_id.id,
            'date_from': fields.Date.today(),
            'number_of_days': level.added_value,
            'allocation_type': 'auto_allocation'
        }).action_approve()

    def _is_accrual_applicable(self, employee, accrual):
        """Tell whether the plan waiting period has elapsed for ``employee``.

        The waiting period is ``accrual_start_count`` units of
        ``accrual_start_type`` counted from the employee joining date
        (``doj``).

        :return: ``False`` when the employee has no joining date or the unit
            is not one of day/month/year, otherwise whether today is on or
            after the computed start date.
        """
        doj = employee.doj
        if not doj:
            return False
        unit = {'day': 'days', 'month': 'months', 'year': 'years'}.get(
            accrual.accrual_start_type)
        if not unit:
            # No usable unit: no start date can be computed.
            return False
        start_date = doj + relativedelta(**{unit: accrual.accrual_start_count})
        return fields.Date.today() >= start_date

    def _emp_filter_by_level(self, employee, level):
        """Tell whether the employee total experience, in months, lies within
        the level range ``[start_count/start_type, max_start_count/max_start_type]``.

        Employees without a joining date never qualify.
        """
        if not employee.doj:
            return False
        total_experience = self._calculate_total_experience(
            employee.doj, employee.previous_exp)
        min_experience = self._calculate_experience_threshold(
            level.start_count, level.start_type)
        max_experience = self._calculate_experience_threshold(
            level.max_start_count, level.max_start_type)
        return min_experience <= total_experience <= max_experience

    def _calculate_total_experience(self, doj, previous_exp):
        """Return the experience in whole months: months elapsed since
        ``doj`` plus ``previous_exp``.

        ``previous_exp`` is assumed to be expressed in months; an empty
        value counts as zero.
        """
        elapsed = relativedelta(fields.Date.today(), doj)
        return elapsed.years * 12 + elapsed.months + (previous_exp or 0)

    def _calculate_experience_threshold(self, count, experience_type):
        """Convert ``count`` units of ``experience_type`` into whole months.

        The conversion goes back ``count`` units from today and measures the
        whole months between that date and today, so day counts are rounded
        down to calendar months. An unknown unit yields zero.
        """
        today = fields.Date.today()
        unit = {'day': 'days', 'month': 'months', 'year': 'years'}.get(experience_type)
        start_date = today - relativedelta(**{unit: count}) if unit else today
        return self._calculate_total_experience(start_date, 0)
class HRLeave(models.Model):
    """Add a 'draft' stage to time off requests and restrict who may submit
    and approve them."""

    _inherit = 'hr.leave'

    # Full state list redefined so that requests start in 'draft'.
    state = fields.Selection([
        ('draft', 'Draft'),
        ('confirm', 'To Approve'),
        ('refuse', 'Refused'),
        ('validate1', 'Second Approval'),
        ('validate', 'Approved'),
        ('cancel', 'Cancelled'),
    ], string='Status', store=True, tracking=True, copy=False, readonly=False, default='draft',
        help="The status is set to 'Draft' when the leave request is created."
             "\nThe status is 'To Approve', when time off request is confirmed by user."
             "\nThe status is 'Refused', when time off request is refused by manager."
             "\nThe status is 'Second Approval', when time off request is awaiting further validation."
             "\nThe status is 'Approved', when time off request is approved by manager."
             "\nThe status is 'Cancelled', when time off request is cancelled.")

    def action_draft(self):
        """Submit the requests: validate each one and move it to 'confirm'.

        :raises ValidationError: when the current user is not the user
            linked to the request's employee.
        """
        for rec in self:
            if rec.employee_id.user_id.id != self.env.user.id:
                raise ValidationError(_("Only employee can submit his own leave"))
            # Validate the record being submitted, not the whole recordset
            # on every iteration.
            rec._check_validity()
            rec.state = 'confirm'

    def action_reset_confirm(self):
        """Send refused or cancelled requests back to 'draft' and clear both
        approvers.

        :raises UserError: when a request is neither refused nor cancelled.
        :return: ``True``
        """
        if any(holiday.state not in ['cancel', 'refuse'] for holiday in self):
            raise UserError(_('Time off request state must be "Refused" or "Cancelled" in order to be reset to "Confirmed".'))
        self.write({
            'state': 'draft',
            'first_approver_id': False,
            'second_approver_id': False,
        })
        self.activity_update()
        return True

    def action_approve(self, *args, **kwargs):
        """Approve the requests after checking the approver.

        Only the time off approver set on the employee
        (``leave_manager_id``) may approve. Once the check passes the
        inherited approval is executed; any argument is forwarded untouched.
        NOTE(review): the previous version stopped after the check and never
        ran the inherited approval, so nothing was approved - confirm that
        delegating to the parent is the intended behaviour.

        :raises ValidationError: when the current user is not the approver.
        """
        for rec in self:
            if rec.employee_id.leave_manager_id.id != self.env.user.id:
                raise ValidationError(_("Only Employees Time Off Approver can approve this "))
        return super().action_approve(*args, **kwargs)

    @api.ondelete(at_uninstall=False)
    def _unlink_if_correct_states(self):
        """Block deletion of requests that are not in 'draft' or 'cancel'.

        Users outside ``hr_holidays.group_hr_holidays_user`` additionally
        cannot delete a request whose start date is in the past.

        :raises UserError: when a record may not be deleted.
        """
        error_message = _('You cannot delete a time off which is in %s state')
        state_description_values = {
            elem[0]: elem[1]
            for elem in self._fields['state']._description_selection(self.env)
        }
        now = fields.Datetime.now().date()

        if not self.env.user.has_group('hr_holidays.group_hr_holidays_user'):
            for hol in self:
                if hol.state not in ['draft', 'cancel']:
                    # Report the state of the offending record, not the
                    # state of the first record of the set.
                    raise UserError(error_message % (state_description_values.get(hol.state),))
                if hol.date_from.date() < now:
                    raise UserError(_('You cannot delete a time off which is in the past'))
        else:
            blocked = self.filtered(lambda holiday: holiday.state not in ['cancel', 'draft'])
            if blocked:
                raise UserError(error_message % (state_description_values.get(blocked[0].state),))
class HRLeaveType(models.Model):
    """Restrict the request unit of leave types to full days or half days."""

    _inherit = 'hr.leave.type'

    # Unit offered to the user when taking time off.
    request_unit_type = fields.Selection(
        selection=[
            ('day', 'Day'),
            ('half_day', 'Half Day'),
        ],
        string='Take Time Off in',
        default='day',
        required=True,
    )

    # Stored mirror of the unit chosen in request_unit_type.
    request_unit = fields.Selection(
        related="request_unit_type",
        store=True,
    )