from calendar import month from dateutil.utils import today from odoo import models, fields, api, _ from datetime import datetime, date, time, timedelta from dateutil.relativedelta import relativedelta from odoo.exceptions import ValidationError, UserError from odoo.addons.hr_holidays.models.hr_leave import HolidaysRequest class hrLeaveAccrualLevel(models.Model): _inherit = 'hr.leave.accrual.level' level_frequency = fields.Selection([ ('daily', 'Daily'), ('weekly', 'Weekly'), ('monthly', 'Monthly'), ('yearly', 'Yearly'), ], default='daily', required=True, string="Frequency") emp_type = fields.Many2many('hr.contract.type', string="Employee Type", tracking=True) experience_required = fields.Boolean(string="Required Employee Exp") max_start_count = fields.Integer( "Start after", help="The accrual will not proceed if the employee experience is grater that the count and type ", default="1") max_start_type = fields.Selection( [('day', 'Days'), ('month', 'Months'), ('year', 'Years')], default='day', string=" ", required=True, help="This accrual will not proceed if the employee experience is grater that the count and type") @api.constrains('start_count', 'start_type', 'max_start_count', 'max_start_type') def _check_start_count_and_max(self): for record in self: # Define a function to convert the start_count and max_start_count to a base unit (e.g., days) def convert_to_days(count, type): if type == 'day': return count elif type == 'month': return count * 30 # Approximate value for months (30 days) elif type == 'year': return count * 365 # Approximate value for years (365 days) start_days = convert_to_days(record.start_count, record.start_type) max_days = convert_to_days(record.max_start_count, record.max_start_type) if start_days >= max_days: raise ValidationError("Start count and type must be smaller than the max start count and type.") class hrLeaveAccrualPlan(models.Model): _inherit = 'hr.leave.accrual.plan' accrual_start_count = fields.Integer( "Start after", help="The accrual starts after a defined period from the employee joining date. This field defines the number of days, months or years after which accrual is used.", default="1") accrual_start_type = fields.Selection( [('day', 'Days'), ('month', 'Months'), ('year', 'Years')], default='day', string=" ", required=True, help="This field defines the unit of time after which the employee joining date.") time_off_type_id = fields.Many2one('hr.leave.type', domain=[('requires_allocation','=','yes')]) _sql_constraints = [ ('unique_time_off_type_id', 'unique(time_off_type_id)', 'You can not create multiple plans with same leave type.') ] class hrTimeoffAllocation(models.Model): _inherit = "hr.leave.allocation" allocation_type = fields.Selection(selection_add =[('auto_allocation','Auto Allocation')],ondelete={ 'auto_allocation': 'cascade', },) def _process_accrual_plans(self, date_to=False, force_period=False, log=True): pass def activity_update(self): for allocation in self: if allocation.allocation_type != 'auto_allocation': super().activity_update() @api.model def _update_accrual(self): """ Method called by the cron task in order to increment the number_of_days when necessary, based on the frequency selected in the accrual level. """ accrual_plans = self.env['hr.leave.accrual.plan'].sudo().search([]) for accrual in accrual_plans: employees = self.env['hr.employee'].sudo().search([('run_auto_allocations', '=', True)]) if accrual.accrual_start_count and accrual.accrual_start_type: if accrual.accrual_start_count > 0: employees = employees.filtered(lambda emp: self._is_accrual_applicable(emp, accrual)) level_ids = accrual.level_ids.sorted('sequence') if not level_ids: continue for level in level_ids: # Calculate the current frequency if level.emp_type: level_filtered_employees = employees.filtered(lambda emp: emp.emp_type.id in level.emp_type.ids) else: level_filtered_employees = employees if level.experience_required: qualified_employees = level_filtered_employees.filtered(lambda emp: self._emp_filter_by_level(emp, level)) else: qualified_employees = level_filtered_employees # After filtering, we create the leave allocation for each employee for emp in qualified_employees: run_allocation = self._handel_weekly_frequency(level,emp) if run_allocation: allocations = self.env['hr.leave.allocation'].sudo().search([('employee_id','=',emp.id),('holiday_status_id','=',accrual.time_off_type_id.id),('state','=','validate')]) leaves = self.env['hr.leave'].sudo().search([('employee_id','=',emp.id),('holiday_status_id','=',accrual.time_off_type_id.id),('state','not in',['draft','refuse','cancel'])]) emp_leave_balance = sum(allocation.number_of_days for allocation in allocations) - sum(leave.number_of_days for leave in leaves) if level.cap_accrued_time and level.maximum_leave < emp_leave_balance: continue self._create_leave_allocation(emp, level, accrual) def _handel_weekly_frequency(self,level,emp): today_date = datetime.today().date() if level.level_frequency == 'weekly': weekday_map = { 'mon': 0, 'tue': 1, 'wed': 2, 'thu': 3, 'fri': 4, 'sat': 5, 'sun': 6 } return True if today_date.weekday() == weekday_map.get(level.week_day) else False elif level.level_frequency == 'daily': return True elif level.level_frequency == 'monthly': return True if level.first_day_display == str(today_date.day) or (emp.doj and ((emp.doj + timedelta(days=2)) == today_date)) else False elif level.level_frequency == 'yearly': month_map = { 'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12 } return True if (level.first_day_display == str(today_date.day) and today_date.month == month_map.get(level.yearly_month)) or (emp.doj and ((emp.doj + timedelta(days=2)) == today_date)) else False else: return True def _create_leave_allocation(self, employee, level, accrual): """ Create leave allocation for a qualified employee based on the accrual level and added value. """ today_date = datetime.today().date() number_of_days = level.added_value if employee.doj and ((employee.doj + timedelta(days=2)) == today_date): if level.level_frequency == 'monthly': if employee.doj.day <= 10: number_of_days = level.added_value else: number_of_days = level.added_value/2 elif level.level_frequency == 'yearly': start_month = int(level.yearly_month) joining_month = employee.doj.month # Compute remaining months in the allocation cycle remaining_months = (start_month - joining_month) % 12 or 12 # Calculate proportional leaves number_of_days = (level.added_value / 12) * remaining_months if employee.doj.day > 10: number_of_days = number_of_days - ((level.added_value / 12)/2) self.env['hr.leave.allocation'].sudo().create({ 'employee_id': employee.id, 'holiday_status_id': accrual.time_off_type_id.id, 'date_from': fields.Date.today(), 'number_of_days': number_of_days, 'allocation_type': 'auto_allocation' }).action_approve() def _is_accrual_applicable(self, employee, accrual): """ Helper method to check if the accrual is applicable to the employee based on joining date and the accrual plan. """ if employee.doj: doj = employee.doj start_date = False # Calculate the start date based on the accrual type and count if accrual.accrual_start_type == 'day': start_date = doj + relativedelta(days=accrual.accrual_start_count) elif accrual.accrual_start_type == 'month': start_date = doj + relativedelta(months=accrual.accrual_start_count) elif accrual.accrual_start_type == 'year': start_date = doj + relativedelta(years=accrual.accrual_start_count) # Return if the employee's today date is greater than or equal to start_date return fields.date.today() >= start_date return False def _emp_filter_by_level(self, employee, level): """ Helper method to check if the employee's total experience (including previous experience) is within the range defined by the accrual's start and max start counts and types. """ if employee.doj: # Calculate the employee's total experience based on DOJ and previous_exp total_experience = self._calculate_total_experience(employee.doj, employee.previous_exp) # Calculate the minimum experience based on accrual_start_count and accrual_start_type min_experience = self._calculate_experience_threshold(level.start_count, level.start_type) # Calculate the maximum experience based on max_start_count and max_start_type max_experience = self._calculate_experience_threshold(level.max_start_count, level.max_start_type) # Return whether the total experience is within the min and max range return min_experience <= total_experience <= max_experience return False def _calculate_total_experience(self, doj, previous_exp): """ Helper method to calculate total experience of an employee in months, combining the previous experience and experience since the DOJ. """ today = datetime.today() # Calculate current experience from the date of joining delta = relativedelta(today, doj) current_experience_months = delta.years * 12 + delta.months # Add previous experience (already in months) total_experience_months = current_experience_months + previous_exp return total_experience_months def _calculate_experience_threshold(self, count, experience_type): """ Helper method to calculate the experience threshold in months based on a given count and type. """ today = datetime.today() start_date = today if experience_type == 'day': start_date = today - relativedelta(days=count) elif experience_type == 'month': start_date = today - relativedelta(months=count) elif experience_type == 'year': start_date = today - relativedelta(years=count) # Calculate the experience in months for the threshold return self._calculate_total_experience(start_date, 0) class HRLeave(models.Model): _inherit = 'hr.leave' state = fields.Selection([ ('draft', 'Draft'), ('confirm', 'To Approve'), ('refuse', 'Refused'), ('validate1', 'Second Approval'), ('validate', 'Approved'), ('cancel', 'Cancelled'), ], string='Status', store=True, tracking=True, copy=False, readonly=False, default='draft', help="The status is set to 'Draft' when the leave request is created." + "\nThe status is 'To Approve', when time off request is confirmed by user." + "\nThe status is 'Refused', when time off request is refused by manager." + "\nThe status is 'Second Approval', when time off request is awaiting further validation." + "\nThe status is 'Approved', when time off request is approved by manager." + "\nThe status is 'Cancelled', when time off request is cancelled.") submitted_date = fields.Datetime(string="Submit Date") def write(self, values): is_officer = self.env.user.has_group('hr_holidays.group_hr_holidays_user') or self.env.is_superuser() if not is_officer and values.keys() - {'attachment_ids', 'supported_attachment_ids', 'message_main_attachment_id'}: # if any(hol.date_from.date() < fields.Date.today() and hol.employee_id.leave_manager_id != self.env.user for hol in self): # raise UserError(_('You must have manager rights to modify/validate a time off that already begun')) if any(leave.state == 'cancel' for leave in self): # raise UserError(_('Only a manager can modify a canceled leave.')) pass # Unlink existing resource.calendar.leaves for validated time off if 'state' in values and values['state'] != 'validate': validated_leaves = self.filtered(lambda l: l.state == 'validate') validated_leaves._remove_resource_leave() employee_id = values.get('employee_id', False) if not self.env.context.get('leave_fast_create'): if values.get('state'): self._check_approval_update(values['state']) if any(holiday.validation_type == 'both' for holiday in self): if values.get('employee_id'): employees = self.env['hr.employee'].browse(values.get('employee_id')) else: employees = self.mapped('employee_id') self._check_double_validation_rules(employees, values['state']) if 'date_from' in values: values['request_date_from'] = values['date_from'] if 'date_to' in values: values['request_date_to'] = values['date_to'] result = super(HolidaysRequest, self).write(values) if any(field in values for field in ['request_date_from', 'date_from', 'request_date_from', 'date_to', 'holiday_status_id', 'employee_id', 'state']): self._check_validity() if not self.env.context.get('leave_fast_create'): for holiday in self: if employee_id: holiday.add_follower(employee_id) return result def _check_validity(self): for rec in self: if rec.holiday_status_id.limit_leave_requests: if rec.holiday_status_id.limit_request_type and rec.holiday_status_id.limit_emp_type and rec.holiday_status_id.limit_request_count >= 0: if rec.employee_id.sudo().emp_type and rec.employee_id.sudo().emp_type.id in rec.holiday_status_id.limit_emp_type.ids: time_frame = { 'week': timedelta(weeks=1), 'month': timedelta(days=30), 'year': timedelta(days=365), }.get(rec.holiday_status_id.limit_request_type, timedelta(weeks=1)) # Default to 1 week restriction_start_date = datetime.now() - time_frame # Count the leave requests made by the employee within the restriction period leave_count = self.env['hr.leave'].sudo().search_count([ ('employee_id', '=', rec.employee_id.id), ('state', 'not in', ['cancel', 'refuse', 'draft']), # Adjust states if needed ('holiday_status_id', '=', rec.holiday_status_id.id), ('request_date_from', '>=', restriction_start_date), ('id','!=',rec.id) ]) if leave_count >= rec.holiday_status_id.limit_request_count: if rec.holiday_status_id.limit_request_count == 0: raise ValidationError(_("This time off type is not applicable for %s employee. Please contact your admin if required"%(rec.employee_id.sudo().emp_type.name))) else: raise ValidationError(_( "You have exceeded the maximum allowed leave requests (%s) for the selected period (%s)." ) % (rec.holiday_status_id.limit_request_count, rec.holiday_status_id.limit_request_type)) return super(HRLeave, self)._check_validity() def action_draft(self): for rec in self: is_officer = self.env.user.has_group('hr_holidays.group_hr_holidays_user') or self.env.is_superuser() if rec.sudo().employee_id.user_id.id != self.env.user.id and not is_officer: raise ValidationError(_("Only employee can submit his own leave")) rec.submitted_date = fields.Datetime.now() self._check_validity() rec.state = 'confirm' action = self.sudo().env.ref('hr_holidays.hr_leave_action_my') return { 'type': 'ir.actions.act_window', 'name': action.name, 'res_model': action.res_model, 'view_mode': action.view_mode, 'target': 'current', } def action_reset_confirm(self): if any(holiday.state not in ['cancel', 'refuse'] for holiday in self): raise UserError(_('Time off request state must be "Refused" or "Cancelled" in order to be reset to "Confirmed".')) self.write({ 'state': 'draft', 'first_approver_id': False, 'second_approver_id': False, }) self.activity_update() return True def action_approve(self): for rec in self: if rec.employee_id.leave_manager_id.id != self.env.user.id : raise ValidationError(_("Only Employees Time Off Manager can approve this Leave request")) return super(HRLeave, self).action_approve() def action_refuse(self): for rec in self: if (rec.employee_id.leave_manager_id.id != self.env.user.id) and (self.env.user.id not in rec.holiday_status_id.responsible_ids.ids): raise ValidationError(_("only Employee / Leave type Time off Manager's can refuse this Leave request")) return super(HRLeave, self).action_refuse() def action_validate(self, check_state=True): current_employee = self.env.user.employee_id for holiday in self: if check_state and holiday.state in ['validate1'] and holiday.validation_type == 'both' and (holiday.holiday_status_id.responsible_ids and (current_employee.user_id.id not in holiday.holiday_status_id.responsible_ids.ids)): raise UserError(_('Only Timeoff officers for the %s can validate this leave'%(holiday.holiday_status_id.name))) return super(HRLeave, self).action_validate(check_state) @api.depends_context('uid') @api.depends('state', 'employee_id') def _compute_can_cancel(self): for leave in self: leave.can_cancel = leave.id and leave.employee_id.user_id == self.env.user and leave.state in ['confirm'] @api.ondelete(at_uninstall=False) def _unlink_if_correct_states(self): error_message = _('You cannot delete a time off which is in %s state') state_description_values = {elem[0]: elem[1] for elem in self._fields['state']._description_selection(self.env)} now = fields.Datetime.now().date() if not self.env.user.has_group('hr_holidays.group_hr_holidays_user'): for hol in self: if hol.state not in ['draft', 'cancel']: raise UserError(error_message % state_description_values.get(self[:1].state)) else: for holiday in self.filtered(lambda holiday: holiday.state not in ['cancel', 'draft']): raise UserError(error_message % (state_description_values.get(holiday.state),)) def _check_approval_update(self, state): """ Check if target state is achievable. """ if self.env.is_superuser(): return current_employee = self.env.user.employee_id is_officer = self.env.user.has_group('hr_holidays.group_hr_holidays_user') is_manager = self.env.user.has_group('hr_holidays.group_hr_holidays_manager') for holiday in self: val_type = holiday.validation_type if not is_manager: if holiday.state == 'cancel' and state != 'confirm': raise UserError(_('A cancelled leave cannot be modified.')) if state == 'confirm': if holiday.state == 'refuse': raise UserError(_('Only a Time Off Manager can reset a refused leave.')) # if holiday.date_from and holiday.date_from.date() <= fields.Date.today(): # raise UserError(_('Only a Time Off Manager can reset a started leave.')) if holiday.employee_id != current_employee: raise UserError(_('Only a Time Off Manager can reset other people leaves.')) else: if val_type == 'no_validation' and current_employee == holiday.employee_id and (is_officer or is_manager): continue # use ir.rule based first access check: department, members, ... (see security.xml) holiday.check_access('write') # This handles states validate1 validate and refuse if holiday.employee_id == current_employee\ and self.env.user != holiday.employee_id.leave_manager_id\ and not is_officer: raise UserError(_('Only a Time Off Officer or Manager can approve/refuse its own requests.')) if (state == 'validate1' and val_type == 'both'): if not is_officer and self.env.user != holiday.employee_id.leave_manager_id: raise UserError(_('You must be either %s\'s manager or Time off Manager to approve this leave') % (holiday.employee_id.name)) if (state == 'validate' and val_type == 'manager')\ and self.env.user != holiday.employee_id.leave_manager_id\ and not is_officer: raise UserError(_("You must be %s's Manager to approve this leave", holiday.employee_id.name)) if not is_officer and (state == 'validate' and val_type == 'hr'): raise UserError(_('You must either be a Time off Officer or Time off Manager to approve this leave')) HolidaysRequest.write = HRLeave.write class HRLeaveType(models.Model): _inherit='hr.leave.type' request_unit_type = fields.Selection([ ('day', 'Day'), ('half_day', 'Half Day')], default='day', string='Take Time Off in', required=True) request_unit = fields.Selection(related="request_unit_type",store=True) limit_leave_requests = fields.Boolean(string='Limit Leave Requests', default=False) limit_request_count = fields.Integer( "limit Count", help="Defines the minimum number of leave requests after which the restriction will apply. For example, set 1 to start restrictions after the first request.", default="1") limit_request_type = fields.Selection( [('week', 'Week'), ('month', 'Month'), ('year', 'Year')], default='month', string="Limit Type", required=True, help="Specifies the type of time period (days, months, or years) for applying the leave request") limit_emp_type = fields.Many2many('hr.contract.type', string="Employee Type")