# # Part of Odoo. See LICENSE file for full copyright and licensing details. # # import warnings # from datetime import datetime # from dateutil.relativedelta import relativedelta # from operator import itemgetter # from werkzeug.urls import url_encode # # from odoo import http, _ # from odoo.addons.website_hr_recruitment.controllers.main import WebsiteHrRecruitment # from odoo.osv.expression import AND # from odoo.http import request # from odoo.tools import email_normalize # from odoo.tools.misc import groupby # import ast # import base64 # # # # class WebsiteJobHrRecruitment(WebsiteHrRecruitment): # _jobs_per_page = 12 # # def sitemap_jobs(env, rule, qs): # if not qs or qs.lower() in '/jobs': # yield {'loc': '/jobs'} # # @http.route([ # '/jobs', # '/jobs/page/', # ], type='http', auth="public", website=True, sitemap=sitemap_jobs) # def jobs(self, country_id=None, department_id=None, office_id=None, contract_type_id=None, # is_remote=False, is_other_department=False, is_untyped=None, page=1, search=None, **kwargs): # env = request.env(context=dict(request.env.context, show_address=True, no_tag_br=True)) # # Country = env['res.country'] # Jobs = env['hr.job.recruitment'] # Department = env['hr.department'] # # country = Country.browse(int(country_id)) if country_id else None # department = Department.browse(int(department_id)) if department_id else None # office_id = int(office_id) if office_id else None # contract_type_id = int(contract_type_id) if contract_type_id else None # # # Default search by user country # if not (country or department or office_id or contract_type_id or kwargs.get('all_countries')): # if request.geoip.country_code: # countries_ = Country.search([('code', '=', request.geoip.country_code)]) # country = countries_[0] if countries_ else None # if country: # country_count = Jobs.search_count(AND([ # request.website.website_domain(), # [('address_id.country_id', '=', country.id)] # ])) # if not country_count: # country = False # # options = { # 'displayDescription': True, # 'allowFuzzy': not request.params.get('noFuzzy'), # 'country_id': country.id if country else None, # 'department_id': department.id if department else None, # 'office_id': office_id, # 'contract_type_id': contract_type_id, # 'is_remote': is_remote, # 'is_other_department': is_other_department, # 'is_untyped': is_untyped, # } # total, details, fuzzy_search_term = request.website._search_with_fuzzy("job_requests", search, # limit=1000, order="is_published desc, sequence, no_of_recruitment desc", options=options) # # Browse jobs as superuser, because address is restricted # jobs = details[0].get('results', Jobs).sudo() # # def sort(records_list, field_name): # """ Sort records in the given collection according to the given # field name, alphabetically. None values instead of records are # placed at the end. # # :param list records_list: collection of records or None values # :param str field_name: field on which to sort # :return: sorted list # """ # return sorted( # records_list, # key=lambda item: (item is None, item.sudo()[field_name] if item and item.sudo()[field_name] else ''), # ) # # # Countries # if country or is_remote: # cross_country_options = options.copy() # cross_country_options.update({ # 'allowFuzzy': False, # 'country_id': None, # 'is_remote': False, # }) # cross_country_total, cross_country_details, _ = request.website._search_with_fuzzy("jobs", # fuzzy_search_term or search, limit=1000, order="is_published desc, sequence, no_of_recruitment desc", # options=cross_country_options) # # Browse jobs as superuser, because address is restricted # cross_country_jobs = cross_country_details[1].get('results', Jobs).sudo() # else: # cross_country_total = total # cross_country_jobs = jobs # country_offices = set(j.address_id or None for j in cross_country_jobs) # countries = sort(set(o and o.country_id or None for o in country_offices), 'name') # count_per_country = {'all': cross_country_total} # for c, jobs_list in groupby(cross_country_jobs, lambda job: job.address_id.country_id): # count_per_country[c] = len(jobs_list) # count_remote = len(cross_country_jobs.filtered(lambda job: not job.address_id)) # if count_remote: # count_per_country[None] = count_remote # # # Departments # if department or is_other_department: # cross_department_options = options.copy() # cross_department_options.update({ # 'allowFuzzy': False, # 'department_id': None, # 'is_other_department': False, # }) # cross_department_total, cross_department_details, _ = request.website._search_with_fuzzy("jobs", # fuzzy_search_term or search, limit=1000, order="is_published desc, sequence, no_of_recruitment desc", # options=cross_department_options) # cross_department_jobs = cross_department_details[1].get('results', Jobs) # else: # cross_department_total = total # cross_department_jobs = jobs # departments = sort(set(j.department_id or None for j in cross_department_jobs), 'name') # count_per_department = {'all': cross_department_total} # for d, jobs_list in groupby(cross_department_jobs, lambda job: job.department_id): # count_per_department[d] = len(jobs_list) # count_other_department = len(cross_department_jobs.filtered(lambda job: not job.department_id)) # if count_other_department: # count_per_department[None] = count_other_department # # # Offices # if office_id or is_remote: # cross_office_options = options.copy() # cross_office_options.update({ # 'allowFuzzy': False, # 'office_id': None, # 'is_remote': False, # }) # cross_office_total, cross_office_details, _ = request.website._search_with_fuzzy("jobs", # fuzzy_search_term or search, limit=1000, order="is_published desc, sequence, no_of_recruitment desc", # options=cross_office_options) # # Browse jobs as superuser, because address is restricted # cross_office_jobs = cross_office_details[1].get('results', Jobs).sudo() # else: # cross_office_total = total # cross_office_jobs = jobs # offices = sort(set(j.address_id or None for j in cross_office_jobs), 'city') # count_per_office = {'all': cross_office_total} # for o, jobs_list in groupby(cross_office_jobs, lambda job: job.address_id): # count_per_office[o] = len(jobs_list) # count_remote = len(cross_office_jobs.filtered(lambda job: not job.address_id)) # if count_remote: # count_per_office[None] = count_remote # # # Employment types # if contract_type_id or is_untyped: # cross_type_options = options.copy() # cross_type_options.update({ # 'allowFuzzy': False, # 'contract_type_id': None, # 'is_untyped': False, # }) # cross_type_total, cross_type_details, _ = request.website._search_with_fuzzy("jobs", # fuzzy_search_term or search, limit=1000, order="is_published desc, sequence, no_of_recruitment desc", # options=cross_type_options) # cross_type_jobs = cross_type_details[1].get('results', Jobs) # else: # cross_type_total = total # cross_type_jobs = jobs # employment_types = sort(set(j.contract_type_id for j in jobs if j.contract_type_id), 'name') # count_per_employment_type = {'all': cross_type_total} # for t, jobs_list in groupby(cross_type_jobs, lambda job: job.contract_type_id): # count_per_employment_type[t] = len(jobs_list) # count_untyped = len(cross_type_jobs.filtered(lambda job: not job.contract_type_id)) # if count_untyped: # count_per_employment_type[None] = count_untyped # # pager = request.website.pager( # url=request.httprequest.path.partition('/page/')[0], # url_args=request.httprequest.args, # total=total, # page=page, # step=self._jobs_per_page, # ) # offset = pager['offset'] # jobs = jobs[offset:offset + self._jobs_per_page] # # office = env['res.partner'].browse(int(office_id)) if office_id else None # contract_type = env['hr.contract.type'].browse(int(contract_type_id)) if contract_type_id else None # # Render page # return request.render("website_hr_recruitment_extended.recruitment_index", { # 'jobs': jobs, # 'countries': countries, # 'departments': departments, # 'offices': offices, # 'employment_types': employment_types, # 'country_id': country, # 'department_id': department, # 'office_id': office, # 'contract_type_id': contract_type, # 'is_remote': is_remote, # 'is_other_department': is_other_department, # 'is_untyped': is_untyped, # 'pager': pager, # 'search': fuzzy_search_term or search, # 'search_count': total, # 'original_search': fuzzy_search_term and search, # 'count_per_country': count_per_country, # 'count_per_department': count_per_department, # 'count_per_office': count_per_office, # 'count_per_employment_type': count_per_employment_type, # }) # # @http.route('/jobs/add', type='json', auth="user", website=True) # def jobs_add(self, **kwargs): # # avoid branding of website_description by setting rendering_bundle in context # job = request.env['hr.job.recruitment'].with_context(rendering_bundle=True).create({ # 'name': _('Job Title'), # }) # return f"/jobs/{request.env['ir.http']._slug(job)}" # # @http.route('''/jobs/detail/''', type='http', auth="public", website=True, sitemap=True) # def jobs_detail(self, job, **kwargs): # redirect_url = f"/jobs/{request.env['ir.http']._slug(job)}" # return request.redirect(redirect_url, code=301) # # @http.route('''/jobs/''', type='http', auth="public", website=True, sitemap=True) # def job(self, job, **kwargs): # return request.render("website_hr_recruitment_extended.recruitment_detail", { # 'job': job, # 'main_object': job, # }) # # @http.route('''/jobs/apply/''', type='http', auth="public", website=True, sitemap=True) # def jobs_apply(self, job, **kwargs): # error = {} # default = {} # if 'website_hr_recruitment_error' in request.session: # error = request.session.pop('website_hr_recruitment_error') # default = request.session.pop('website_hr_recruitment_default') # return request.render("website_hr_recruitment_extended.recruitment_apply", { # 'job': job, # 'error': error, # 'default': default, # }) # # # Compatibility routes # # @http.route([ # '/jobs/country/', # '/jobs/department/', # '/jobs/country//department/', # '/jobs/office/', # '/jobs/country//office/', # '/jobs/department//office/', # '/jobs/country//department//office/', # '/jobs/employment_type/', # '/jobs/country//employment_type/', # '/jobs/department//employment_type/', # '/jobs/office//employment_type/', # '/jobs/country//department//employment_type/', # '/jobs/country//office//employment_type/', # '/jobs/department//office//employment_type/', # '/jobs/country//department//office//employment_type/', # ], type='http', auth="public", website=True, sitemap=False) # def jobs_compatibility(self, country=None, department=None, office_id=None, contract_type_id=None, **kwargs): # """ # Deprecated since Odoo 16.3: those routes are kept by compatibility. # They should not be used in Odoo code anymore. # """ # warnings.warn( # "This route is deprecated since Odoo 16.3: the jobs list is now available at /jobs or /jobs/page/XXX", # DeprecationWarning # ) # url_params = { # 'country_id': country and country.id, # 'department_id': department and department.id, # 'office_id': office_id, # 'contract_type_id': contract_type_id, # **kwargs, # } # return request.redirect( # '/jobs?%s' % url_encode(url_params), # code=301, # ) # # # @http.route('/hr_recruitment_extended/fetch_hr_recruitment_degree', type='json', auth="public", website=True) # def fetch_recruitment_degrees(self): # degrees = {} # all_degrees = http.request.env['hr.recruitment.degree'].sudo().search([]) # if all_degrees: # for degree in all_degrees: # degrees[degree.id] = degree.name # return degrees # # @http.route('/hr_recruitment_extended/fetch_preferred_locations', type='json', auth="public", website=True) # def fetch_preferred_locations(self, loc_ids): # locations = {} # for id in loc_ids: # location = http.request.env['hr.location'].sudo().browse(id) # if location: # locations[location.id] = location.location_name # return locations # # @http.route('/hr_recruitment_extended/fetch_preferred_skills', type='json', auth="public", website=True) # def fetch_preferred_skills(self, skill_ids,fetch_others=False): # skills = {} # Skill = http.request.env['hr.skill'].sudo() # SkillLevel = http.request.env['hr.skill.level'].sudo() # # if fetch_others: # for skill in Skill.search([('id','not in',skill_ids)]): # levels = SkillLevel.search([('skill_type_id', '=', skill.skill_type_id.id)],order='sequence') # skills[skill.id] = { # 'id': skill.id, # 'name': skill.name, # 'levels': [{'id': lvl.id, 'name': lvl.name, 'percentage': lvl.level_progress, 'sequence': lvl.sequence} for lvl in levels] # } # else: # for skill in Skill.browse(skill_ids): # levels = SkillLevel.search([('skill_type_id', '=', skill.skill_type_id.id)],order='sequence') # skills[skill.id] = { # 'id': skill.id, # 'name': skill.name, # 'levels': [{'id': lvl.id, 'name': lvl.name, 'percentage': lvl.level_progress, 'sequence': lvl.sequence} for lvl in levels] # } # # return skills # # @http.route('/website_hr_recruitment_extended/check_recent_application', type='json', auth="public", website=True) # def check_recent_application(self, value, job_id): # # Function to check if the applicant has an existing record based on email, phone, or linkedin # if value: # def refused_applicants_condition(applicant): # return not applicant.active \ # and applicant.hr_job_recruitment.id == int(job_id) \ # and applicant.create_date >= (datetime.now() - relativedelta(months=6)) # # Search for applicants with the same email, phone, or linkedin (only if the value is not False/None) # applicants_with_similar_info = http.request.env['hr.applicant'].sudo().search([ # ('hr_job_recruitment','=',int(job_id)), # '|', # ('email_normalized', '=', email_normalize(value)), # '|', # ('partner_phone', '=', value), # ('linkedin_profile', '=ilike', value), # ], order='create_date DESC') # # if not applicants_with_similar_info: # return {'message':None} # # Group applications by their status # applications_by_status = applicants_with_similar_info.grouped('application_status') # # # Check for refused applicants with the same value within the last 6 months # refused_applicants = applications_by_status.get('refused', http.request.env['hr.applicant']) # if any(applicant for applicant in refused_applicants if refused_applicants_condition(applicant)): # return { # 'message': _( # 'We\'ve found a previous closed application in our system within the last 6 months.' # ' Please consider before applying in order not to duplicate efforts.' # ) # } # # # Check for ongoing applications with the same value # ongoing_applications = applications_by_status.get('ongoing', []) # if ongoing_applications: # ongoing_application = ongoing_applications[0] # if ongoing_application.hr_job_recruitment.id == int(job_id): # recruiter_contact = "" if not ongoing_application.user_id else _( # ' In case of issue, contact %(contact_infos)s', # contact_infos=", ".join( # [value for value in itemgetter('name', 'email', 'phone')(ongoing_application.user_id) if value] # )) # # error_message = 'An application already exists for %s Duplicates might be rejected. %s '%(value,recruiter_contact) # print(error_message) # return { # 'message': _(error_message) # } # # # If no existing application found, show the following message # return { # 'message': _( # 'We found a recent application with a similar name, email, phone number.' # ' You can continue if it\'s not a mistake.' # ) # } # else: # return {'message': None} # # def _should_log_authenticate_message(self, record): # if record._name == "hr.applicant" and not request.session.uid: # return False # return super()._should_log_authenticate_message(record) # # def extract_data(self, model, values): # candidate = False # extracted_resume = values.pop('resume_base64', None) # current_ctc = values.pop('current_ctc', None) # expected_ctc = values.pop('expected_ctc', None) # available_joining_date = values.pop('available_joining_date', None) # exp_type = values.pop('exp_type', None) # current_location = values.pop('current_location', None) # preferred_locations_str = values.pop('preferred_locations', '') # department_id = values.pop('department_id',None) # hr_job_recruitment = values.pop('job_id', None) # preferred_locations = [int(x) for x in preferred_locations_str.split(',')] if len( # preferred_locations_str) > 0 else [] # current_organization = values.pop('current_organization', None) # notice_period = values.pop('notice_period', 0) # notice_period_type = values.pop('notice_period_type', 'day') # experience_years = values.pop('experience_years',0) # experience_months = values.pop('experience_months',0) # # # If there are months, convert everything to months # if int(experience_months) > 0: # total_experience = (int(experience_years) * 12) + int(experience_months) # total_experience_type = 'month' # else: # total_experience = int(experience_years) # total_experience_type = 'year' # # if extracted_resume: # attachment = request.env.ref("hr_recruitment_extended.employee_recruitment_attachments_preview") # file = attachment.sudo().write({ # 'datas': extracted_resume, # }) # if file: # resume_type = attachment.mimetype # resume_name = attachment.name # else: # resume_type = '' # resume_name = '' # else: # resume_type = '' # resume_name = '' # # skill_dict = {key: ast.literal_eval(value) for key,value in values.items() if "skill" in key and value != '0'} # # if model.model == 'hr.applicant': # partner_name = values.pop('full_name', None) # partner_phone = values.pop('partner_phone', None) # alternate_phone = values.pop('alternate_phone', None) # partner_email = values.pop('email_from', None) # degree = values.pop('degree', None) # if partner_phone and partner_email: # candidate = request.env['hr.candidate'].sudo().search([ # '|', ('email_from', '=', partner_email), # ('partner_phone', '=', partner_phone), # ], limit=1) # # if candidate: # candidate.sudo().write({ # 'partner_name': partner_name, # 'alternate_phone': alternate_phone, # 'email_from': partner_email, # 'partner_phone': partner_phone, # 'type_id': int(degree) if degree.isdigit() else False, # 'resume': extracted_resume, # 'resume_type': resume_type, # 'resume_name': resume_name, # }) # if not candidate: # candidate = request.env['hr.candidate'].sudo().create({ # 'partner_name': partner_name, # 'email_from': partner_email, # 'partner_phone': partner_phone, # 'alternate_phone': alternate_phone, # 'type_id': int(degree) if degree.isdigit() else False, # 'resume': extracted_resume, # 'resume_type': resume_type, # 'resume_name': resume_name, # }) # # if len(skill_dict) > 0: # # candidate_skills_list = [] # for key, value in skill_dict.items(): # candidate_skills = dict() # skill_type_id = request.env['hr.skill'].sudo().browse(int(key.split("_")[1])).skill_type_id.id # candidate_skills['candidate_id'] = candidate.id # candidate_skills['skill_id'] = int(key.split("_")[1]) # candidate_skills['skill_level_id'] = value[0] # candidate_skills['level_progress'] = value[1] # candidate_skills['skill_type_id'] = skill_type_id # # skill = request.env['hr.candidate.skill'].sudo().create(candidate_skills) # # candidate_skills_list.append(skill.id) # if candidate.candidate_skill_ids: # if candidate_skills['skill_id'] not in candidate.candidate_skill_ids.skill_id.ids: # candidate.write({'candidate_skill_ids':[(0,4,candidate_skills)]}) # else: # candidate.write({'candidate_skill_ids':[(0,4,candidate_skills)]}) # # # else: # skills = None # # values['partner_name'] = partner_name # if partner_phone: # values['partner_phone'] = partner_phone # if partner_email: # values['email_from'] = partner_email # notice_period_str = 'N/A' # if notice_period and notice_period_type: # notice_period_str = str(notice_period) + ' ' + str(notice_period_type) # data = super().extract_data(model, values) # data['record']['current_ctc'] = float(current_ctc if current_ctc else 0) # data['record']['salary_expected'] = float(expected_ctc if expected_ctc else 0) # data['record']['exp_type'] = exp_type if exp_type else 'fresher' # data['record']['current_location'] = current_location if current_location else '' # data['record']['current_organization'] = current_organization if current_organization else '' # data['record']['notice_period'] = notice_period_str if notice_period_str else 'N/A' # data['record']['notice_period_type'] = notice_period_type if notice_period_type else 'day' # data['record']['hr_job_recruitment'] = int(hr_job_recruitment) if str(hr_job_recruitment).isdigit() else '' # data['record']['department_id'] = int(department_id) if str(department_id).isdigit() else '' # data['record']['availability'] = datetime.strptime(available_joining_date, '%Y-%m-%d').date() if available_joining_date else '' # # data['record']['total_exp'] = total_experience if total_experience else 0 # data['record']['total_exp_type'] = total_experience_type if total_experience_type else 'year' # # data['record']['resume'] = resume if resume else None # if len(preferred_locations_str) > 0: # data['record']['preferred_location'] = preferred_locations # if candidate: # data['record']['candidate_id'] = candidate.id # data['record']['type_id'] = candidate.type_id.id # # return data # # Part of Odoo. See LICENSE file for full copyright and licensing details. import warnings from datetime import datetime from dateutil.relativedelta import relativedelta from operator import itemgetter from werkzeug.urls import url_encode from odoo import http, _ from odoo.addons.website_hr_recruitment.controllers.main import WebsiteHrRecruitment from odoo.osv.expression import AND from odoo.http import request from odoo.tools import email_normalize from odoo.tools.misc import groupby import ast import base64 class WebsiteJobHrRecruitment(WebsiteHrRecruitment): _jobs_per_page = 12 def sitemap_jobs(env, rule, qs): if not qs or qs.lower() in '/jobs': yield {'loc': '/jobs'} @http.route([ '/jobs', '/jobs/page/', ], type='http', auth="public", website=True, sitemap=sitemap_jobs) def jobs(self, country_id=None, department_id=None, office_id=None, contract_type_id=None, is_remote=False, is_other_department=False, is_untyped=None, page=1, search=None, **kwargs): env = request.env(context=dict(request.env.context, show_address=True, no_tag_br=True)) Country = env['res.country'] Jobs = env['hr.job.recruitment'] Department = env['hr.department'] country = Country.browse(int(country_id)) if country_id else None department = Department.browse(int(department_id)) if department_id else None office_id = int(office_id) if office_id else None contract_type_id = int(contract_type_id) if contract_type_id else None # Default search by user country if not (country or department or office_id or contract_type_id or kwargs.get('all_countries')): if request.geoip.country_code: countries_ = Country.search([('code', '=', request.geoip.country_code)]) country = countries_[0] if countries_ else None if country: country_count = Jobs.search_count(AND([ request.website.website_domain(), [('address_id.country_id', '=', country.id)] ])) if not country_count: country = False options = { 'displayDescription': True, 'allowFuzzy': not request.params.get('noFuzzy'), 'country_id': country.id if country else None, 'department_id': department.id if department else None, 'office_id': office_id, 'contract_type_id': contract_type_id, 'is_remote': is_remote, 'is_other_department': is_other_department, 'is_untyped': is_untyped, } total, details, fuzzy_search_term = request.website._search_with_fuzzy("job_requests", search, limit=1000, order="is_published desc, sequence, no_of_recruitment desc", options=options) # Browse jobs as superuser, because address is restricted jobs = details[0].get('results', Jobs).sudo() if search: search = search.strip() custom_jobs = Jobs.sudo().search([ '|', '|', '|', ('name', 'ilike', search), ('address_id.city', 'ilike', search), ('skill_ids.name', 'ilike', search), ('secondary_skill_ids.name', 'ilike', search), ]) # Merge fuzzy jobs + custom jobs jobs = (jobs | custom_jobs).sorted( key=lambda j: ( not j.is_published, j.sequence, -j.no_of_recruitment ) ) total = len(jobs) else: jobs = jobs def sort(records_list, field_name): """ Sort records in the given collection according to the given field name, alphabetically. None values instead of records are placed at the end. :param list records_list: collection of records or None values :param str field_name: field on which to sort :return: sorted list """ return sorted( records_list, key=lambda item: (item is None, item.sudo()[field_name] if item and item.sudo()[field_name] else ''), ) # Countries if country or is_remote: cross_country_options = options.copy() cross_country_options.update({ 'allowFuzzy': False, 'country_id': None, 'is_remote': False, }) cross_country_total, cross_country_details, _ = request.website._search_with_fuzzy("jobs", fuzzy_search_term or search, limit=1000, order="is_published desc, sequence, no_of_recruitment desc", options=cross_country_options) # Browse jobs as superuser, because address is restricted cross_country_jobs = cross_country_details[1].get('results', Jobs).sudo() else: cross_country_total = total cross_country_jobs = jobs country_offices = set(j.address_id or None for j in cross_country_jobs) countries = sort(set(o and o.country_id or None for o in country_offices), 'name') count_per_country = {'all': cross_country_total} for c, jobs_list in groupby(cross_country_jobs, lambda job: job.address_id.country_id): count_per_country[c] = len(jobs_list) count_remote = len(cross_country_jobs.filtered(lambda job: not job.address_id)) if count_remote: count_per_country[None] = count_remote # Departments if department or is_other_department: cross_department_options = options.copy() cross_department_options.update({ 'allowFuzzy': False, 'department_id': None, 'is_other_department': False, }) cross_department_total, cross_department_details, _ = request.website._search_with_fuzzy("jobs", fuzzy_search_term or search, limit=1000, order="is_published desc, sequence, no_of_recruitment desc", options=cross_department_options) cross_department_jobs = cross_department_details[1].get('results', Jobs) else: cross_department_total = total cross_department_jobs = jobs departments = sort(set(j.department_id or None for j in cross_department_jobs), 'name') count_per_department = {'all': cross_department_total} for d, jobs_list in groupby(cross_department_jobs, lambda job: job.department_id): count_per_department[d] = len(jobs_list) count_other_department = len(cross_department_jobs.filtered(lambda job: not job.department_id)) if count_other_department: count_per_department[None] = count_other_department # Offices if office_id or is_remote: cross_office_options = options.copy() cross_office_options.update({ 'allowFuzzy': False, 'office_id': None, 'is_remote': False, }) cross_office_total, cross_office_details, _ = request.website._search_with_fuzzy("jobs", fuzzy_search_term or search, limit=1000, order="is_published desc, sequence, no_of_recruitment desc", options=cross_office_options) # Browse jobs as superuser, because address is restricted cross_office_jobs = cross_office_details[1].get('results', Jobs).sudo() else: cross_office_total = total cross_office_jobs = jobs offices = sort(set(j.address_id or None for j in cross_office_jobs), 'city') count_per_office = {'all': cross_office_total} for o, jobs_list in groupby(cross_office_jobs, lambda job: job.address_id): count_per_office[o] = len(jobs_list) count_remote = len(cross_office_jobs.filtered(lambda job: not job.address_id)) if count_remote: count_per_office[None] = count_remote # Employment types if contract_type_id or is_untyped: cross_type_options = options.copy() cross_type_options.update({ 'allowFuzzy': False, 'contract_type_id': None, 'is_untyped': False, }) cross_type_total, cross_type_details, _ = request.website._search_with_fuzzy("jobs", fuzzy_search_term or search, limit=1000, order="is_published desc, sequence, no_of_recruitment desc", options=cross_type_options) cross_type_jobs = cross_type_details[1].get('results', Jobs) else: cross_type_total = total cross_type_jobs = jobs employment_types = sort(set(j.contract_type_id for j in jobs if j.contract_type_id), 'name') count_per_employment_type = {'all': cross_type_total} for t, jobs_list in groupby(cross_type_jobs, lambda job: job.contract_type_id): count_per_employment_type[t] = len(jobs_list) count_untyped = len(cross_type_jobs.filtered(lambda job: not job.contract_type_id)) if count_untyped: count_per_employment_type[None] = count_untyped pager = request.website.pager( url=request.httprequest.path.partition('/page/')[0], url_args=request.httprequest.args, total=total, page=page, step=self._jobs_per_page, ) offset = pager['offset'] jobs = jobs[offset:offset + self._jobs_per_page] office = env['res.partner'].browse(int(office_id)) if office_id else None contract_type = env['hr.contract.type'].browse(int(contract_type_id)) if contract_type_id else None # Render page return request.render("website_hr_recruitment_extended.recruitment_index", { 'jobs': jobs, 'countries': countries, 'departments': departments, 'offices': offices, 'employment_types': employment_types, 'country_id': country, 'department_id': department, 'office_id': office, 'contract_type_id': contract_type, 'is_remote': is_remote, 'is_other_department': is_other_department, 'is_untyped': is_untyped, 'pager': pager, 'search': fuzzy_search_term or search, 'search_count': total, 'original_search': fuzzy_search_term and search, 'count_per_country': count_per_country, 'count_per_department': count_per_department, 'count_per_office': count_per_office, 'count_per_employment_type': count_per_employment_type, }) @http.route('/jobs/add', type='json', auth="user", website=True) def jobs_add(self, **kwargs): # avoid branding of website_description by setting rendering_bundle in context job = request.env['hr.job.recruitment'].with_context(rendering_bundle=True).create({ 'name': _('Job Title'), }) return f"/jobs/{request.env['ir.http']._slug(job)}" @http.route('''/jobs/detail/''', type='http', auth="public", website=True, sitemap=True) def jobs_detail(self, job, **kwargs): redirect_url = f"/jobs/{request.env['ir.http']._slug(job)}" return request.redirect(redirect_url, code=301) @http.route('''/jobs/''', type='http', auth="public", website=True, sitemap=True) def job(self, job, **kwargs): return request.render("website_hr_recruitment_extended.recruitment_detail", { 'job': job, 'main_object': job, }) @http.route('''/jobs/apply/''', type='http', auth="public", website=True, sitemap=True) def jobs_apply(self, job, **kwargs): error = {} default = {} if 'website_hr_recruitment_error' in request.session: error = request.session.pop('website_hr_recruitment_error') default = request.session.pop('website_hr_recruitment_default') return request.render("website_hr_recruitment_extended.recruitment_apply", { 'job': job, 'error': error, 'default': default, }) # Compatibility routes @http.route([ '/jobs/country/', '/jobs/department/', '/jobs/country//department/', '/jobs/office/', '/jobs/country//office/', '/jobs/department//office/', '/jobs/country//department//office/', '/jobs/employment_type/', '/jobs/country//employment_type/', '/jobs/department//employment_type/', '/jobs/office//employment_type/', '/jobs/country//department//employment_type/', '/jobs/country//office//employment_type/', '/jobs/department//office//employment_type/', '/jobs/country//department//office//employment_type/', ], type='http', auth="public", website=True, sitemap=False) def jobs_compatibility(self, country=None, department=None, office_id=None, contract_type_id=None, **kwargs): """ Deprecated since Odoo 16.3: those routes are kept by compatibility. They should not be used in Odoo code anymore. """ warnings.warn( "This route is deprecated since Odoo 16.3: the jobs list is now available at /jobs or /jobs/page/XXX", DeprecationWarning ) url_params = { 'country_id': country and country.id, 'department_id': department and department.id, 'office_id': office_id, 'contract_type_id': contract_type_id, **kwargs, } return request.redirect( '/jobs?%s' % url_encode(url_params), code=301, ) @http.route('/hr_recruitment_extended/fetch_hr_recruitment_degree', type='json', auth="public", website=True) def fetch_recruitment_degrees(self): degrees = {} all_degrees = http.request.env['hr.recruitment.degree'].sudo().search([]) if all_degrees: for degree in all_degrees: degrees[degree.id] = degree.name return degrees @http.route('/hr_recruitment_extended/fetch_preferred_locations', type='json', auth="public", website=True) def fetch_preferred_locations(self, loc_ids): locations = {} for id in loc_ids: location = http.request.env['hr.location'].sudo().browse(id) if location: locations[location.id] = location.location_name return locations # @http.route('/hr_recruitment_extended/fetch_preferred_skills', type='json', auth="public", website=True) # def fetch_preferred_skills(self, skill_ids,fetch_others=False): # skills = {} # Skill = http.request.env['hr.skill'].sudo() # SkillLevel = http.request.env['hr.skill.level'].sudo() # # if fetch_others: # for skill in Skill.search([('id','not in',skill_ids)]): # levels = SkillLevel.search([('skill_type_id', '=', skill.skill_type_id.id)],order='sequence') # skills[skill.id] = { # 'id': skill.id, # 'name': skill.name, # 'levels': [{'id': lvl.id, 'name': lvl.name, 'percentage': lvl.level_progress, 'sequence': lvl.sequence} for lvl in levels] # } # else: # for skill in Skill.browse(skill_ids): # levels = SkillLevel.search([('skill_type_id', '=', skill.skill_type_id.id)],order='sequence') # skills[skill.id] = { # 'id': skill.id, # 'name': skill.name, # 'levels': [{'id': lvl.id, 'name': lvl.name, 'percentage': lvl.level_progress, 'sequence': lvl.sequence} for lvl in levels] # } # # return skills @http.route('/hr_recruitment_extended/fetch_preferred_skills',type='json',auth="public",website=True) def fetch_preferred_skills(self, skill_ids, fetch_others=False): skills = {} Skill = http.request.env['hr.skill'].sudo() domain = [] if fetch_others: domain = [('id', 'not in', skill_ids)] else: domain = [('id', 'in', skill_ids)] for skill in Skill.search(domain): skills[skill.id] = { 'id': skill.id, 'name': skill.name, } return skills @http.route('/website_hr_recruitment_extended/check_recent_application', type='json', auth="public", website=True) def check_recent_application(self, value, job_id): # Function to check if the applicant has an existing record based on email, phone, or linkedin if value: def refused_applicants_condition(applicant): return not applicant.active \ and applicant.hr_job_recruitment.id == int(job_id) \ and applicant.create_date >= (datetime.now() - relativedelta(months=6)) # Search for applicants with the same email, phone, or linkedin (only if the value is not False/None) applicants_with_similar_info = http.request.env['hr.applicant'].sudo().search([ ('hr_job_recruitment','=',int(job_id)), '|', ('email_normalized', '=', email_normalize(value)), '|', ('partner_phone', '=', value), ('linkedin_profile', '=ilike', value), ], order='create_date DESC') if not applicants_with_similar_info: return {'message':None} # Group applications by their status applications_by_status = applicants_with_similar_info.grouped('application_status') # Check for refused applicants with the same value within the last 6 months refused_applicants = applications_by_status.get('refused', http.request.env['hr.applicant']) if any(applicant for applicant in refused_applicants if refused_applicants_condition(applicant)): return { 'message': _( 'We\'ve found a previous closed application in our system within the last 6 months.' ' Please consider before applying in order not to duplicate efforts.' ) } # Check for ongoing applications with the same value ongoing_applications = applications_by_status.get('ongoing', []) if ongoing_applications: ongoing_application = ongoing_applications[0] if ongoing_application.hr_job_recruitment.id == int(job_id): recruiter_contact = "" if not ongoing_application.user_id else _( ' In case of issue, contact %(contact_infos)s', contact_infos=", ".join( [value for value in itemgetter('name', 'email', 'phone')(ongoing_application.user_id) if value] )) error_message = 'An application already exists for %s Duplicates might be rejected. %s '%(value,recruiter_contact) print(error_message) return { 'message': _(error_message) } # If no existing application found, show the following message return { 'message': _( 'We found a recent application with a similar name, email, phone number.' ' You can continue if it\'s not a mistake.' ) } else: return {'message': None} def _should_log_authenticate_message(self, record): if record._name == "hr.applicant" and not request.session.uid: return False return super()._should_log_authenticate_message(record) def extract_data(self, model, values): candidate = False extracted_resume = values.pop('resume_base64', None) current_ctc = values.pop('current_ctc', None) expected_ctc = values.pop('expected_ctc', None) available_joining_date = values.pop('available_joining_date', None) exp_type = values.pop('exp_type', None) current_location = values.pop('current_location', None) preferred_locations_str = values.pop('preferred_locations', '') department_id = values.pop('department_id',None) hr_job_recruitment = values.pop('job_id', None) preferred_locations = [int(x) for x in preferred_locations_str.split(',')] if len( preferred_locations_str) > 0 else [] current_organization = values.pop('current_organization', None) notice_period = values.pop('notice_period', 0) notice_period_type = values.pop('notice_period_type', 'day') experience_years = values.pop('experience_years',0) experience_months = values.pop('experience_months',0) # If there are months, convert everything to months if int(experience_months) > 0: total_experience = (int(experience_years) * 12) + int(experience_months) total_experience_type = 'month' else: total_experience = int(experience_years) total_experience_type = 'year' if extracted_resume: attachment = request.env.ref("hr_recruitment_extended.employee_recruitment_attachments_preview") file = attachment.sudo().write({ 'datas': extracted_resume, }) if file: resume_type = attachment.mimetype resume_name = attachment.name else: resume_type = '' resume_name = '' else: resume_type = '' resume_name = '' skill_dict = {key: ast.literal_eval(value) for key,value in values.items() if "skill" in key and value != '0'} if model.model == 'hr.applicant': partner_name = values.pop('full_name', None) partner_phone = values.pop('partner_phone', None) alternate_phone = values.pop('alternate_phone', None) partner_email = values.pop('email_from', None) degree = values.pop('degree', None) if partner_phone and partner_email: candidate = request.env['hr.candidate'].sudo().search([ '|', ('email_from', '=', partner_email), ('partner_phone', '=', partner_phone), ], limit=1) if candidate: candidate.sudo().write({ 'partner_name': partner_name, 'alternate_phone': alternate_phone, 'email_from': partner_email, 'partner_phone': partner_phone, 'type_id': int(degree) if degree.isdigit() else False, 'resume': extracted_resume, 'resume_type': resume_type, 'resume_name': resume_name, }) if not candidate: candidate = request.env['hr.candidate'].sudo().create({ 'partner_name': partner_name, 'email_from': partner_email, 'partner_phone': partner_phone, 'alternate_phone': alternate_phone, 'type_id': int(degree) if degree.isdigit() else False, 'resume': extracted_resume, 'resume_type': resume_type, 'resume_name': resume_name, }) # --------------------------------------------------------- # Skills From Portal # --------------------------------------------------------- primary_skill_ids = request.httprequest.form.getlist( 'primary_skill_ids' ) secondary_skill_ids = request.httprequest.form.getlist( 'secondary_skill_ids' ) # --------------------------------------------------------- # Merge & Remove Duplicates # --------------------------------------------------------- all_skill_ids = [] for skill_group in (primary_skill_ids + secondary_skill_ids): if skill_group: for skill_id in skill_group.split(','): if skill_id.strip().isdigit(): all_skill_ids.append(int(skill_id.strip())) # Remove duplicates all_skill_ids = list(set(all_skill_ids)) # --------------------------------------------------------- # Create Candidate Skill Lines # --------------------------------------------------------- existing_skill_ids = candidate.candidate_skill_ids.mapped( 'skill_id' ).ids for skill_id in all_skill_ids: if skill_id not in existing_skill_ids: skill = request.env['hr.skill'].sudo().browse(skill_id) default_level = skill.skill_type_id.skill_level_ids.filtered( lambda l: l.default_level )[:1] candidate.write({ 'candidate_skill_ids': [(0, 0, { 'candidate_id': candidate.id, 'skill_id': skill.id, 'skill_type_id': skill.skill_type_id.id, 'skill_level_id': default_level.id if default_level else False, })] }) # --------------------------------------------------------- # Store Quick Skills # --------------------------------------------------------- if all_skill_ids: candidate.write({ 'quick_skill_ids': [(6, 0, all_skill_ids)] }) # if len(skill_dict) > 0: # # candidate_skills_list = [] # for key, value in skill_dict.items(): # candidate_skills = dict() # skill_type_id = request.env['hr.skill'].sudo().browse(int(key.split("_")[1])).skill_type_id.id # candidate_skills['candidate_id'] = candidate.id # candidate_skills['skill_id'] = int(key.split("_")[1]) # candidate_skills['skill_level_id'] = value[0] # candidate_skills['level_progress'] = value[1] # candidate_skills['skill_type_id'] = skill_type_id # # skill = request.env['hr.candidate.skill'].sudo().create(candidate_skills) # # candidate_skills_list.append(skill.id) # if candidate.candidate_skill_ids: # if candidate_skills['skill_id'] not in candidate.candidate_skill_ids.skill_id.ids: # candidate.write({'candidate_skill_ids':[(0,4,candidate_skills)]}) # else: # candidate.write({'candidate_skill_ids':[(0,4,candidate_skills)]}) # # # else: # skills = None values['partner_name'] = partner_name if partner_phone: values['partner_phone'] = partner_phone if partner_email: values['email_from'] = partner_email notice_period_str = 'N/A' if notice_period and notice_period_type: notice_period_str = str(notice_period) + ' ' + str(notice_period_type) data = super().extract_data(model, values) data['record']['current_ctc'] = float(current_ctc if current_ctc else 0) data['record']['salary_expected'] = float(expected_ctc if expected_ctc else 0) data['record']['exp_type'] = exp_type if exp_type else 'fresher' data['record']['current_location'] = current_location if current_location else '' data['record']['current_organization'] = current_organization if current_organization else '' data['record']['notice_period'] = notice_period_str if notice_period_str else 'N/A' data['record']['notice_period_type'] = notice_period_type if notice_period_type else 'day' data['record']['hr_job_recruitment'] = int(hr_job_recruitment) if str(hr_job_recruitment).isdigit() else '' data['record']['department_id'] = int(department_id) if str(department_id).isdigit() else '' data['record']['availability'] = datetime.strptime(available_joining_date, '%Y-%m-%d').date() if available_joining_date else '' data['record']['total_exp'] = total_experience if total_experience else 0 data['record']['total_exp_type'] = total_experience_type if total_experience_type else 'year' # data['record']['resume'] = resume if resume else None if len(preferred_locations_str) > 0: data['record']['preferred_location'] = preferred_locations if candidate: data['record']['candidate_id'] = candidate.id data['record']['type_id'] = candidate.type_id.id return data