400 lines
19 KiB
Python
400 lines
19 KiB
Python
from odoo import models, fields, api
|
|
from odoo.exceptions import ValidationError, UserError
|
|
from datetime import datetime, timedelta
|
|
import datetime as dt
|
|
from odoo import _
|
|
from calendar import month_name, month
|
|
from datetime import date
|
|
|
|
class CwfTimesheetYearly(models.Model):
|
|
_name = 'cwf.timesheet.calendar'
|
|
_description = "CWF Timesheet Calendar"
|
|
_rec_name = 'name'
|
|
|
|
name = fields.Char(string='Year Name', required=True)
|
|
week_period = fields.One2many('cwf.timesheet','cwf_calendar_id')
|
|
|
|
_sql_constraints = [
|
|
('unique_year', 'unique(name)', 'The year must be unique!')
|
|
]
|
|
|
|
|
|
@api.constrains('name')
|
|
def _check_year_format(self):
|
|
for record in self:
|
|
if not record.name.isdigit() or len(record.name) != 4:
|
|
raise ValidationError("Year Name must be a 4-digit number.")
|
|
|
|
def generate_week_period(self):
|
|
for record in self:
|
|
record.week_period.unlink()
|
|
year = int(record.name)
|
|
|
|
# Find the first Monday of the year
|
|
start_date = datetime(year, 1, 1)
|
|
while start_date.weekday() != 0: # Monday is 0 in weekday()
|
|
start_date += timedelta(days=1)
|
|
|
|
# Generate weeks from Monday to Sunday
|
|
while start_date.year == year or (start_date - timedelta(days=1)).year == year:
|
|
end_date = start_date + timedelta(days=6)
|
|
|
|
self.env['cwf.timesheet'].create({
|
|
'name': f'Week {start_date.strftime("%W")}, {year}',
|
|
'week_start_date': start_date.date(),
|
|
'week_end_date': end_date.date(),
|
|
'cwf_calendar_id': record.id,
|
|
})
|
|
|
|
start_date += timedelta(days=7)
|
|
|
|
def action_generate_weeks(self):
|
|
self.generate_week_period()
|
|
return {
|
|
'type': 'ir.actions.client',
|
|
'tag': 'reload',
|
|
}
|
|
|
|
class CwfTimesheet(models.Model):
|
|
_name = 'cwf.timesheet'
|
|
_description = 'CWF Weekly Timesheet'
|
|
_rec_name = 'name'
|
|
|
|
name = fields.Char(string='Week Name', required=True)
|
|
week_start_date = fields.Date(string='Week Start Date', required=True)
|
|
week_end_date = fields.Date(string='Week End Date', required=True)
|
|
status = fields.Selection([
|
|
('draft', 'Draft'),
|
|
('submitted', 'Submitted')
|
|
], default='draft', string='Status')
|
|
lines = fields.One2many('cwf.timesheet.line','week_id')
|
|
cwf_calendar_id = fields.Many2one('cwf.timesheet.calendar')
|
|
start_month = fields.Selection(
|
|
[(str(i), month_name[i]) for i in range(1, 13)],
|
|
string='Start Month',
|
|
compute='_compute_months',
|
|
store=True
|
|
)
|
|
|
|
end_month = fields.Selection(
|
|
[(str(i), month_name[i]) for i in range(1, 13)],
|
|
string='End Month',
|
|
compute='_compute_months',
|
|
store=True
|
|
)
|
|
|
|
@api.depends('week_start_date', 'week_end_date')
|
|
def _compute_months(self):
|
|
for rec in self:
|
|
if rec.week_start_date:
|
|
rec.start_month = str(rec.week_start_date.month)
|
|
else:
|
|
rec.start_month = False
|
|
|
|
if rec.week_end_date:
|
|
rec.end_month = str(rec.week_end_date.month)
|
|
else:
|
|
rec.end_month = False
|
|
|
|
@api.depends('name','week_start_date','week_end_date')
|
|
def _compute_display_name(self):
|
|
for rec in self:
|
|
rec.display_name = rec.name if not rec.week_start_date and rec.week_end_date else "%s (%s - %s)"%(rec.name,rec.week_start_date.strftime('%-d %b'), rec.week_end_date.strftime('%-d %b') )
|
|
|
|
|
|
def send_timesheet_update_email(self):
|
|
template = self.env.ref('cwf_timesheet.email_template_timesheet_weekly_update')
|
|
# Ensure that we have a valid employee email
|
|
current_date = fields.Date.from_string(self.week_start_date)
|
|
end_date = fields.Date.from_string(self.week_end_date)
|
|
|
|
if current_date > end_date:
|
|
raise UserError('The start date cannot be after the end date.')
|
|
|
|
# Get all employees in the department
|
|
external_group_id = self.env.ref("hr_employee_extended.group_external_user")
|
|
users = self.env["res.users"].search([("groups_id", "=", external_group_id.id)])
|
|
employees = self.env['hr.employee'].search([('user_id', 'in', users.ids),'|',('doj','=',False),('doj','>=', self.week_start_date)])
|
|
print(employees)
|
|
# Loop through each day of the week and create timesheet lines for each employee
|
|
while current_date <= end_date:
|
|
for employee in employees:
|
|
existing_record = self.env['cwf.weekly.timesheet'].sudo().search([
|
|
('week_id', '=', self.id),
|
|
('employee_id', '=', employee.id)
|
|
], limit=1)
|
|
if not existing_record:
|
|
self.env['cwf.timesheet.line'].sudo().create({
|
|
'week_id': self.id,
|
|
'employee_id': employee.id,
|
|
'week_day':current_date,
|
|
})
|
|
current_date += timedelta(days=1)
|
|
self.status = 'submitted'
|
|
for employee in employees:
|
|
weekly_timesheet_exists = self.env['cwf.weekly.timesheet'].sudo().search([('week_id','=',self.id),('employee_id','=',employee.id)])
|
|
|
|
if not weekly_timesheet_exists:
|
|
weekly_timesheet = self.env['cwf.weekly.timesheet'].sudo().create({
|
|
'week_id': self.id,
|
|
'employee_id': employee.id,
|
|
'status': 'draft'
|
|
})
|
|
else:
|
|
weekly_timesheet = weekly_timesheet_exists
|
|
|
|
# Generate the URL for the newly created weekly_timesheet
|
|
base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
|
|
record_url = f"{base_url}/web#id={weekly_timesheet.id}&view_type=form&model=cwf.weekly.timesheet"
|
|
|
|
weekly_timesheet.update_attendance()
|
|
if employee.work_email and weekly_timesheet:
|
|
email_values = {
|
|
'email_to': employee.work_email, # Email body from template
|
|
'subject': 'Timesheet Update Notification',
|
|
}
|
|
body_html = template.body_html.replace(
|
|
'https://ftprotech.in/odoo/action-261',
|
|
record_url
|
|
),
|
|
render_ctx = {'employee_name':weekly_timesheet.employee_id.name,'week_from':weekly_timesheet.week_id.week_start_date,'week_to':weekly_timesheet.week_id.week_end_date}
|
|
|
|
|
|
template.with_context(default_body_html=body_html,**render_ctx).send_mail(weekly_timesheet.id, email_values=email_values, force_send=True)
|
|
|
|
|
|
class CwfWeeklyTimesheet(models.Model):
|
|
_name = "cwf.weekly.timesheet"
|
|
_description = "CWF Weekly Timesheet"
|
|
_rec_name = 'employee_id'
|
|
|
|
|
|
def _default_week_id(self):
|
|
current_date = fields.Date.today()
|
|
timesheet = self.env['cwf.timesheet'].sudo().search([
|
|
('week_start_date', '<=', current_date),
|
|
('week_end_date', '>=', current_date)
|
|
], limit=1)
|
|
return timesheet.id if timesheet else False
|
|
|
|
def _get_week_id_domain(self):
|
|
for rec in self:
|
|
return [('week_start_date.month_number','=',2)]
|
|
pass
|
|
|
|
month_id = fields.Selection(
|
|
[(str(i), month_name[i]) for i in range(1, 13)],
|
|
string='Month'
|
|
)
|
|
week_id = fields.Many2one('cwf.timesheet', 'Week', default=lambda self: self._default_week_id())
|
|
|
|
employee_id = fields.Many2one('hr.employee', default=lambda self: self.env.user.employee_id.id)
|
|
cwf_timesheet_lines = fields.One2many('cwf.timesheet.line' ,'weekly_timesheet')
|
|
status = fields.Selection([('draft','Draft'),('submitted','Submitted')], default='draft')
|
|
week_start_date = fields.Date(related='week_id.week_start_date')
|
|
week_end_date = fields.Date(related='week_id.week_end_date')
|
|
|
|
@api.onchange('month_id')
|
|
def _onchange_month(self):
|
|
if self.month_id:
|
|
year = self.week_start_date.year if self.week_start_date else fields.Date.today().year
|
|
start = date(year, int(self.month_id), 1)
|
|
if int(self.month_id) == 12:
|
|
end = date(year + 1, 1, 1) - timedelta(days=1)
|
|
else:
|
|
end = date(year, int(self.month_id) + 1, 1) - timedelta(days=1)
|
|
self = self.with_context(month_start=start, month_end=end)
|
|
|
|
@api.onchange('week_id')
|
|
def _onchange_week_id(self):
|
|
if self.week_id and self.week_id.week_start_date:
|
|
self.month_id = str(self.week_id.week_start_date.month)
|
|
|
|
@api.constrains('week_id', 'employee_id')
|
|
def _check_unique_week_employee(self):
|
|
for record in self:
|
|
if record.week_id.week_start_date > fields.Date.today():
|
|
raise ValidationError(_("You Can't select future week period"))
|
|
# Search for existing records with the same week_id and employee_id
|
|
existing_record = self.env['cwf.weekly.timesheet'].search([
|
|
('week_id', '=', record.week_id.id),
|
|
('employee_id', '=', record.employee_id.id)
|
|
], limit=1)
|
|
|
|
# If an existing record is found and it's not the current record (in case of update), raise an error
|
|
if existing_record and existing_record.id != record.id:
|
|
raise ValidationError("A timesheet for this employee already exists for the selected week.")
|
|
|
|
def update_attendance(self):
|
|
for rec in self:
|
|
# Get the week start and end date
|
|
week_start_date = rec.week_id.week_start_date
|
|
week_end_date = rec.week_id.week_end_date
|
|
|
|
# Convert start and end dates to datetime objects for proper filtering
|
|
week_start_datetime = datetime.combine(week_start_date, datetime.min.time())
|
|
week_end_datetime = datetime.combine(week_end_date, datetime.max.time())
|
|
|
|
# Delete timesheet lines that are outside the week range
|
|
rec.cwf_timesheet_lines.filtered(lambda line:
|
|
line.week_day < week_start_date or line.week_day > week_end_date
|
|
).unlink()
|
|
|
|
# Search for attendance records that fall within the week period and match the employee
|
|
hr_attendance_records = self.env['hr.attendance'].sudo().search([
|
|
('check_in', '>=', week_start_datetime),
|
|
('check_out', '<=', week_end_datetime),
|
|
('employee_id', '=', rec.employee_id.id)
|
|
])
|
|
|
|
# Group the attendance records by date
|
|
attendance_by_date = {}
|
|
for attendance in hr_attendance_records:
|
|
attendance_date = attendance.check_in.date()
|
|
if attendance_date not in attendance_by_date:
|
|
attendance_by_date[attendance_date] = []
|
|
attendance_by_date[attendance_date].append(attendance)
|
|
|
|
# Get all the dates within the week period
|
|
all_week_dates = [week_start_date + timedelta(days=i) for i in
|
|
range((week_end_date - week_start_date).days + 1)]
|
|
|
|
# Create or update timesheet lines for each day in the week
|
|
for date in all_week_dates:
|
|
# Check if there is attendance for this date
|
|
if date in attendance_by_date:
|
|
# If there are multiple attendance records, take the earliest check_in and latest check_out
|
|
earliest_check_in = min(attendance.check_in for attendance in attendance_by_date[date])
|
|
latest_check_out = max(attendance.check_out for attendance in attendance_by_date[date])
|
|
|
|
if (earliest_check_in + timedelta(hours=5, minutes=30)).date() > date:
|
|
earliest_check_in = (datetime.combine(date, datetime.max.time()) - timedelta(hours=5, minutes=30))
|
|
|
|
if (latest_check_out + timedelta(hours=5, minutes=30)).date() > date:
|
|
latest_check_out = (datetime.combine(date, datetime.max.time()) - timedelta(hours=5, minutes=30))
|
|
|
|
# Check if a timesheet line for this employee, week, and date already exists
|
|
existing_timesheet_line = self.env['cwf.timesheet.line'].sudo().search([
|
|
('week_day', '=', date),
|
|
('employee_id', '=', rec.employee_id.id),
|
|
('week_id', '=', rec.week_id.id),
|
|
('weekly_timesheet', '=', rec.id)
|
|
], limit=1)
|
|
|
|
if existing_timesheet_line:
|
|
# If it exists, update the existing record
|
|
existing_timesheet_line.write({
|
|
'check_in_date': earliest_check_in,
|
|
'check_out_date': latest_check_out,
|
|
'state_type': 'present',
|
|
})
|
|
else:
|
|
# If it doesn't exist, create a new timesheet line with present state_type
|
|
self.env['cwf.timesheet.line'].create({
|
|
'weekly_timesheet': rec.id,
|
|
'employee_id': rec.employee_id.id,
|
|
'week_id': rec.week_id.id,
|
|
'week_day': date,
|
|
'check_in_date': earliest_check_in,
|
|
'check_out_date': latest_check_out,
|
|
'state_type': 'present',
|
|
})
|
|
else:
|
|
# If no attendance exists for this date, create a new timesheet line with time_off state_type
|
|
existing_timesheet_line = self.env['cwf.timesheet.line'].sudo().search([
|
|
('week_day', '=', date),
|
|
('employee_id', '=', rec.employee_id.id),
|
|
('week_id', '=', rec.week_id.id),
|
|
('weekly_timesheet', '=', rec.id)
|
|
], limit=1)
|
|
|
|
if not existing_timesheet_line:
|
|
if date.weekday() != 5 and date.weekday() != 6:
|
|
# If no record exists for this date, create a new timesheet line with time_off state_type
|
|
self.env['cwf.timesheet.line'].create({
|
|
'weekly_timesheet': rec.id,
|
|
'employee_id': rec.employee_id.id,
|
|
'week_id': rec.week_id.id,
|
|
'week_day': date,
|
|
'state_type': 'time_off',
|
|
})
|
|
|
|
def action_submit(self):
|
|
for rec in self:
|
|
for timesheet in rec.cwf_timesheet_lines:
|
|
timesheet.action_submit()
|
|
rec.status = 'submitted'
|
|
|
|
class CwfTimesheetLine(models.Model):
|
|
_name = 'cwf.timesheet.line'
|
|
_description = 'CWF Weekly Timesheet Lines'
|
|
_rec_name = 'employee_id'
|
|
|
|
weekly_timesheet = fields.Many2one('cwf.weekly.timesheet')
|
|
employee_id = fields.Many2one('hr.employee', string='Employee', related='weekly_timesheet.employee_id')
|
|
week_id = fields.Many2one('cwf.timesheet', 'Week', related='weekly_timesheet.week_id')
|
|
week_day = fields.Date(string='Date')
|
|
check_in_date = fields.Datetime(string='Checkin')
|
|
check_out_date = fields.Datetime(string='Checkout ')
|
|
is_updated = fields.Boolean('Attendance Updated')
|
|
state_type = fields.Selection([('draft','Draft'),('holiday', 'Holiday'),('time_off','Time Off'),('half_day','Half Day'),('present','Present')], default='draft', required=True)
|
|
|
|
|
|
|
|
@api.constrains('week_day', 'check_in_date', 'check_out_date')
|
|
def _check_week_day_and_times(self):
|
|
for record in self:
|
|
# Ensure week_day is within the week range
|
|
if record.week_id:
|
|
if record.week_day < record.week_id.week_start_date or record.week_day > record.week_id.week_end_date:
|
|
raise ValidationError(
|
|
"The selected 'week_day' must be within the range of the week from %s to %s." %
|
|
(record.week_id.week_start_date, record.week_id.week_end_date)
|
|
)
|
|
|
|
# Ensure check_in_date and check_out_date are on the selected week_day
|
|
if record.check_in_date:
|
|
if record.check_in_date.date() != record.week_day:
|
|
raise ValidationError(
|
|
"The 'check_in_date' must be on the selected Date."
|
|
)
|
|
|
|
if record.check_out_date:
|
|
if record.check_out_date.date() != record.week_day:
|
|
raise ValidationError(
|
|
"The 'check_out_date' must be on the selected Date."
|
|
)
|
|
|
|
|
|
def action_submit(self):
|
|
if self.state_type == 'draft' or not self.state_type:
|
|
raise ValidationError(_('State type should not Draft or Empty'))
|
|
if self.state_type not in ['holiday','time_off'] and not (self.check_in_date or self.check_out_date):
|
|
raise ValidationError(_('Please enter Check details'))
|
|
self._update_attendance()
|
|
|
|
|
|
def _update_attendance(self):
|
|
attendance_obj = self.env['hr.attendance']
|
|
for record in self:
|
|
if record.check_in_date != False and record.check_out_date != False and record.employee_id:
|
|
first_check_in = attendance_obj.sudo().search([('check_in', '>=', record.check_in_date.date()),
|
|
('check_out', '<=', record.check_out_date.date()),('employee_id','=',record.employee_id.id)],
|
|
limit=1, order="check_in")
|
|
last_check_out = attendance_obj.sudo().search([('check_in', '>=', record.check_in_date.date()),
|
|
('check_out', '<=', record.check_out_date.date()),('employee_id','=',record.employee_id.id)],
|
|
limit=1, order="check_out desc")
|
|
|
|
if first_check_in or last_check_out:
|
|
if first_check_in.sudo().check_in != record.check_in_date:
|
|
first_check_in.sudo().check_in = record.check_in_date
|
|
if last_check_out.sudo().check_out != record.check_out_date:
|
|
last_check_out.sudo().check_out = record.check_out_date
|
|
else:
|
|
attendance_obj.sudo().create({
|
|
'employee_id': record.employee_id.id,
|
|
'check_in': record.check_in_date - timedelta(hours=5, minutes=30),
|
|
'check_out': record.check_out_date - timedelta(hours=5, minutes=30),
|
|
})
|
|
record.is_updated = True
|