bio-metric speed

This commit is contained in:
raman 2025-06-03 17:09:15 +05:30
parent 19bdf92f67
commit 95e7fedfbd
1 changed files with 217 additions and 211 deletions

View File

@ -149,220 +149,221 @@ class BiometricDeviceDetails(models.Model):
raise ValidationError(f'{error}') raise ValidationError(f'{error}')
def action_download_attendance(self): def action_download_attendance(self):
"""Function to download attendance records from the device""" """Download attendance records from the device and process them"""
_logger.info("++++++++++++Cron Executed++++++++++++++++++++++") _logger.info("++++++++++++Cron Executed++++++++++++++++++++++")
zk_attendance = self.env['zk.machine.attendance'] zk_attendance = self.env['zk.machine.attendance']
hr_attendance = self.env['hr.attendance'] hr_attendance = self.env['hr.attendance']
for info in self:
machine_ip = info.device_ip for device in self:
zk_port = info.port_number
try: try:
# Connecting with the device with the ip and port provided # Connect to the device
zk = ZK(machine_ip, port=zk_port, timeout=15, zk = ZK(
password=self.device_password, device.device_ip,
force_udp=False, ommit_ping=False) port=device.port_number,
except NameError: timeout=15,
raise UserError( password=device.device_password,
_("Pyzk module not Found. Please install it" force_udp=False,
"with 'pip3 install pyzk'.")) ommit_ping=False
conn = self.device_connect(zk) )
self.get_device_information() conn = device.device_connect(zk)
if conn: if not conn:
raise UserError(
_('Unable to connect to the device. Please check parameters and network connection.'))
# Get device information and users
device.get_device_information()
conn.disable_device() conn.disable_device()
self.get_all_users() device.get_all_users()
# self.action_set_timezone()
user = conn.get_users()
# get All Fingerprints
fingers = conn.get_templates()
for use in user:
for finger in fingers:
if finger.uid == use.uid:
templates = conn.get_user_template(uid=use.uid,
temp_id=finger.fid,
user_id=use.user_id)
hex_data = templates.template.hex()
# Convert hex data to binary
binary_data = binascii.unhexlify(hex_data)
base64_data = base64.b64encode(binary_data).decode(
'utf-8')
employee = self.env['hr.employee'].search(
[('device_id_num', '=', use.user_id), ('company_id', '=', self.env.company.id)],
limit=1)
employee.device_ids |= self
if str(finger.fid) in employee.fingerprint_ids.mapped(
'finger_id'):
employee.fingerprint_ids.search(
[('finger_id', '=', finger.fid)]).update({
'finger_template': base64_data,
})
else:
employee.fingerprint_ids.create({
'finger_template': base64_data,
'finger_id': finger.fid,
'employee_bio_id': employee.id,
'filename': f'{employee.name}-finger-{finger.fid}'
})
# get all attendances
attendance = conn.get_attendance()
if attendance:
filtered_attendance = []
for each in attendance: # Process fingerprints
atten_time = each.timestamp self._process_fingerprints(conn, device)
# Localize and convert to UTC # Process attendance data
local_tz = pytz.timezone(self.env.user.partner_id.tz or 'GMT') attendance_data = conn.get_attendance()
local_dt = local_tz.localize(atten_time, is_dst=None) if not attendance_data:
utc_dt = local_dt.astimezone(pytz.utc)
utc_dt_str = utc_dt.strftime("%Y-%m-%d %H:%M:%S")
# Convert to datetime and then to Odoo string format
atten_time = datetime.datetime.strptime(utc_dt_str, "%Y-%m-%d %H:%M:%S")
atten_time = fields.Datetime.to_string(atten_time)
# Filter for today's date
if atten_time[:10] == datetime.datetime.today().date().strftime("%Y-%m-%d"):
filtered_attendance.append(each)
attendance = filtered_attendance
for each in attendance:
atten_time = each.timestamp
# Localize and convert to UTC
local_tz = pytz.timezone(self.env.user.partner_id.tz or 'GMT')
local_dt = local_tz.localize(atten_time, is_dst=None)
utc_dt = local_dt.astimezone(pytz.utc)
utc_dt_str = utc_dt.strftime("%Y-%m-%d %H:%M:%S")
# Convert to datetime and then to Odoo string format
atten_time = datetime.datetime.strptime(utc_dt_str, "%Y-%m-%d %H:%M:%S")
atten_time = fields.Datetime.to_string(atten_time)
for uid in user:
if uid.user_id == each.user_id:
get_user_id = self.env['hr.employee'].search(
[('device_id_num', '=', each.user_id), ('company_id', '=', self.env.company.id)],
limit=1)
check_in_today = hr_attendance.search([(
'employee_id', '=', get_user_id.id),
('check_in', '!=', False), ('check_out', '=', False)])
from datetime import timedelta
# Define the tolerance (10 minutes)
tolerance = timedelta(minutes=10)
# Convert the atten_time string to a datetime object
# Calculate the lower and upper bounds with the tolerance
atten_time_obj = datetime.datetime.strptime(atten_time, "%Y-%m-%d %H:%M:%S")
# Calculate the lower and upper bounds with the tolerance
lower_bound = atten_time_obj - tolerance
upper_bound = atten_time_obj + tolerance
# Ensure the 'check_in' and 'check_out' fields are datetime objects and compare them
next_in = hr_attendance.search([
('employee_id', '=', get_user_id.id),
('check_in', '>=', lower_bound),
('check_in', '<=', upper_bound)
])
next_out = hr_attendance.search([
('employee_id', '=', get_user_id.id),
('check_out', '>=', lower_bound),
('check_out', '<=', upper_bound)
])
if get_user_id:
if self.display_name == 'IN' and not check_in_today:
if next_in:
continue
hr_attendance.create({
'employee_id': get_user_id.id,
'check_in': atten_time,
})
get_user_id.attendance_state = 'checked_in'
elif check_in_today and self.display_name != 'IN':
if fields.Datetime.to_string(check_in_today.check_in) > atten_time or next_out:
continue
check_in_today.write({
'check_out': atten_time,
})
get_user_id.attendance_state = 'checked_out'
else:
pass
duplicate_atten_ids = zk_attendance.search(
[('device_id_num', '=', each.user_id),
('punching_time', '=', atten_time),
('company_id', '=', self.env.company.id)])
if not duplicate_atten_ids:
zk_attendance.create({
'employee_id': get_user_id.id,
'device_id_num': each.user_id,
'attendance_type': str(1),
'punch_type': '0' if self.display_name == 'IN' else '1',
'punching_time': atten_time,
'address_id': info.address_id.id,
'company_id': self.env.company.id
})
# att_var = hr_attendance.search([(
# 'employee_bio_id', '=', get_user_id.id),
# ('check_out', '=', False)])
# if self.display_name == 'IN': # check-in
# if not att_var:
# hr_attendance.create({
# 'employee_id':
# get_user_id.id,
# 'check_in': atten_time
# })
# else: # check-out
# if len(att_var) == 1:
# att_var.write({
# 'check_out': atten_time
# })
# else:
# att_var1 = hr_attendance.search(
# [('employee_id', '=',
# str(get_user_id.device_id_num))])
# if att_var1:
# att_var1[-1].write({
# 'check_out': atten_time
# })
else:
continue
employee = self.env['hr.employee'].create({
'device_id_num': each.user_id,
'device_ids': self.id,
'name': uid.name,
'company_id': self.company_id.id
})
zk_attendance.create({
'employee_id': employee.id,
'device_id_num': each.user_id,
'attendance_type': str(1),
'punch_type': '0' if self.display_name == 'IN' else '1',
'punching_time': atten_time,
'address_id': info.address_id.id,
'company_id': self.company_id.id
})
hr_attendance.create({
'employee_id': employee.id,
'check_in': atten_time
})
if not self.is_live_capture:
current_time = fields.datetime.now().strftime(
'%Y-%m-%d %H:%M:%S')
message = (f'Downloaded data from the device on '
f'{current_time} by {self.env.user.name}')
self.message_post(body=message)
conn.disconnect()
return True
else:
zk.test_voice(index=4) zk.test_voice(index=4)
raise UserError(_('Unable to get the attendance log, please' raise UserError(_('No attendance records found on the device.'))
'try again later.'))
else: self._process_attendance_data(attendance_data, conn, device, zk_attendance, hr_attendance)
raise UserError(_('Unable to connect, please check the'
'parameters and network connections.')) # Post success message if not live capture
if not device.is_live_capture:
current_time = fields.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
message = f'Downloaded data from the device on {current_time} by {self.env.user.name}'
device.message_post(body=message)
conn.disconnect()
return True
except NameError:
raise UserError(_("Pyzk module not found. Please install it with 'pip3 install pyzk'."))
except Exception as e:
_logger.error(f"Error processing device {device.device_ip}: {str(e)}")
raise UserError(_(f"Error processing device: {str(e)}"))
def _process_fingerprints(self, conn, device):
"""Process fingerprint data from the device"""
users = conn.get_users()
fingers = conn.get_templates()
for user in users:
for finger in fingers:
if finger.uid == user.uid:
templates = conn.get_user_template(
uid=user.uid,
temp_id=finger.fid,
user_id=user.user_id
)
# Convert template data
hex_data = templates.template.hex()
binary_data = binascii.unhexlify(hex_data)
base64_data = base64.b64encode(binary_data).decode('utf-8')
# Find or create employee
employee = self.env['hr.employee'].search([
('device_id_num', '=', user.user_id),
('company_id', '=', self.env.company.id)
], limit=1)
if not employee:
continue
employee.device_ids |= device
# Update or create fingerprint record
fingerprint = employee.fingerprint_ids.filtered(lambda f: f.finger_id == str(finger.fid))
if fingerprint:
fingerprint.finger_template = base64_data
else:
self.env['hr.employee.fingerprint'].create({
'finger_template': base64_data,
'finger_id': finger.fid,
'employee_bio_id': employee.id,
'filename': f'{employee.name}-finger-{finger.fid}'
})
def _process_attendance_data(self, attendance_data, conn, device, zk_attendance, hr_attendance):
"""Process attendance data from the device"""
users = conn.get_users()
today = datetime.datetime.today().date().strftime("%Y-%m-%d")
tolerance = datetime.timedelta(minutes=10)
# Filter today's attendance
filtered_attendance = [
att for att in attendance_data
if att.timestamp.date().strftime("%Y-%m-%d") == today
]
for attendance in filtered_attendance:
# Convert time to UTC
atten_time = self._convert_to_utc(attendance.timestamp, self.env.user.partner_id.tz or 'GMT')
atten_time_str = fields.Datetime.to_string(atten_time)
# Find matching user
user = next((u for u in users if u.user_id == attendance.user_id), None)
if not user:
continue
# Find or create employee
employee = self.env['hr.employee'].search([
('device_id_num', '=', attendance.user_id),
('company_id', '=', self.env.company.id)
], limit=1)
if not employee:
continue
employee = self._create_employee_from_device(user, device)
# Process attendance record
self._process_attendance_record(
device,
employee,
atten_time_str,
zk_attendance,
hr_attendance,
tolerance
)
def _convert_to_utc(self, timestamp, timezone_str):
"""Convert local timestamp to UTC"""
local_tz = pytz.timezone(timezone_str)
local_dt = local_tz.localize(timestamp, is_dst=None)
return local_dt.astimezone(pytz.utc)
def _create_employee_from_device(self, user, device):
"""Create new employee from device user data"""
return self.env['hr.employee'].create({
'device_id_num': user.user_id,
'device_ids': device.id,
'name': user.name,
'company_id': device.company_id.id
})
def _process_attendance_record(self, device, employee, atten_time_str, zk_attendance, hr_attendance, tolerance):
"""Process a single attendance record"""
# Convert string back to datetime for comparison
atten_time = fields.Datetime.from_string(atten_time_str)
# Check for existing attendance records
check_in_today = hr_attendance.search([
('employee_id', '=', employee.id),
('check_in', '!=', False),
('check_out', '=', False)
])
# Check for nearby records
lower_bound = atten_time - tolerance
upper_bound = atten_time + tolerance
nearby_in = hr_attendance.search([
('employee_id', '=', employee.id),
('check_in', '>=', lower_bound),
('check_in', '<=', upper_bound)
])
nearby_out = hr_attendance.search([
('employee_id', '=', employee.id),
('check_out', '>=', lower_bound),
('check_out', '<=', upper_bound)
])
# Process based on punch type
if device.display_name == 'IN' and not check_in_today and not nearby_in:
try:
hr_attendance.create({
'employee_id': employee.id,
'check_in': atten_time_str,
})
employee.attendance_state = 'checked_in'
except:
pass
elif check_in_today and device.display_name != 'IN' and not nearby_out:
if fields.Datetime.from_string(check_in_today.check_in) <= atten_time:
try:
check_in_today.write({
'check_out': atten_time_str,
})
employee.attendance_state = 'checked_out'
except:
pass
# Create zk attendance record if not exists
if not zk_attendance.search([
('device_id_num', '=', employee.device_id_num),
('punching_time', '=', atten_time_str),
('company_id', '=', self.env.company.id)
]):
zk_attendance.create({
'employee_id': employee.id,
'device_id_num': employee.device_id_num,
'attendance_type': str(1),
'punch_type': '0' if device.display_name == 'IN' else '1',
'punching_time': atten_time_str,
'address_id': device.address_id.id,
'company_id': self.env.company.id
})
def action_restart_device(self): def action_restart_device(self):
"""For restarting the device""" """For restarting the device"""
@ -676,11 +677,16 @@ class BiometricDeviceDetails(models.Model):
"with 'pip3 install pyzk'.")) "with 'pip3 install pyzk'."))
conn = self.device_connect(zk) conn = self.device_connect(zk)
if conn: if conn:
self.device_name = conn.get_device_name() if not self.device_name:
self.device_firmware = conn.get_firmware_version() self.device_name = conn.get_device_name()
self.device_serial_no = conn.get_serialnumber() if not self.device_firmware:
self.device_platform = conn.get_platform() self.device_firmware = conn.get_firmware_version()
self.device_mac = conn.get_mac() if not self.device_serial_no:
self.device_serial_no = conn.get_serialnumber()
if not self.device_platform:
self.device_platform = conn.get_platform()
if not self.device_mac:
self.device_mac = conn.get_mac()
else: else:
raise UserError(_( raise UserError(_(
"Please Check the Connection")) "Please Check the Connection"))