506 lines
25 KiB
Python
506 lines
25 KiB
Python
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
|
|
import warnings
|
|
from datetime import datetime
|
|
from dateutil.relativedelta import relativedelta
|
|
from operator import itemgetter
|
|
from werkzeug.urls import url_encode
|
|
|
|
from odoo import http, _
|
|
from odoo.addons.website_hr_recruitment.controllers.main import WebsiteHrRecruitment
|
|
from odoo.osv.expression import AND
|
|
from odoo.http import request
|
|
from odoo.tools import email_normalize
|
|
from odoo.tools.misc import groupby
|
|
import ast
|
|
import base64
|
|
|
|
|
|
|
|
class WebsiteJobHrRecruitment(WebsiteHrRecruitment):
    """Website controller for job listings and applications.

    Extends ``WebsiteHrRecruitment`` and works on ``hr.job.recruitment``
    records.
    """

    # Number of job entries shown on one page of the /jobs listing.
    _jobs_per_page = 12
|
|
|
|
def sitemap_jobs(env, rule, qs):
    """Yield the sitemap entry of the job listing page.

    ``/jobs`` is emitted when no query string is given, or when the
    lower-cased query string is a substring of ``'/jobs'``.

    :param env: not used by this function
    :param rule: not used by this function
    :param qs: sitemap query string, may be empty or None
    """
    needle = qs.lower() if qs else ''
    if needle in '/jobs':
        yield {'loc': '/jobs'}
