import base64
import json
import logging
import mimetypes
import re
from io import BytesIO

import requests

from odoo import _, api, models
from odoo.exceptions import UserError

try:
    import pytesseract
except Exception:  # pragma: no cover - optional dependency
    pytesseract = None

try:
    from PIL import Image
except Exception:  # pragma: no cover - optional dependency
    Image = None

try:
    from pdf2image import convert_from_bytes
except Exception:  # pragma: no cover - optional dependency
    convert_from_bytes = None

try:
    from pypdf import PdfReader
except Exception:  # pragma: no cover - optional dependency
    PdfReader = None

try:
    from docx import Document
except Exception:  # pragma: no cover - optional dependency
    Document = None

_logger = logging.getLogger(__name__)

# Leading bytes of raw PDF / JPEG / PNG / ZIP payloads. Content starting with
# one of these is treated as already-decoded binary, not base64.
_RAW_FILE_SIGNATURES = (b"%PDF", b"\xFF\xD8", b"\x89PNG", b"PK")

_DOCX_MIMETYPE = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"


class DocumentParserService(models.AbstractModel):
    """Extract text from uploaded documents and turn it into structured data.

    Text is pulled out of PDF, image, DOCX and plain-text payloads, sent to a
    chat-completion provider (Together first, then OpenRouter) together with a
    schema, and the JSON reply is returned.  When no provider yields a result,
    a regex-based heuristic extraction is used instead.
    """

    _name = "document.parser.service"
    _description = "Document Parser Service"

    TOGETHER_ENDPOINT = "https://api.together.xyz/v1/chat/completions"
    OPENROUTER_ENDPOINT = "https://openrouter.ai/api/v1/chat/completions"
    TOGETHER_MODELS = [
        "Qwen/Qwen2.5-7B-Instruct-Turbo",
        "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
    ]
    OPENROUTER_MODELS = [
        "qwen/qwen-2.5-7b-instruct",
        "qwen/qwen-2.5-7b-instruct:free",
        "deepseek/deepseek-chat:free",
    ]

    @api.model
    def parse_document(
        self,
        file_content,
        filename=None,
        required_fields=None,
        extra_instructions=None,
        json_schema=None,
    ):
        """Parse a document and return the extracted data with diagnostics.

        :param file_content: raw bytes, or base64 as ``bytes``/``str``.
        :param filename: name of the file; required, used to guess the mimetype.
        :param required_fields: dict or list describing the fields to extract.
        :param extra_instructions: optional free text appended to the prompt.
        :param json_schema: optional schema overriding the one built from
            ``required_fields``; a non-string value is serialized to JSON.
        :return: dict with keys ``filename``, ``mimetype``, ``text``,
            ``result``, ``provider``, ``errors`` and ``error``.
        :raises UserError: when the content or filename is missing, or the
            content cannot be decoded.
        """
        if not file_content:
            raise UserError(_("No document provided."))
        if not filename:
            raise UserError(_("Filename is required."))

        binary = self._decode_file_content(file_content)
        mimetype = self._detect_mimetype(binary, filename)
        text_content = self._extract_text(binary, mimetype)
        fields_spec = self._normalize_required_fields(required_fields or {})

        if not text_content.strip():
            return {
                "filename": filename,
                "mimetype": mimetype,
                "text": "",
                "result": {},
                "provider": False,
                "errors": [_("No text could be extracted from the document.")],
                "error": _("No text could be extracted from the document."),
            }

        if json_schema:
            schema_text = self._schema_to_text(json_schema)
        else:
            schema_text = self._build_json_schema_text(fields_spec)

        ai_result, provider_used, provider_errors = self._send_to_ai(
            # Only the first 45000 characters are sent to the provider.
            text_content=text_content[:45000],
            schema_text=schema_text,
            extra_instructions=extra_instructions,
        )
        if not ai_result:
            ai_result = self._extract_with_heuristics(text_content, fields_spec)
        ai_result = ai_result or {}

        error_message = False
        if not ai_result and provider_errors:
            error_message = "; ".join(provider_errors[:3])

        return {
            "filename": filename,
            "mimetype": mimetype,
            "text": text_content,
            "result": ai_result,
            "provider": provider_used,
            "errors": provider_errors,
            "error": error_message,
        }

    @api.model
    def extract_requested_data(self, file_content, filename, required_fields, extra_instructions=None, json_schema=None):
        """Return only the ``result`` part of :meth:`parse_document`."""
        return self.parse_document(
            file_content=file_content,
            filename=filename,
            required_fields=required_fields,
            extra_instructions=extra_instructions,
            json_schema=json_schema,
        )["result"]

    @staticmethod
    def _schema_to_text(json_schema):
        """Render a caller-supplied schema as prompt text.

        Strings are used verbatim.  Other values are serialized as JSON so the
        prompt does not contain a Python repr; values that cannot be
        serialized fall back to ``str()``.
        """
        if isinstance(json_schema, str):
            return json_schema
        try:
            return json.dumps(json_schema, ensure_ascii=True)
        except (TypeError, ValueError):
            return str(json_schema)

    @staticmethod
    def _strict_b64decode(payload):
        """Decode base64 ``str``/``bytes`` strictly.

        Surrounding whitespace and CR/LF line wraps are removed first; any
        other character outside the base64 alphabet raises ``ValueError``
        instead of being silently discarded.
        """
        if isinstance(payload, str):
            compact = payload.strip().replace("\r", "").replace("\n", "")
        else:
            compact = payload.strip().replace(b"\r", b"").replace(b"\n", b"")
        return base64.b64decode(compact, validate=True)

    def _decode_file_content(self, file_content):
        """Return the raw bytes of ``file_content``.

        ``bytes`` starting with a known file signature are returned unchanged;
        other ``bytes`` are base64-decoded when they are valid base64 and
        returned unchanged otherwise.  ``str`` input must be valid base64.

        :raises UserError: for an invalid base64 string or an unsupported type.
        """
        if isinstance(file_content, bytes):
            if file_content.startswith(_RAW_FILE_SIGNATURES):
                return file_content
            try:
                return self._strict_b64decode(file_content)
            except ValueError:
                # Not base64: treat the payload as raw content.
                return file_content
        if isinstance(file_content, str):
            try:
                return self._strict_b64decode(file_content)
            except ValueError as exc:
                raise UserError(_("Invalid base64 document.")) from exc
        raise UserError(_("Unsupported file format."))

    def _detect_mimetype(self, binary, filename):
        """Guess the mimetype from the filename, then from the leading bytes."""
        if filename:
            guessed = mimetypes.guess_type(filename)[0]
            if guessed:
                return guessed
        if binary.startswith(b"%PDF"):
            return "application/pdf"
        if binary.startswith(b"\xFF\xD8"):
            return "image/jpeg"
        if binary.startswith(b"\x89PNG"):
            return "image/png"
        if binary[:2] == b"PK":
            # A ZIP container is assumed to be a DOCX file.
            return _DOCX_MIMETYPE
        return "application/octet-stream"

    def _extract_text(self, binary, mimetype):
        """Dispatch to the extractor matching ``mimetype``.

        Unsupported mimetypes and extraction failures yield an empty string;
        failures are logged.
        """
        text_content = ""
        try:
            if mimetype == "application/pdf":
                text_content = self._extract_text_from_pdf(binary)
            elif mimetype in {"image/png", "image/jpeg", "image/jpg"}:
                text_content = self._extract_text_from_image(binary)
            elif mimetype == _DOCX_MIMETYPE:
                text_content = self._extract_text_from_docx(binary)
            elif mimetype.startswith("text/"):
                text_content = binary.decode("utf-8", errors="ignore")
        except Exception as exc:
            _logger.exception("Document text extraction failed: %s", exc)
        return (text_content or "").strip()

    def _extract_text_from_pdf(self, binary):
        """Extract PDF text with pypdf, falling back to OCR when it yields nothing.

        Both steps depend on optional packages and are skipped when those are
        not installed.
        """
        extracted_parts = []
        if PdfReader:
            try:
                reader = PdfReader(BytesIO(binary))
                extracted_parts.extend(page.extract_text() or "" for page in reader.pages)
            except Exception as exc:
                _logger.warning("PdfReader extraction failed: %s", exc)

        text_content = "\n".join(part for part in extracted_parts if part).strip()
        if text_content:
            return text_content

        if convert_from_bytes and pytesseract:
            try:
                images = convert_from_bytes(binary, dpi=300)
                return "\n".join(
                    pytesseract.image_to_string(image) for image in images
                ).strip()
            except Exception as exc:
                _logger.warning("PDF OCR extraction failed: %s", exc)
        return ""

    def _extract_text_from_image(self, binary):
        """OCR an image; returns ``""`` when OCR is unavailable or fails."""
        if not pytesseract or not Image:
            return ""
        try:
            # The context manager releases the image once OCR is done.
            with Image.open(BytesIO(binary)) as image:
                return pytesseract.image_to_string(image).strip()
        except Exception as exc:
            _logger.warning("Image OCR extraction failed: %s", exc)
            return ""

    def _extract_text_from_docx(self, binary):
        """Join the non-empty paragraphs of a DOCX file; ``""`` on failure."""
        if not Document:
            return ""
        try:
            document = Document(BytesIO(binary))
            return "\n".join(
                paragraph.text for paragraph in document.paragraphs if paragraph.text
            ).strip()
        except Exception as exc:
            _logger.warning("DOCX extraction failed: %s", exc)
            return ""

    def _send_to_ai(self, text_content, schema_text, extra_instructions=None):
        """Query Together, then OpenRouter, for a structured extraction.

        :return: ``(result, provider, errors)`` where ``provider`` is
            ``"together"``, ``"openrouter"`` or ``False``, and ``errors`` lists
            the messages accumulated from providers that were tried (or not
            configured) before the returned outcome.
        """
        prompt = self._build_prompt(text_content, schema_text, extra_instructions)
        errors = []

        together_key = self._get_param("document_parser.together_ai_key") or self._get_param("document_parser.together_api_key")
        openrouter_key = self._get_param("document_parser.openrouter_ai_key") or self._get_param("document_parser.openrouter_api_key")

        if together_key:
            result, provider_errors = self._call_provider(
                provider_name="Together",
                endpoint=self.TOGETHER_ENDPOINT,
                headers={
                    "Authorization": f"Bearer {together_key}",
                    "Content-Type": "application/json",
                },
                models=self.TOGETHER_MODELS,
                prompt=prompt,
            )
            if result:
                return result, "together", errors
            errors.extend(provider_errors)
        else:
            errors.append(_("Together AI key is not configured."))

        if openrouter_key:
            result, provider_errors = self._call_provider(
                provider_name="OpenRouter",
                endpoint=self.OPENROUTER_ENDPOINT,
                headers={
                    "Authorization": f"Bearer {openrouter_key}",
                    "Content-Type": "application/json",
                    "HTTP-Referer": self._get_param("web.base.url") or "odoo.local",
                    "X-Title": "Document Parser",
                },
                models=self.OPENROUTER_MODELS,
                prompt=prompt,
            )
            if result:
                return result, "openrouter", errors
            errors.extend(provider_errors)
        else:
            errors.append(_("OpenRouter key is not configured."))

        return {}, False, errors

    def _build_prompt(self, text_content, schema_text, extra_instructions=None):
        """Build the extraction prompt from the rules, schema and document text."""
        return f"""
You are a strict JSON generator.

RULES:
- Output ONLY valid raw JSON.
- No explanation.
- No markdown.
- No backticks.
- No extra text.
- Follow schema strictly.
- If a field is missing in text, return null.
- Scan the entire document carefully before answering.
- Extract ONLY what exists in text.
- FOR ANY DATES CHANGE FORMAT TO %Y-%m-%d

FIELD RULES:
- If "skills" exists, extract only explicit technical skills written in the document.
- Do NOT infer similar skills from role names, responsibilities, or projects.
- Normalize names like "Expert Python" to "Python".
- Exclude soft skills and business phrases.
- Exclude responsibility-style phrases like Cross-Functional Collaboration, Cost Saving, Resource Utilization, Documentation, Reporting, and Team Handling.
- Prefer concrete tools, methods, technologies, platforms, certifications, engineering/process methods, and domain techniques explicitly written in the resume.
- If the resume explicitly mentions items like AutoCAD, Root Cause Analysis, Project Management, Manufacturing Processes, Lean, Six Sigma, or Quality Control, include them.
- Remove duplicates and return each skill only once.
- If "email" exists, return one valid normalized email.
- If "name" exists, prefer the full name at the top and exclude titles, companies, and addresses.
- If "phone" exists, return the most complete phone number found.
- If "experience" exists, return only clearly supported numeric values.

Schema:
{schema_text}

Instructions:
{extra_instructions or "None"}

Document:
{text_content}
"""

    def _call_provider(self, provider_name, endpoint, headers, models, prompt):
        """Try each model of a provider in order until one returns usable JSON.

        :return: ``(parsed, errors)``; ``parsed`` is ``{}`` when every model
            failed and ``errors`` holds one message per failed attempt.
        """
        errors = []
        for model in models:
            payload = {
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0,
                "max_tokens": 1500,
            }
            # Broad catch on purpose: any failure moves on to the next model.
            try:
                response = requests.post(endpoint, headers=headers, json=payload, timeout=90)
                if response.status_code != 200:
                    message = _("%(provider)s model %(model)s failed with %(status)s: %(body)s") % {
                        "provider": provider_name,
                        "model": model,
                        "status": response.status_code,
                        "body": (response.text or "")[:300],
                    }
                    _logger.warning(message)
                    errors.append(message)
                    continue

                body = response.json()
                content = self._extract_message_content(body)
                parsed = self._safe_json_load(content)
                if parsed:
                    return parsed, errors

                message = _("%(provider)s model %(model)s returned invalid JSON.") % {
                    "provider": provider_name,
                    "model": model,
                }
                _logger.warning(message)
                errors.append(message)
            except Exception as exc:
                message = _("%(provider)s model %(model)s error: %(error)s") % {
                    "provider": provider_name,
                    "model": model,
                    "error": str(exc),
                }
                _logger.warning(message)
                errors.append(message)
        return {}, errors

    def _extract_message_content(self, response_body):
        """Return the first choice's message content from a provider response.

        List content is flattened to newline-joined text and dict content
        yields its ``text`` entry; a missing path yields ``""``.
        """
        try:
            content = response_body["choices"][0]["message"]["content"]
        except Exception:
            return ""

        if isinstance(content, list):
            parts = []
            for item in content:
                if isinstance(item, dict):
                    if item.get("type") == "text":
                        parts.append(item.get("text", ""))
                    elif item.get("text"):
                        parts.append(item.get("text"))
                else:
                    parts.append(str(item))
            return "\n".join(part for part in parts if part)
        if isinstance(content, dict):
            return content.get("text", "")
        return content or ""

    def _safe_json_load(self, content):
        """Parse a provider reply as JSON, tolerating code fences and stray text.

        The whole reply is tried first, then the outermost ``{...}`` span.
        Only JSON objects or arrays are accepted: a bare scalar is not a
        usable extraction result.

        :return: the parsed object/array, or ``{}`` when nothing usable is found.
        """
        if not content or not isinstance(content, str):
            return {}

        content = content.strip().replace("```json", "").replace("```", "").strip()
        candidates = [content]
        match = re.search(r"\{[\s\S]*\}", content)
        if match and match.group(0) != content:
            candidates.append(match.group(0))

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except (ValueError, RecursionError):
                continue
            if isinstance(parsed, (dict, list)):
                return parsed

        _logger.warning("JSON parse failed for provider response: %s", content[:500])
        return {}

    def _extract_with_heuristics(self, text_content, fields):
        """Fill the requested fields using regular expressions only.

        Recognized field names get a regex-based value (or ``None``); every
        other requested field is set to ``None``.
        """
        text = text_content or ""
        result = {}
        email_match = re.search(r"([A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,})", text, re.I)
        phone_match = re.search(r"(\+?\d[\d\-\s()]{7,}\d)", text)
        linkedin_match = re.search(r"(https?://(?:www\.)?linkedin\.com/[^\s]+)", text, re.I)
        name_guess = self._guess_name(text)
        skills_guess = self._guess_skills(text)

        for field_name, field_spec in fields.items():
            field_type = field_spec.get("type", "string")
            if field_name in {"email", "email_from"}:
                result[field_name] = email_match.group(1).lower() if email_match else None
            elif field_name in {"phone", "mobile", "partner_phone"}:
                result[field_name] = phone_match.group(1).strip() if phone_match else None
            elif field_name in {"linkedin_profile", "linkedin"}:
                result[field_name] = linkedin_match.group(1).strip() if linkedin_match else None
            elif field_name in {"name", "full_name", "partner_name"}:
                result[field_name] = name_guess
            elif field_name == "skills" and field_type == "list":
                result[field_name] = skills_guess
            else:
                result[field_name] = None
        return result

    def _guess_name(self, text_content):
        """Return the first 2-4 word line among the first 12 non-empty lines.

        Lines containing resume-section keywords are skipped; ``None`` is
        returned when no line qualifies.
        """
        lines = [line.strip() for line in (text_content or "").splitlines() if line.strip()]
        for line in lines[:12]:
            cleaned = re.sub(r"[^A-Za-z .'-]", "", line).strip()
            if len(cleaned.split()) in {2, 3, 4} and not re.search(
                r"(resume|cv|email|phone|linkedin|skills|experience)", cleaned, re.I
            ):
                return cleaned
        return None

    def _guess_skills(self, text_content):
        """Return up to 25 distinct entries found in the first skills section."""
        section = re.search(
            r"(skills|technical skills|core competencies)(.*?)(experience|education|projects|certifications|$)",
            text_content or "",
            re.I | re.S,
        )
        if not section:
            return []

        cleaned = []
        for part in re.split(r"[,;\n|•]", section.group(2)):
            value = re.sub(r"\s+", " ", part).strip(" -:\t\r\n")
            if value and 1 < len(value) < 50 and not re.search(r"^(skills?|experience|education)$", value, re.I):
                cleaned.append(value)
        # De-duplicate before truncating so duplicates do not use up the limit.
        return list(dict.fromkeys(cleaned))[:25]

    def _get_param(self, key):
        """Read a system parameter with superuser rights."""
        return self.env["ir.config_parameter"].sudo().get_param(key)

    def _normalize_required_fields(self, fields):
        """Normalize a field request to ``{name: {"type", "description"}}``.

        A dict maps names to a spec dict or to a plain description; a list or
        tuple holds field names (non-string entries are ignored).  Any other
        input yields ``{}``.
        """

        def default_label(name):
            return str(name).replace("_", " ").title()

        if isinstance(fields, dict):
            normalized = {}
            for field_name, field_value in fields.items():
                if isinstance(field_value, dict):
                    normalized[field_name] = {
                        "type": field_value.get("type", "string"),
                        "description": field_value.get("description", default_label(field_name)),
                    }
                else:
                    normalized[field_name] = {
                        "type": "string",
                        "description": str(field_value or default_label(field_name)),
                    }
            return normalized
        if isinstance(fields, (list, tuple)):
            return {
                field_name: {"type": "string", "description": default_label(field_name)}
                for field_name in fields
                if isinstance(field_name, str)
            }
        return {}

    def _build_json_schema_text(self, fields):
        """Serialize the normalized field specification as ASCII-only JSON."""
        return json.dumps(fields, ensure_ascii=True)