From 95e7fedfbd22c86accef65ef5b8f88d97801efaf Mon Sep 17 00:00:00 2001 From: raman Date: Tue, 3 Jun 2025 17:09:15 +0530 Subject: [PATCH] bio-metric speed --- .../models/biometric_device_details.py | 428 +++++++++--------- 1 file changed, 217 insertions(+), 211 deletions(-) diff --git a/third_party_addons/hr_biometric_attendance/models/biometric_device_details.py b/third_party_addons/hr_biometric_attendance/models/biometric_device_details.py index b42d201e5..144184d6e 100644 --- a/third_party_addons/hr_biometric_attendance/models/biometric_device_details.py +++ b/third_party_addons/hr_biometric_attendance/models/biometric_device_details.py @@ -149,220 +149,221 @@ class BiometricDeviceDetails(models.Model): raise ValidationError(f'{error}') def action_download_attendance(self): - """Function to download attendance records from the device""" + """Download attendance records from the device and process them""" _logger.info("++++++++++++Cron Executed++++++++++++++++++++++") + zk_attendance = self.env['zk.machine.attendance'] hr_attendance = self.env['hr.attendance'] - for info in self: - machine_ip = info.device_ip - zk_port = info.port_number + + for device in self: try: - # Connecting with the device with the ip and port provided - zk = ZK(machine_ip, port=zk_port, timeout=15, - password=self.device_password, - force_udp=False, ommit_ping=False) - except NameError: - raise UserError( - _("Pyzk module not Found. Please install it" - "with 'pip3 install pyzk'.")) - conn = self.device_connect(zk) - self.get_device_information() - if conn: + # Connect to the device + zk = ZK( + device.device_ip, + port=device.port_number, + timeout=15, + password=device.device_password, + force_udp=False, + ommit_ping=False + ) + conn = device.device_connect(zk) + if not conn: + raise UserError( + _('Unable to connect to the device. Please check parameters and network connection.')) + + # Get device information and users + device.get_device_information() conn.disable_device() - self.get_all_users() - # self.action_set_timezone() - user = conn.get_users() - # get All Fingerprints - fingers = conn.get_templates() - for use in user: - for finger in fingers: - if finger.uid == use.uid: - templates = conn.get_user_template(uid=use.uid, - temp_id=finger.fid, - user_id=use.user_id) - hex_data = templates.template.hex() - # Convert hex data to binary - binary_data = binascii.unhexlify(hex_data) - base64_data = base64.b64encode(binary_data).decode( - 'utf-8') - employee = self.env['hr.employee'].search( - [('device_id_num', '=', use.user_id), ('company_id', '=', self.env.company.id)], - limit=1) - employee.device_ids |= self - if str(finger.fid) in employee.fingerprint_ids.mapped( - 'finger_id'): - employee.fingerprint_ids.search( - [('finger_id', '=', finger.fid)]).update({ - 'finger_template': base64_data, - }) - else: - employee.fingerprint_ids.create({ - 'finger_template': base64_data, - 'finger_id': finger.fid, - 'employee_bio_id': employee.id, - 'filename': f'{employee.name}-finger-{finger.fid}' - }) - # get all attendances - attendance = conn.get_attendance() - if attendance: - filtered_attendance = [] + device.get_all_users() - for each in attendance: - atten_time = each.timestamp + # Process fingerprints + self._process_fingerprints(conn, device) - # Localize and convert to UTC - local_tz = pytz.timezone(self.env.user.partner_id.tz or 'GMT') - local_dt = local_tz.localize(atten_time, is_dst=None) - utc_dt = local_dt.astimezone(pytz.utc) - utc_dt_str = utc_dt.strftime("%Y-%m-%d %H:%M:%S") - - # Convert to datetime and then to Odoo string format - atten_time = datetime.datetime.strptime(utc_dt_str, "%Y-%m-%d %H:%M:%S") - atten_time = fields.Datetime.to_string(atten_time) - - # Filter for today's date - if atten_time[:10] == datetime.datetime.today().date().strftime("%Y-%m-%d"): - filtered_attendance.append(each) - attendance = filtered_attendance - for each in attendance: - atten_time = each.timestamp - - # Localize and convert to UTC - local_tz = pytz.timezone(self.env.user.partner_id.tz or 'GMT') - local_dt = local_tz.localize(atten_time, is_dst=None) - utc_dt = local_dt.astimezone(pytz.utc) - utc_dt_str = utc_dt.strftime("%Y-%m-%d %H:%M:%S") - - # Convert to datetime and then to Odoo string format - atten_time = datetime.datetime.strptime(utc_dt_str, "%Y-%m-%d %H:%M:%S") - atten_time = fields.Datetime.to_string(atten_time) - - for uid in user: - if uid.user_id == each.user_id: - get_user_id = self.env['hr.employee'].search( - [('device_id_num', '=', each.user_id), ('company_id', '=', self.env.company.id)], - limit=1) - check_in_today = hr_attendance.search([( - 'employee_id', '=', get_user_id.id), - ('check_in', '!=', False), ('check_out', '=', False)]) - from datetime import timedelta - - # Define the tolerance (10 minutes) - tolerance = timedelta(minutes=10) - - # Convert the atten_time string to a datetime object - - # Calculate the lower and upper bounds with the tolerance - atten_time_obj = datetime.datetime.strptime(atten_time, "%Y-%m-%d %H:%M:%S") - - # Calculate the lower and upper bounds with the tolerance - lower_bound = atten_time_obj - tolerance - upper_bound = atten_time_obj + tolerance - - # Ensure the 'check_in' and 'check_out' fields are datetime objects and compare them - next_in = hr_attendance.search([ - ('employee_id', '=', get_user_id.id), - ('check_in', '>=', lower_bound), - ('check_in', '<=', upper_bound) - ]) - - next_out = hr_attendance.search([ - ('employee_id', '=', get_user_id.id), - ('check_out', '>=', lower_bound), - ('check_out', '<=', upper_bound) - ]) - if get_user_id: - if self.display_name == 'IN' and not check_in_today: - if next_in: - continue - hr_attendance.create({ - 'employee_id': get_user_id.id, - 'check_in': atten_time, - }) - get_user_id.attendance_state = 'checked_in' - elif check_in_today and self.display_name != 'IN': - if fields.Datetime.to_string(check_in_today.check_in) > atten_time or next_out: - continue - check_in_today.write({ - 'check_out': atten_time, - }) - get_user_id.attendance_state = 'checked_out' - - else: - pass - duplicate_atten_ids = zk_attendance.search( - [('device_id_num', '=', each.user_id), - ('punching_time', '=', atten_time), - ('company_id', '=', self.env.company.id)]) - if not duplicate_atten_ids: - zk_attendance.create({ - 'employee_id': get_user_id.id, - 'device_id_num': each.user_id, - 'attendance_type': str(1), - 'punch_type': '0' if self.display_name == 'IN' else '1', - 'punching_time': atten_time, - 'address_id': info.address_id.id, - 'company_id': self.env.company.id - }) - # att_var = hr_attendance.search([( - # 'employee_bio_id', '=', get_user_id.id), - # ('check_out', '=', False)]) - # if self.display_name == 'IN': # check-in - # if not att_var: - # hr_attendance.create({ - # 'employee_id': - # get_user_id.id, - # 'check_in': atten_time - # }) - # else: # check-out - # if len(att_var) == 1: - # att_var.write({ - # 'check_out': atten_time - # }) - # else: - # att_var1 = hr_attendance.search( - # [('employee_id', '=', - # str(get_user_id.device_id_num))]) - # if att_var1: - # att_var1[-1].write({ - # 'check_out': atten_time - # }) - else: - continue - employee = self.env['hr.employee'].create({ - 'device_id_num': each.user_id, - 'device_ids': self.id, - 'name': uid.name, - 'company_id': self.company_id.id - }) - zk_attendance.create({ - 'employee_id': employee.id, - 'device_id_num': each.user_id, - 'attendance_type': str(1), - 'punch_type': '0' if self.display_name == 'IN' else '1', - 'punching_time': atten_time, - 'address_id': info.address_id.id, - 'company_id': self.company_id.id - }) - hr_attendance.create({ - 'employee_id': employee.id, - 'check_in': atten_time - }) - if not self.is_live_capture: - current_time = fields.datetime.now().strftime( - '%Y-%m-%d %H:%M:%S') - message = (f'Downloaded data from the device on ' - f'{current_time} by {self.env.user.name}') - self.message_post(body=message) - conn.disconnect() - return True - else: + # Process attendance data + attendance_data = conn.get_attendance() + if not attendance_data: zk.test_voice(index=4) - raise UserError(_('Unable to get the attendance log, please' - 'try again later.')) - else: - raise UserError(_('Unable to connect, please check the' - 'parameters and network connections.')) + raise UserError(_('No attendance records found on the device.')) + + self._process_attendance_data(attendance_data, conn, device, zk_attendance, hr_attendance) + + # Post success message if not live capture + if not device.is_live_capture: + current_time = fields.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + message = f'Downloaded data from the device on {current_time} by {self.env.user.name}' + device.message_post(body=message) + + conn.disconnect() + return True + + except NameError: + raise UserError(_("Pyzk module not found. Please install it with 'pip3 install pyzk'.")) + except Exception as e: + _logger.error(f"Error processing device {device.device_ip}: {str(e)}") + raise UserError(_(f"Error processing device: {str(e)}")) + + def _process_fingerprints(self, conn, device): + """Process fingerprint data from the device""" + users = conn.get_users() + fingers = conn.get_templates() + + for user in users: + for finger in fingers: + if finger.uid == user.uid: + templates = conn.get_user_template( + uid=user.uid, + temp_id=finger.fid, + user_id=user.user_id + ) + + # Convert template data + hex_data = templates.template.hex() + binary_data = binascii.unhexlify(hex_data) + base64_data = base64.b64encode(binary_data).decode('utf-8') + + # Find or create employee + employee = self.env['hr.employee'].search([ + ('device_id_num', '=', user.user_id), + ('company_id', '=', self.env.company.id) + ], limit=1) + + if not employee: + continue + + employee.device_ids |= device + + # Update or create fingerprint record + fingerprint = employee.fingerprint_ids.filtered(lambda f: f.finger_id == str(finger.fid)) + if fingerprint: + fingerprint.finger_template = base64_data + else: + self.env['hr.employee.fingerprint'].create({ + 'finger_template': base64_data, + 'finger_id': finger.fid, + 'employee_bio_id': employee.id, + 'filename': f'{employee.name}-finger-{finger.fid}' + }) + + def _process_attendance_data(self, attendance_data, conn, device, zk_attendance, hr_attendance): + """Process attendance data from the device""" + users = conn.get_users() + today = datetime.datetime.today().date().strftime("%Y-%m-%d") + tolerance = datetime.timedelta(minutes=10) + + # Filter today's attendance + filtered_attendance = [ + att for att in attendance_data + if att.timestamp.date().strftime("%Y-%m-%d") == today + ] + + for attendance in filtered_attendance: + # Convert time to UTC + atten_time = self._convert_to_utc(attendance.timestamp, self.env.user.partner_id.tz or 'GMT') + atten_time_str = fields.Datetime.to_string(atten_time) + + # Find matching user + user = next((u for u in users if u.user_id == attendance.user_id), None) + if not user: + continue + + # Find or create employee + employee = self.env['hr.employee'].search([ + ('device_id_num', '=', attendance.user_id), + ('company_id', '=', self.env.company.id) + ], limit=1) + + if not employee: + continue + employee = self._create_employee_from_device(user, device) + + # Process attendance record + self._process_attendance_record( + device, + employee, + atten_time_str, + zk_attendance, + hr_attendance, + tolerance + ) + + def _convert_to_utc(self, timestamp, timezone_str): + """Convert local timestamp to UTC""" + local_tz = pytz.timezone(timezone_str) + local_dt = local_tz.localize(timestamp, is_dst=None) + return local_dt.astimezone(pytz.utc) + + def _create_employee_from_device(self, user, device): + """Create new employee from device user data""" + return self.env['hr.employee'].create({ + 'device_id_num': user.user_id, + 'device_ids': device.id, + 'name': user.name, + 'company_id': device.company_id.id + }) + + def _process_attendance_record(self, device, employee, atten_time_str, zk_attendance, hr_attendance, tolerance): + """Process a single attendance record""" + # Convert string back to datetime for comparison + atten_time = fields.Datetime.from_string(atten_time_str) + + # Check for existing attendance records + check_in_today = hr_attendance.search([ + ('employee_id', '=', employee.id), + ('check_in', '!=', False), + ('check_out', '=', False) + ]) + + # Check for nearby records + lower_bound = atten_time - tolerance + upper_bound = atten_time + tolerance + + nearby_in = hr_attendance.search([ + ('employee_id', '=', employee.id), + ('check_in', '>=', lower_bound), + ('check_in', '<=', upper_bound) + ]) + + nearby_out = hr_attendance.search([ + ('employee_id', '=', employee.id), + ('check_out', '>=', lower_bound), + ('check_out', '<=', upper_bound) + ]) + + # Process based on punch type + if device.display_name == 'IN' and not check_in_today and not nearby_in: + try: + hr_attendance.create({ + 'employee_id': employee.id, + 'check_in': atten_time_str, + }) + employee.attendance_state = 'checked_in' + except: + pass + elif check_in_today and device.display_name != 'IN' and not nearby_out: + if fields.Datetime.from_string(check_in_today.check_in) <= atten_time: + try: + check_in_today.write({ + 'check_out': atten_time_str, + }) + employee.attendance_state = 'checked_out' + except: + pass + + # Create zk attendance record if not exists + if not zk_attendance.search([ + ('device_id_num', '=', employee.device_id_num), + ('punching_time', '=', atten_time_str), + ('company_id', '=', self.env.company.id) + ]): + zk_attendance.create({ + 'employee_id': employee.id, + 'device_id_num': employee.device_id_num, + 'attendance_type': str(1), + 'punch_type': '0' if device.display_name == 'IN' else '1', + 'punching_time': atten_time_str, + 'address_id': device.address_id.id, + 'company_id': self.env.company.id + }) def action_restart_device(self): """For restarting the device""" @@ -676,11 +677,16 @@ class BiometricDeviceDetails(models.Model): "with 'pip3 install pyzk'.")) conn = self.device_connect(zk) if conn: - self.device_name = conn.get_device_name() - self.device_firmware = conn.get_firmware_version() - self.device_serial_no = conn.get_serialnumber() - self.device_platform = conn.get_platform() - self.device_mac = conn.get_mac() + if not self.device_name: + self.device_name = conn.get_device_name() + if not self.device_firmware: + self.device_firmware = conn.get_firmware_version() + if not self.device_serial_no: + self.device_serial_no = conn.get_serialnumber() + if not self.device_platform: + self.device_platform = conn.get_platform() + if not self.device_mac: + self.device_mac = conn.get_mac() else: raise UserError(_( "Please Check the Connection"))