|
|
|
|
@http.route([
    '/jobs',
    '/jobs/page/<int:page>',
], type='http', auth="public", website=True, sitemap=sitemap_jobs)
def jobs(self, country_id=None, department_id=None, office_id=None, contract_type_id=None,
         is_remote=False, is_other_department=False, is_untyped=None, page=1, search=None, **kwargs):
    """Render the paginated job listing with its filter facets.

    The main search applies every active filter. For each facet (country,
    department, office, employment type) that is currently filtered, a second
    search is run with that facet's own filter cleared, so the facet's
    choices and counters are not restricted by its own selection.

    :param country_id: id of the ``res.country`` to filter on
    :param department_id: id of the ``hr.department`` to filter on
    :param office_id: id of the office (``res.partner``) to filter on
    :param contract_type_id: id of the ``hr.contract.type`` to filter on
    :param is_remote: forwarded to the search options as ``is_remote``
    :param is_other_department: forwarded as ``is_other_department``
    :param is_untyped: forwarded as ``is_untyped``
    :param page: page number handed to the website pager
    :param search: search term typed by the visitor
    :param kwargs: extra query parameters; ``all_countries`` disables the
        GeoIP based default country
    :return: rendering of ``website_hr_recruitment_extended.recruitment_index``
    """
    env = request.env(context=dict(request.env.context, show_address=True, no_tag_br=True))
    website = request.website
    search_order = "is_published desc, sequence, no_of_recruitment desc"

    Country = env['res.country']
    Jobs = env['hr.job.recruitment']
    Department = env['hr.department']

    country = Country.browse(int(country_id)) if country_id else None
    department = Department.browse(int(department_id)) if department_id else None
    office_id = int(office_id) if office_id else None
    contract_type_id = int(contract_type_id) if contract_type_id else None

    # Without any explicit filter, default to the visitor's GeoIP country,
    # but only when that country actually has jobs.
    has_explicit_filter = (
        country or department or office_id or contract_type_id or kwargs.get('all_countries')
    )
    if not has_explicit_filter and request.geoip.country_code:
        geo_countries = Country.search([('code', '=', request.geoip.country_code)])
        country = geo_countries[0] if geo_countries else None
        if country:
            jobs_in_country = Jobs.search_count(AND([
                website.website_domain(),
                [('address_id.country_id', '=', country.id)]
            ]))
            if not jobs_in_country:
                country = False

    options = {
        'displayDescription': True,
        'allowFuzzy': not request.params.get('noFuzzy'),
        'country_id': country.id if country else None,
        'department_id': department.id if department else None,
        'office_id': office_id,
        'contract_type_id': contract_type_id,
        'is_remote': is_remote,
        'is_other_department': is_other_department,
        'is_untyped': is_untyped,
    }
    # NOTE(review): the main search uses the "job_requests" search type and
    # reads details[0], while the facet searches below use "jobs" and read
    # details[1] - presumably both resolve to hr.job.recruitment results;
    # confirm against the website search-details registration.
    total, details, fuzzy_search_term = website._search_with_fuzzy(
        "job_requests", search, limit=1000, order=search_order, options=options)
    # Browse jobs as superuser, because address is restricted
    jobs = details[0].get('results', Jobs).sudo()

    def sort(records_list, field_name):
        """ Return the given collection sorted alphabetically on a field.

        None entries (used instead of records) are placed at the end.

        :param list records_list: collection of records or None values
        :param str field_name: field on which to sort
        :return: sorted list
        """
        def sort_key(item):
            label = item.sudo()[field_name] if item else ''
            return (item is None, label or '')

        return sorted(records_list, key=sort_key)

    def search_ignoring(**cleared_filters):
        """ Re-run the non-fuzzy "jobs" search with some filters cleared.

        :return: tuple (total, job records) of that search
        """
        facet_total, facet_details, _fuzzy_term = website._search_with_fuzzy(
            "jobs", fuzzy_search_term or search, limit=1000, order=search_order,
            options={**options, 'allowFuzzy': False, **cleared_filters})
        return facet_total, facet_details[1].get('results', Jobs)

    # Countries
    if country or is_remote:
        cross_country_total, cross_country_jobs = search_ignoring(country_id=None, is_remote=False)
        # Browse jobs as superuser, because address is restricted
        cross_country_jobs = cross_country_jobs.sudo()
    else:
        cross_country_total, cross_country_jobs = total, jobs
    country_offices = {j.address_id or None for j in cross_country_jobs}
    countries = sort({(o and o.country_id) or None for o in country_offices}, 'name')
    count_per_country = {'all': cross_country_total}
    count_per_country.update(
        (c, len(jobs_list))
        for c, jobs_list in groupby(cross_country_jobs, lambda job: job.address_id.country_id)
    )
    count_remote = len(cross_country_jobs.filtered(lambda job: not job.address_id))
    if count_remote:
        count_per_country[None] = count_remote

    # Departments
    if department or is_other_department:
        cross_department_total, cross_department_jobs = search_ignoring(
            department_id=None, is_other_department=False)
    else:
        cross_department_total, cross_department_jobs = total, jobs
    departments = sort({j.department_id or None for j in cross_department_jobs}, 'name')
    count_per_department = {'all': cross_department_total}
    count_per_department.update(
        (d, len(jobs_list))
        for d, jobs_list in groupby(cross_department_jobs, lambda job: job.department_id)
    )
    count_other_department = len(cross_department_jobs.filtered(lambda job: not job.department_id))
    if count_other_department:
        count_per_department[None] = count_other_department

    # Offices
    if office_id or is_remote:
        cross_office_total, cross_office_jobs = search_ignoring(office_id=None, is_remote=False)
        # Browse jobs as superuser, because address is restricted
        cross_office_jobs = cross_office_jobs.sudo()
    else:
        cross_office_total, cross_office_jobs = total, jobs
    offices = sort({j.address_id or None for j in cross_office_jobs}, 'city')
    count_per_office = {'all': cross_office_total}
    count_per_office.update(
        (o, len(jobs_list))
        for o, jobs_list in groupby(cross_office_jobs, lambda job: job.address_id)
    )
    count_remote = len(cross_office_jobs.filtered(lambda job: not job.address_id))
    if count_remote:
        count_per_office[None] = count_remote

    # Employment types
    if contract_type_id or is_untyped:
        cross_type_total, cross_type_jobs = search_ignoring(contract_type_id=None, is_untyped=False)
    else:
        cross_type_total, cross_type_jobs = total, jobs
    # The list of types is built from the filtered jobs, the counters from
    # the cross-type search.
    employment_types = sort({j.contract_type_id for j in jobs if j.contract_type_id}, 'name')
    count_per_employment_type = {'all': cross_type_total}
    count_per_employment_type.update(
        (t, len(jobs_list))
        for t, jobs_list in groupby(cross_type_jobs, lambda job: job.contract_type_id)
    )
    count_untyped = len(cross_type_jobs.filtered(lambda job: not job.contract_type_id))
    if count_untyped:
        count_per_employment_type[None] = count_untyped

    pager = website.pager(
        url=request.httprequest.path.partition('/page/')[0],
        url_args=request.httprequest.args,
        total=total,
        page=page,
        step=self._jobs_per_page,
    )
    offset = pager['offset']
    jobs = jobs[offset:offset + self._jobs_per_page]

    office = env['res.partner'].browse(int(office_id)) if office_id else None
    contract_type = env['hr.contract.type'].browse(int(contract_type_id)) if contract_type_id else None

    # Render page
    qcontext = {
        'jobs': jobs,
        'countries': countries,
        'departments': departments,
        'offices': offices,
        'employment_types': employment_types,
        'country_id': country,
        'department_id': department,
        'office_id': office,
        'contract_type_id': contract_type,
        'is_remote': is_remote,
        'is_other_department': is_other_department,
        'is_untyped': is_untyped,
        'pager': pager,
        'search': fuzzy_search_term or search,
        'search_count': total,
        'original_search': fuzzy_search_term and search,
        'count_per_country': count_per_country,
        'count_per_department': count_per_department,
        'count_per_office': count_per_office,
        'count_per_employment_type': count_per_employment_type,
    }
    return request.render("website_hr_recruitment_extended.recruitment_index", qcontext)
