odoo18/addons_extensions/hr_recruitment_dashboards/models/recruitment_dashboard.py

384 lines
18 KiB
Python

from collections import defaultdict
from datetime import datetime, time
from dateutil.relativedelta import relativedelta
from odoo import api, fields, models, _
class HrRecruitmentDashboard(models.AbstractModel):
_name = 'hr.recruitment.dashboard'
_description = 'HR Recruitment Dashboard'
@api.model
def get_dashboard_data(self, filters=None):
filters = filters or {}
applicant_domain = self._get_applicant_domain(filters)
job_domain = self._get_job_domain(filters)
Applicant = self.env['hr.applicant'].with_context(active_test=False)
JobRecruitment = self.env['hr.job.recruitment'].with_context(active_test=False)
applicants = Applicant.search(applicant_domain)
jobs = JobRecruitment.search(job_domain)
active_applicants = applicants.filtered('active')
submitted_applicants = applicants.filtered(lambda item: item.submitted_to_client)
hired_applicants = applicants.filtered(lambda item: item.recruitment_stage_id.hired_stage)
refused_applicants = applicants.filtered(lambda item: not item.active or item.refuse_reason_id)
on_hold_applicants = applicants.filtered(lambda item: item.is_on_hold)
return {
'filters': self._get_filter_options(),
'summary': self._get_summary_cards(jobs, applicants, active_applicants, submitted_applicants, hired_applicants, refused_applicants, on_hold_applicants),
'charts': {
'pipeline': self._get_pipeline_chart(applicants, applicant_domain),
'status': self._get_status_chart(jobs, job_domain),
'recruiters': self._get_recruiter_chart(applicants, applicant_domain),
'funnel': self._get_funnel_chart(applicants, submitted_applicants, hired_applicants, refused_applicants, applicant_domain),
'priority': self._get_priority_chart(jobs, job_domain),
'trend': self._get_trend_chart(applicant_domain, filters),
'skill_match': self._get_skill_match_chart(applicants, applicant_domain),
},
'tables': {
'top_jobs': self._get_top_jobs(jobs),
'attention': self._get_attention_items(jobs, on_hold_applicants),
},
}
@api.model
def get_drilldown_action(self, model_name, domain, title=False, view_mode=False):
return {
'type': 'ir.actions.act_window',
'name': title or _('Recruitment Records'),
'res_model': model_name,
'domain': domain or [],
'view_mode': view_mode or ('kanban,list,form' if model_name == 'hr.applicant' else 'list,form'),
'views': self._get_action_views(model_name),
'target': 'current',
'context': {'active_test': False},
}
@api.model
def _get_action_views(self, model_name):
if model_name == 'hr.applicant':
return [(False, 'kanban'), (False, 'list'), (False, 'form')]
if model_name == 'hr.job.recruitment':
return [(False, 'kanban'), (False, 'list'), (False, 'form')]
return [(False, 'list'), (False, 'form')]
@api.model
def _get_filter_options(self):
recruiters = self.env['res.users'].search([
('share', '=', False),
('active', '=', True),
], order='name')
jobs = self.env['hr.job.recruitment'].with_context(active_test=False).search([], order='recruitment_sequence desc, id desc', limit=200)
departments = self.env['hr.department'].search([], order='name')
return {
'recruiters': [{'id': item.id, 'name': item.name} for item in recruiters],
'jobs': [{'id': item.id, 'name': item.display_name} for item in jobs],
'departments': [{'id': item.id, 'name': item.display_name} for item in departments],
'recruitment_types': [
{'id': 'internal', 'name': _('In-House')},
{'id': 'external', 'name': _('Client-Side')},
],
'statuses': self._selection_options('hr.job.recruitment', 'recruitment_status'),
'priorities': self._selection_options('hr.job.recruitment', 'job_priority'),
'ranges': [
{'id': '30', 'name': _('Last 30 Days')},
{'id': '90', 'name': _('Last 90 Days')},
{'id': '180', 'name': _('Last 180 Days')},
{'id': '365', 'name': _('Last 12 Months')},
{'id': 'custom', 'name': _('Custom')},
],
'published_statuses': [
{'id': 'all', 'name': _('All')},
{'id': 'published', 'name': _('Published')},
{'id': 'unpublished', 'name': _('Unpublished')},
],
}
@api.model
def _selection_options(self, model_name, field_name):
field = self.env[model_name]._fields[field_name]
return [{'id': value, 'name': label} for value, label in field.selection]
@api.model
def _get_date_values(self, filters):
today = fields.Date.context_today(self)
range_key = filters.get('range') or '90'
if filters.get('date_from') or filters.get('date_to'):
range_key = 'custom'
if range_key != 'custom':
date_to = today
date_from = today - relativedelta(days=int(range_key))
else:
date_from = fields.Date.to_date(filters.get('date_from')) if filters.get('date_from') else today - relativedelta(days=90)
date_to = fields.Date.to_date(filters.get('date_to')) if filters.get('date_to') else today
date_from_dt = datetime.combine(date_from, time.min)
date_to_dt = datetime.combine(date_to, time.max)
return date_from, date_to, fields.Datetime.to_string(date_from_dt), fields.Datetime.to_string(date_to_dt)
@api.model
def _get_applicant_domain(self, filters):
date_from, date_to, date_from_dt, date_to_dt = self._get_date_values(filters)
domain = [
('create_date', '>=', date_from_dt),
('create_date', '<=', date_to_dt),
'|',
('company_id', '=', False),
('company_id', 'in', self.env.companies.ids),
]
domain += self._job_related_domain(filters, applicant=True)
return domain
@api.model
def _get_job_domain(self, filters):
date_from, date_to, date_from_dt, date_to_dt = self._get_date_values(filters)
domain = [
'|',
('company_id', '=', False),
('company_id', 'in', self.env.companies.ids),
'|',
('target_from', '=', False),
('target_from', '<=', date_to),
'|',
('target_to', '=', False),
('target_to', '>=', date_from),
]
domain += self._job_related_domain(filters, applicant=False)
return domain
@api.model
def _job_related_domain(self, filters, applicant=False):
prefix = 'hr_job_recruitment.' if applicant else ''
domain = []
if filters.get('recruiter_ids'):
domain.append((prefix + 'user_id', 'in', filters['recruiter_ids']))
if filters.get('job_ids'):
domain.append(('hr_job_recruitment' if applicant else 'id', 'in', filters['job_ids']))
if filters.get('department_ids'):
domain.append((prefix + 'department_id', 'in', filters['department_ids']))
if filters.get('recruitment_types'):
domain.append((prefix + 'recruitment_type', 'in', filters['recruitment_types']))
if filters.get('statuses'):
domain.append((prefix + 'recruitment_status', 'in', filters['statuses']))
if filters.get('priorities'):
domain.append((prefix + 'job_priority', 'in', filters['priorities']))
if filters.get('published_status') == 'published':
domain.append((prefix + 'website_published', '=', True))
elif filters.get('published_status') == 'unpublished':
domain.append((prefix + 'website_published', '=', False))
return domain
@api.model
def _get_summary_cards(self, jobs, applicants, active_applicants, submitted_applicants, hired_applicants, refused_applicants, on_hold_applicants):
open_jobs = jobs.filtered(lambda item: item.recruitment_status == 'open')
total_positions = sum(jobs.mapped('no_of_recruitment'))
avg_skill = round(sum(applicants.mapped('overall_skill_match_percentage')) / len(applicants), 1) if applicants else 0
conversion = round((len(hired_applicants) / len(applicants)) * 100, 1) if applicants else 0
return [
self._card(_('Open Recruitments'), len(open_jobs), 'hr.job.recruitment', [('id', 'in', open_jobs.ids)], 'fa-briefcase', '#2563eb'),
self._card(_('Open Positions'), total_positions, 'hr.job.recruitment', [('id', 'in', jobs.ids)], 'fa-users', '#059669'),
self._card(_('Applications'), len(active_applicants), 'hr.applicant', [('id', 'in', active_applicants.ids)], 'fa-id-card-o', '#7c3aed'),
self._card(_('Submitted to Client'), len(submitted_applicants), 'hr.applicant', [('id', 'in', submitted_applicants.ids)], 'fa-paper-plane', '#ea580c'),
self._card(_('Hired'), len(hired_applicants), 'hr.applicant', [('id', 'in', hired_applicants.ids)], 'fa-check-circle', '#16a34a'),
self._card(_('On Hold'), len(on_hold_applicants), 'hr.applicant', [('id', 'in', on_hold_applicants.ids)], 'fa-pause-circle', '#ca8a04'),
self._card(_('Avg Skill Match'), '%s%%' % avg_skill, 'hr.applicant', [('id', 'in', applicants.ids)], 'fa-bullseye', '#0891b2'),
self._card(_('Hire Conversion'), '%s%%' % conversion, 'hr.applicant', [('id', 'in', hired_applicants.ids)], 'fa-line-chart', '#db2777'),
]
@api.model
def _card(self, title, value, model_name, domain, icon, color):
return {
'title': title,
'value': value,
'model': model_name,
'domain': domain,
'icon': icon,
'color': color,
}
@api.model
def _get_pipeline_chart(self, applicants, base_domain):
grouped = defaultdict(lambda: {'count': 0, 'ids': []})
for applicant in applicants:
stage = applicant.recruitment_stage_id
key = stage.display_name if stage else _('No Stage')
grouped[key]['count'] += 1
grouped[key]['ids'].append(applicant.id)
labels = list(grouped)
return {
'labels': labels,
'series': [grouped[label]['count'] for label in labels],
'points': [{
'model': 'hr.applicant',
'title': _('Applications: %s') % label,
'domain': [('id', 'in', grouped[label]['ids'])],
} for label in labels],
}
@api.model
def _get_status_chart(self, jobs, base_domain):
labels_by_value = dict(self.env['hr.job.recruitment']._fields['recruitment_status'].selection)
grouped = defaultdict(lambda: {'count': 0, 'ids': []})
for job in jobs:
key = job.recruitment_status or 'open'
grouped[key]['count'] += 1
grouped[key]['ids'].append(job.id)
labels = list(grouped)
return {
'labels': [labels_by_value.get(label, label) for label in labels],
'series': [grouped[label]['count'] for label in labels],
'points': [{
'model': 'hr.job.recruitment',
'title': _('Recruitments: %s') % labels_by_value.get(label, label),
'domain': [('id', 'in', grouped[label]['ids'])],
} for label in labels],
}
@api.model
def _get_recruiter_chart(self, applicants, base_domain):
grouped = defaultdict(lambda: {'count': 0, 'ids': []})
for applicant in applicants:
recruiter = applicant.hr_job_recruitment.user_id or applicant.user_id
key = recruiter.name if recruiter else _('Unassigned')
grouped[key]['count'] += 1
grouped[key]['ids'].append(applicant.id)
sorted_items = sorted(grouped.items(), key=lambda item: item[1]['count'], reverse=True)[:10]
return {
'labels': [item[0] for item in sorted_items],
'series': [item[1]['count'] for item in sorted_items],
'points': [{
'model': 'hr.applicant',
'title': _('Applications: %s') % item[0],
'domain': [('id', 'in', item[1]['ids'])],
} for item in sorted_items],
}
@api.model
def _get_funnel_chart(self, applicants, submitted_applicants, hired_applicants, refused_applicants, base_domain):
stages = [
(_('Applications'), applicants.ids),
(_('Client Submitted'), submitted_applicants.ids),
(_('Hired'), hired_applicants.ids),
(_('Refused / Archived'), refused_applicants.ids),
]
return {
'labels': [stage[0] for stage in stages],
'series': [len(stage[1]) for stage in stages],
'points': [{
'model': 'hr.applicant',
'title': stage[0],
'domain': [('id', 'in', stage[1])],
} for stage in stages],
}
@api.model
def _get_priority_chart(self, jobs, base_domain):
labels_by_value = dict(self.env['hr.job.recruitment']._fields['job_priority'].selection)
grouped = defaultdict(lambda: {'count': 0, 'ids': []})
for job in jobs:
key = job.job_priority or 'medium'
grouped[key]['count'] += 1
grouped[key]['ids'].append(job.id)
order = ['high', 'medium', 'low']
labels = [item for item in order if item in grouped] + [item for item in grouped if item not in order]
return {
'labels': [labels_by_value.get(label, label.title()) for label in labels],
'series': [grouped[label]['count'] for label in labels],
'points': [{
'model': 'hr.job.recruitment',
'title': _('Priority: %s') % labels_by_value.get(label, label.title()),
'domain': [('id', 'in', grouped[label]['ids'])],
} for label in labels],
}
@api.model
def _get_skill_match_chart(self, applicants, base_domain):
buckets = [
('80-100%', lambda value: value >= 80),
('60-79%', lambda value: 60 <= value < 80),
('40-59%', lambda value: 40 <= value < 60),
('0-39%', lambda value: value < 40),
]
values = []
points = []
for label, predicate in buckets:
records = applicants.filtered(lambda item: predicate(item.overall_skill_match_percentage or 0))
values.append(len(records))
points.append({
'model': 'hr.applicant',
'title': _('Skill Match %s') % label,
'domain': [('id', 'in', records.ids)],
})
return {'labels': [item[0] for item in buckets], 'series': values, 'points': points}
@api.model
def _get_trend_chart(self, base_domain, filters):
Applicant = self.env['hr.applicant'].with_context(active_test=False)
date_from, date_to, date_from_dt, date_to_dt = self._get_date_values(filters)
current = date_from.replace(day=1)
labels = []
applications = []
submissions = []
hires = []
points = []
while current <= date_to:
month_start = datetime.combine(current, time.min)
month_end_date = current + relativedelta(months=1, days=-1)
month_end = datetime.combine(month_end_date, time.max)
month_domain = [
('create_date', '>=', fields.Datetime.to_string(month_start)),
('create_date', '<=', fields.Datetime.to_string(month_end)),
] + [item for item in base_domain if item[0] not in {'create_date'}]
month_records = Applicant.search(month_domain)
submitted = month_records.filtered(lambda item: item.submitted_to_client)
hired = month_records.filtered(lambda item: item.recruitment_stage_id.hired_stage)
labels.append(current.strftime('%b %Y'))
applications.append(len(month_records))
submissions.append(len(submitted))
hires.append(len(hired))
points.append({
'model': 'hr.applicant',
'title': _('Applications: %s') % current.strftime('%b %Y'),
'domain': [('id', 'in', month_records.ids)],
})
current += relativedelta(months=1)
return {
'labels': labels,
'series': [
{'name': _('Applications'), 'data': applications},
{'name': _('Submitted'), 'data': submissions},
{'name': _('Hired'), 'data': hires},
],
'points': points,
}
@api.model
def _get_top_jobs(self, jobs):
ranked = jobs.sorted(lambda item: item.application_count, reverse=True)[:10]
return [{
'id': job.id,
'name': job.display_name,
'recruiter': job.user_id.name or '',
'status': dict(job._fields['recruitment_status'].selection).get(job.recruitment_status, ''),
'priority': dict(job._fields['job_priority'].selection).get(job.job_priority, ''),
'positions': job.no_of_recruitment,
'applications': job.application_count,
'submissions': job.no_of_submissions,
'hired': job.no_of_hired_employee,
'domain': [('id', '=', job.id)],
} for job in ranked]
@api.model
def _get_attention_items(self, jobs, on_hold_applicants):
overdue_jobs = jobs.filtered(lambda item: item.target_to and item.target_to < fields.Date.context_today(self) and item.recruitment_status == 'open')
high_priority_jobs = jobs.filtered(lambda item: item.job_priority == 'high' and item.recruitment_status == 'open')
return [
self._card(_('Overdue Recruitments'), len(overdue_jobs), 'hr.job.recruitment', [('id', 'in', overdue_jobs.ids)], 'fa-calendar-times-o', '#dc2626'),
self._card(_('High Priority Openings'), len(high_priority_jobs), 'hr.job.recruitment', [('id', 'in', high_priority_jobs.ids)], 'fa-fire', '#ea580c'),
self._card(_('Candidates On Hold'), len(on_hold_applicants), 'hr.applicant', [('id', 'in', on_hold_applicants.ids)], 'fa-pause', '#ca8a04'),
]