|
|
|
|
@http.route('/jobs/add', type='json', auth="user", website=True)
def jobs_add(self, **kwargs):
    """Create a placeholder job recruitment record and return its page URL.

    :return: ``/jobs/<slug>`` URL of the newly created record
    """
    # rendering_bundle in the context avoids branding of website_description
    JobRecruitment = request.env['hr.job.recruitment'].with_context(rendering_bundle=True)
    new_job = JobRecruitment.create({'name': _('Job Title')})
    slug = request.env['ir.http']._slug(new_job)
    return f"/jobs/{slug}"
|
|
|
|
@http.route('''/jobs/detail/<model("hr.job.recruitment"):job>''', type='http', auth="public", website=True, sitemap=True)
def jobs_detail(self, job, **kwargs):
    """Permanently redirect ``/jobs/detail/<job>`` to ``/jobs/<job>``.

    :param job: ``hr.job.recruitment`` record resolved from the URL
    :return: HTTP 301 redirection
    """
    slug = request.env['ir.http']._slug(job)
    return request.redirect(f"/jobs/{slug}", code=301)
|
|
|
|
@http.route('''/jobs/<model("hr.job.recruitment"):job>''', type='http', auth="public", website=True, sitemap=True)
def job(self, job, **kwargs):
    """Render the detail page of one job recruitment record.

    :param job: ``hr.job.recruitment`` record resolved from the URL
    """
    qcontext = {'job': job, 'main_object': job}
    return request.render("website_hr_recruitment_extended.recruitment_detail", qcontext)
|
|
|
|
@http.route('''/jobs/apply/<model("hr.job.recruitment"):job>''', type='http', auth="public", website=True, sitemap=True)
def jobs_apply(self, job, **kwargs):
    """Render the application form of a job.

    When the session holds ``website_hr_recruitment_error``, both that entry
    and ``website_hr_recruitment_default`` are removed from the session and
    passed to the template.

    :param job: ``hr.job.recruitment`` record resolved from the URL
    """
    session = request.session
    if 'website_hr_recruitment_error' not in session:
        error, default = {}, {}
    else:
        error = session.pop('website_hr_recruitment_error')
        default = session.pop('website_hr_recruitment_default')
    qcontext = {'job': job, 'error': error, 'default': default}
    return request.render("website_hr_recruitment_extended.recruitment_apply", qcontext)
|
|
|
|
# Compatibility routes
|
|
|
|
@http.route([
    '/jobs/country/<model("res.country"):country>',
    '/jobs/department/<model("hr.department"):department>',
    '/jobs/country/<model("res.country"):country>/department/<model("hr.department"):department>',
    '/jobs/office/<int:office_id>',
    '/jobs/country/<model("res.country"):country>/office/<int:office_id>',
    '/jobs/department/<model("hr.department"):department>/office/<int:office_id>',
    '/jobs/country/<model("res.country"):country>/department/<model("hr.department"):department>/office/<int:office_id>',
    '/jobs/employment_type/<int:contract_type_id>',
    '/jobs/country/<model("res.country"):country>/employment_type/<int:contract_type_id>',
    '/jobs/department/<model("hr.department"):department>/employment_type/<int:contract_type_id>',
    '/jobs/office/<int:office_id>/employment_type/<int:contract_type_id>',
    '/jobs/country/<model("res.country"):country>/department/<model("hr.department"):department>/employment_type/<int:contract_type_id>',
    '/jobs/country/<model("res.country"):country>/office/<int:office_id>/employment_type/<int:contract_type_id>',
    '/jobs/department/<model("hr.department"):department>/office/<int:office_id>/employment_type/<int:contract_type_id>',
    '/jobs/country/<model("res.country"):country>/department/<model("hr.department"):department>/office/<int:office_id>/employment_type/<int:contract_type_id>',
], type='http', auth="public", website=True, sitemap=False)
def jobs_compatibility(self, country=None, department=None, office_id=None, contract_type_id=None, **kwargs):
    """
    Redirect the legacy path-based filter URLs to ``/jobs`` with the same
    filters expressed as query parameters.

    Deprecated since Odoo 16.3: those routes are kept by compatibility.
    They should not be used in Odoo code anymore.
    """
    warnings.warn(
        "This route is deprecated since Odoo 16.3: the jobs list is now available at /jobs or /jobs/page/XXX",
        DeprecationWarning
    )
    query = {
        'country_id': country and country.id,
        'department_id': department and department.id,
        'office_id': office_id,
        'contract_type_id': contract_type_id,
    }
    # Extra query parameters are forwarded and win over the path values.
    query.update(kwargs)
    return request.redirect('/jobs?' + url_encode(query), code=301)
|
|
|
|
|
|
@http.route('/hr_recruitment_extended/fetch_hr_recruitment_degree', type='json', auth="public", website=True)
def fetch_recruitment_degrees(self):
    """Return every recruitment degree as an ``{id: name}`` mapping.

    The records are read as superuser.
    """
    Degree = http.request.env['hr.recruitment.degree'].sudo()
    return {degree.id: degree.name for degree in Degree.search([])}
|
|
|
|
@http.route('/hr_recruitment_extended/fetch_preferred_locations', type='json', auth="public", website=True)
def fetch_preferred_locations(self, loc_ids):
    """Return the names of the requested locations.

    The records are read as superuser. Ids that do not match an existing
    ``hr.location`` record are ignored.

    :param loc_ids: ids of the ``hr.location`` records to look up; may be
        empty or None
    :return: ``{location id: location_name}`` mapping
    """
    Location = http.request.env['hr.location'].sudo()
    # browse() never checks the database: a recordset built from an unknown
    # id is still truthy and only fails when a field is read. exists() drops
    # those ids up front, in one query for the whole batch.
    locations = Location.browse(loc_ids or []).exists()
    return {location.id: location.location_name for location in locations}
|
|
|
|
@http.route('/hr_recruitment_extended/fetch_preferred_skills', type='json', auth="public", website=True)
def fetch_preferred_skills(self, skill_ids, fetch_others=False):
    """Return skills together with the levels available for each of them.

    The records are read as superuser.

    :param skill_ids: ids of ``hr.skill`` records; may be empty or None
    :param fetch_others: when truthy, return every skill whose id is NOT in
        ``skill_ids``; otherwise return the skills listed in ``skill_ids``
        (ids without a matching record are ignored)
    :return: ``{skill id: {'id', 'name', 'levels'}}`` where ``levels`` is the
        list of the skill type's levels ordered by sequence, each one a dict
        with ``id``, ``name``, ``percentage`` and ``sequence``
    """
    env = http.request.env
    Skill = env['hr.skill'].sudo()
    SkillLevel = env['hr.skill.level'].sudo()
    skill_ids = skill_ids or []

    if fetch_others:
        selected_skills = Skill.search([('id', 'not in', skill_ids)])
    else:
        # exists() avoids an error when the client sends a stale id
        selected_skills = Skill.browse(skill_ids).exists()

    # Levels only depend on the skill type: search them once per type
    # instead of once per skill.
    levels_by_type = {}
    skills = {}
    for skill in selected_skills:
        skill_type_id = skill.skill_type_id.id
        if skill_type_id not in levels_by_type:
            levels_by_type[skill_type_id] = [
                {'id': lvl.id, 'name': lvl.name, 'percentage': lvl.level_progress, 'sequence': lvl.sequence}
                for lvl in SkillLevel.search([('skill_type_id', '=', skill_type_id)], order='sequence')
            ]
        skills[skill.id] = {
            'id': skill.id,
            'name': skill.name,
            'levels': list(levels_by_type[skill_type_id]),
        }
    return skills
|
|
|
|
@http.route('/website_hr_recruitment_extended/check_recent_application', type='json', auth="public", website=True)
def check_recent_application(self, value, job_id):
    """Tell whether an application with the same contact detail already exists.

    ``value`` is compared with the normalized email, the phone and the
    LinkedIn profile of the applicants of the given job.

    :param str value: email, phone number or LinkedIn profile typed in the form
    :param job_id: id of the ``hr.job.recruitment`` being applied to
    :return: dict with a single ``message`` key: a warning text, or None when
        no matching application is found
    """
    if not value:
        # Nothing to compare with: never match on empty fields.
        return {'message': None}
    job_id = int(job_id)
    Applicant = http.request.env['hr.applicant'].sudo().with_context(active_test=False)

    identity_clauses = [
        ('partner_phone', '=', value),
        ('linkedin_profile', '=ilike', value),
    ]
    # email_normalize() returns False when value is not an email address;
    # searching on ('email_normalized', '=', False) would then match every
    # applicant without email, so the clause is only added for real emails.
    normalized_email = email_normalize(value)
    if normalized_email:
        identity_clauses.insert(0, ('email_normalized', '=', normalized_email))

    domain = [
        ('hr_job_recruitment', '=', job_id),
        # Refused applications are archived: they must be searched
        # explicitly, otherwise the refused check below never triggers.
        # Other archived applications stay excluded, as before.
        '|', ('active', '=', True), ('application_status', '=', 'refused'),
    ] + ['|'] * (len(identity_clauses) - 1) + identity_clauses
    applicants_with_similar_info = Applicant.search(domain, order='create_date DESC')

    if not applicants_with_similar_info:
        return {'message': None}
    # Group applications by their status
    applications_by_status = applicants_with_similar_info.grouped('application_status')

    # Check for refused (archived) applications created within the last 6 months
    refusal_window_start = datetime.now() - relativedelta(months=6)
    refused_applicants = applications_by_status.get('refused', Applicant)
    if any(
        not applicant.active and applicant.create_date >= refusal_window_start
        for applicant in refused_applicants
    ):
        return {
            'message': _(
                'We\'ve found a previous closed application in our system within the last 6 months.'
                ' Please consider before applying in order not to duplicate efforts.'
            )
        }

    # Check for ongoing applications with the same value (the search is
    # already restricted to the requested job)
    ongoing_applications = applications_by_status.get('ongoing', Applicant)
    if ongoing_applications:
        recruiter = ongoing_applications[0].user_id
        recruiter_contact = ""
        if recruiter:
            recruiter_contact = _(
                ' In case of issue, contact %(contact_infos)s',
                contact_infos=", ".join(
                    info for info in itemgetter('name', 'email', 'phone')(recruiter) if info
                ))
        # The source string stays a constant and the values are passed as
        # parameters, so the message can actually be translated.
        return {
            'message': _(
                'An application already exists for %(value)s.'
                ' Duplicates might be rejected. %(recruiter_contact)s',
                value=value,
                recruiter_contact=recruiter_contact,
            )
        }

    # A match exists that is neither refused recently nor ongoing
    return {
        'message': _(
            'We found a recent application with a similar name, email, phone number.'
            ' You can continue if it\'s not a mistake.'
        )
    }
|
|
|
|
def _should_log_authenticate_message(self, record):
    """Return False for ``hr.applicant`` records when the session has no uid.

    Every other case is delegated to the parent implementation.

    :param record: record being handled
    """
    is_anonymous_application = record._name == "hr.applicant" and not request.session.uid
    if not is_anonymous_application:
        return super()._should_log_authenticate_message(record)
    return False
|
|
|
|
def extract_data(self, model, values):
    """Extract website form data, enriching job applications.

    For ``hr.applicant`` submissions the custom form inputs (CTC, experience,
    notice period, locations, skills, ...) are removed from ``values``, a
    ``hr.candidate`` is found or created from the contact details, and the
    custom inputs are written into ``data['record']``. Submissions for any
    other model are handed to the parent implementation untouched.

    :param model: ``ir.model`` record of the model targeted by the form
    :param dict values: submitted form values; modified in place
    :return: data dict built by the parent ``extract_data``, with the extra
        applicant fields set in ``data['record']``
    :raises ValueError: when a numeric or date input cannot be parsed
    """
    # Everything below is specific to job applications: other models have
    # none of these fields, so they must not receive them in their record.
    if model.model != 'hr.applicant':
        return super().extract_data(model, values)

    extracted_resume = values.pop('resume_base64', None)
    current_ctc = values.pop('current_ctc', None)
    expected_ctc = values.pop('expected_ctc', None)
    available_joining_date = values.pop('available_joining_date', None)
    exp_type = values.pop('exp_type', None)
    current_location = values.pop('current_location', None)
    preferred_locations_str = values.pop('preferred_locations', '') or ''
    department_id = values.pop('department_id', None)
    hr_job_recruitment = values.pop('job_id', None)
    current_organization = values.pop('current_organization', None)
    notice_period = values.pop('notice_period', 0)
    notice_period_type = values.pop('notice_period_type', 'day')
    # `or 0` also covers the empty string sent by a blank form input
    experience_years = int(values.pop('experience_years', 0) or 0)
    experience_months = int(values.pop('experience_months', 0) or 0)

    # Blank chunks (e.g. a trailing comma) are skipped
    preferred_locations = [
        int(chunk) for chunk in preferred_locations_str.split(',') if chunk.strip()
    ]

    # If there are months, convert everything to months
    if experience_months > 0:
        total_experience = experience_years * 12 + experience_months
        total_experience_type = 'month'
    else:
        total_experience = experience_years
        total_experience_type = 'year'

    # Skill inputs are presumably named "<prefix>_<skill id>" and hold the
    # literal "[<level id>, <progress>]" - TODO confirm against the form.
    # Inputs that do not have that shape are ignored instead of crashing the
    # submission. literal_eval only evaluates Python literals, it never
    # executes code.
    skill_entries = []
    for key, raw_value in values.items():
        if "skill" not in key or raw_value == '0':
            continue
        try:
            skill_id = int(key.split("_")[1])
            parsed = ast.literal_eval(raw_value)
        except (IndexError, TypeError, ValueError, SyntaxError):
            continue
        if not isinstance(parsed, (list, tuple)) or len(parsed) < 2:
            continue
        skill_entries.append((skill_id, parsed[0], parsed[1]))

    # Contact fields are popped since they are used to find/build the candidate
    partner_name = values.pop('full_name', None)
    partner_phone = values.pop('partner_phone', None)
    alternate_phone = values.pop('alternate_phone', None)
    partner_email = values.pop('email_from', None)
    degree = values.pop('degree', None)

    Candidate = request.env['hr.candidate'].sudo()
    candidate_vals = {
        'partner_name': partner_name,
        'email_from': partner_email,
        'partner_phone': partner_phone,
        'alternate_phone': alternate_phone,
        # str() keeps this safe when no degree was submitted (None)
        'type_id': int(degree) if str(degree).isdigit() else False,
        'resume': extracted_resume,
    }
    candidate = Candidate.browse()
    if partner_phone and partner_email:
        candidate = Candidate.search([
            '|', ('email_from', '=', partner_email),
            ('partner_phone', '=', partner_phone),
        ], limit=1)
    if candidate:
        candidate.write(candidate_vals)
    else:
        candidate = Candidate.create(candidate_vals)

    if skill_entries:
        Skill = request.env['hr.skill'].sudo()
        lines_by_skill = {line.skill_id.id: line for line in candidate.candidate_skill_ids}
        skill_commands = []
        for skill_id, skill_level_id, level_progress in skill_entries:
            existing_line = lines_by_skill.get(skill_id)
            if existing_line:
                # The candidate already has this skill: update its level
                # rather than creating a second line for the same skill.
                skill_commands.append((1, existing_line.id, {
                    'skill_level_id': skill_level_id,
                    'level_progress': level_progress,
                }))
            else:
                skill_commands.append((0, 0, {
                    'candidate_id': candidate.id,
                    'skill_id': skill_id,
                    'skill_level_id': skill_level_id,
                    'level_progress': level_progress,
                    'skill_type_id': Skill.browse(skill_id).skill_type_id.id,
                }))
        candidate.write({'candidate_skill_ids': skill_commands})

    values['partner_name'] = partner_name
    if partner_phone:
        values['partner_phone'] = partner_phone
    if partner_email:
        values['email_from'] = partner_email

    data = super().extract_data(model, values)
    record = data['record']
    record['current_ctc'] = float(current_ctc if current_ctc else 0)
    record['salary_expected'] = float(expected_ctc if expected_ctc else 0)
    record['exp_type'] = exp_type if exp_type else 'fresher'
    record['current_location'] = current_location if current_location else ''
    record['current_organization'] = current_organization if current_organization else ''
    record['notice_period'] = notice_period if notice_period else 0
    record['notice_period_type'] = notice_period_type if notice_period_type else 'day'
    record['hr_job_recruitment'] = int(hr_job_recruitment) if str(hr_job_recruitment).isdigit() else ''
    record['department_id'] = int(department_id) if str(department_id).isdigit() else ''
    record['availability'] = (
        datetime.strptime(available_joining_date, '%Y-%m-%d').date() if available_joining_date else ''
    )
    record['total_exp'] = total_experience
    record['total_exp_type'] = total_experience_type
    if preferred_locations:
        record['preferred_location'] = preferred_locations
    record['candidate_id'] = candidate.id
    record['type_id'] = candidate.type_id.id

    return data
|
|
|