# odoo18/addons_extensions/document_parser/models/document_parser_service.py
# (repository viewer metadata: 445 lines, 17 KiB, Python)

import base64
import json
import logging
import mimetypes
import re
from io import BytesIO
import requests
from odoo import _, api, models
from odoo.exceptions import UserError
# Optional third-party dependencies.
# Each import is attempted on its own; when it fails, the name is bound to
# None so the extraction helpers below can test for availability and skip
# that strategy instead of crashing at import time.

# OCR engine binding, used for images and for PDFs without a text layer.
try:
    import pytesseract
except Exception:  # pragma: no cover - optional dependency
    pytesseract = None

# Pillow, used to open image bytes before handing them to pytesseract.
try:
    from PIL import Image
except Exception:  # pragma: no cover - optional dependency
    Image = None

# Renders PDF pages to images for the OCR fallback.
try:
    from pdf2image import convert_from_bytes
except Exception:  # pragma: no cover - optional dependency
    convert_from_bytes = None

# Reads the embedded text layer of PDF files.
try:
    from pypdf import PdfReader
except Exception:  # pragma: no cover - optional dependency
    PdfReader = None

# python-docx, used to read paragraphs from .docx files.
try:
    from docx import Document
except Exception:  # pragma: no cover - optional dependency
    Document = None

# Module-level logger shared by every method of the service.
_logger = logging.getLogger(__name__)
class DocumentParserService(models.AbstractModel):
    """Service model that turns an uploaded document into structured data.

    The methods of this class decode the uploaded payload, extract its text
    (PDF, image, DOCX or plain text), ask a chat-completion provider to
    return JSON for the requested fields, and fall back to regex heuristics
    when no provider returns a usable answer.
    """

    _name = "document.parser.service"
    _description = "Document Parser Service"

    # Chat-completion endpoints, one per supported provider.
    TOGETHER_ENDPOINT = "https://api.together.xyz/v1/chat/completions"
    OPENROUTER_ENDPOINT = "https://openrouter.ai/api/v1/chat/completions"

    # Model identifiers per provider; they are tried in the listed order
    # until one of them returns parseable JSON.
    TOGETHER_MODELS = [
        "Qwen/Qwen2.5-7B-Instruct-Turbo",
        "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
    ]
    OPENROUTER_MODELS = [
        "qwen/qwen-2.5-7b-instruct",
        "qwen/qwen-2.5-7b-instruct:free",
        "deepseek/deepseek-chat:free",
    ]
@api.model
def parse_document(
    self,
    file_content,
    filename=None,
    required_fields=None,
    extra_instructions=None,
    json_schema=None,
):
    """Extract text from a document and return the requested fields.

    :param file_content: raw bytes or base64 payload of the document
    :param filename: original file name (mandatory despite the default)
    :param required_fields: dict or list describing the fields to extract
    :param extra_instructions: free text appended to the AI prompt
    :param json_schema: schema text that overrides the generated one
    :return: dict with the keys ``filename``, ``mimetype``, ``text``,
        ``result``, ``provider``, ``errors`` and ``error``
    :raises UserError: when the content or the filename is missing
    """
    if not file_content:
        raise UserError(_("No document provided."))
    if not filename:
        raise UserError(_("Filename is required."))

    raw_bytes = self._decode_file_content(file_content)
    detected_type = self._detect_mimetype(raw_bytes, filename)
    document_text = self._extract_text(raw_bytes, detected_type)
    normalized_fields = self._normalize_required_fields(required_fields or {})

    # Start from the "nothing extracted" shape and fill it in as we go.
    report = {
        "filename": filename,
        "mimetype": detected_type,
        "text": "",
        "result": {},
        "provider": False,
        "errors": [],
        "error": False,
    }

    if not document_text.strip():
        no_text_message = _("No text could be extracted from the document.")
        report.update(errors=[no_text_message], error=no_text_message)
        return report

    extracted, provider, failures = self._send_to_ai(
        # Only the first 45000 characters are sent to the provider.
        text_content=document_text[:45000],
        schema_text=json_schema or self._build_json_schema_text(normalized_fields),
        extra_instructions=extra_instructions,
    )
    # Regex heuristics are the last resort when no provider answered.
    extracted = (
        extracted
        or self._extract_with_heuristics(document_text, normalized_fields)
        or {}
    )

    nothing_found = not extracted and bool(failures)
    report.update(
        text=document_text,
        result=extracted,
        provider=provider,
        errors=failures,
        error="; ".join(failures[:3]) if nothing_found else False,
    )
    return report
@api.model
def extract_requested_data(self, file_content, filename, required_fields, extra_instructions=None, json_schema=None):
    """Return only the ``result`` part of :meth:`parse_document`.

    All arguments are forwarded unchanged to ``parse_document``.
    """
    parsed = self.parse_document(
        file_content=file_content,
        filename=filename,
        required_fields=required_fields,
        extra_instructions=extra_instructions,
        json_schema=json_schema,
    )
    return parsed["result"]
def _decode_file_content(self, file_content):
if isinstance(file_content, bytes):
if file_content.startswith((b"%PDF", b"\xFF\xD8", b"\x89PNG", b"PK")):
return file_content
try:
return base64.b64decode(file_content)
except Exception:
return file_content
if isinstance(file_content, str):
try:
return base64.b64decode(file_content)
except Exception as exc:
raise UserError(_("Invalid base64 document.")) from exc
raise UserError(_("Unsupported file format."))
def _detect_mimetype(self, binary, filename):
if filename:
guessed = mimetypes.guess_type(filename)[0]
if guessed:
return guessed
if binary.startswith(b"%PDF"):
return "application/pdf"
if binary.startswith(b"\xFF\xD8"):
return "image/jpeg"
if binary.startswith(b"\x89PNG"):
return "image/png"
if binary[:2] == b"PK":
return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
return "application/octet-stream"
def _extract_text(self, binary, mimetype):
text_content = ""
try:
if mimetype == "application/pdf":
text_content = self._extract_text_from_pdf(binary)
elif mimetype in {"image/png", "image/jpeg", "image/jpg"}:
text_content = self._extract_text_from_image(binary)
elif mimetype == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
text_content = self._extract_text_from_docx(binary)
elif mimetype.startswith("text/"):
text_content = binary.decode("utf-8", errors="ignore")
except Exception as exc:
_logger.exception("Document text extraction failed: %s", exc)
return (text_content or "").strip()
def _extract_text_from_pdf(self, binary):
    """Extract text from a PDF, using OCR when there is no text layer.

    The embedded text is read with ``PdfReader`` when that dependency is
    available. If that yields nothing and both ``convert_from_bytes`` and
    ``pytesseract`` are available, the pages are rendered and OCR-ed.

    :param binary: PDF bytes
    :return: extracted text, or an empty string
    """
    page_texts = []
    if PdfReader:
        try:
            for page in PdfReader(BytesIO(binary)).pages:
                page_texts.append(page.extract_text() or "")
        except Exception as exc:
            # Pages read before the failure are kept.
            _logger.warning("PdfReader extraction failed: %s", exc)

    embedded_text = "\n".join(filter(None, page_texts)).strip()
    if embedded_text:
        return embedded_text

    if not (convert_from_bytes and pytesseract):
        return ""
    try:
        rendered_pages = convert_from_bytes(binary, dpi=300)
        ocr_chunks = [pytesseract.image_to_string(rendered) for rendered in rendered_pages]
        return "\n".join(ocr_chunks).strip()
    except Exception as exc:
        _logger.warning("PDF OCR extraction failed: %s", exc)
    return ""
def _extract_text_from_image(self, binary):
    """Run OCR on image bytes.

    :param binary: image bytes
    :return: recognised text, or an empty string when the OCR
        dependencies are missing or the OCR call fails
    """
    if pytesseract and Image:
        try:
            picture = Image.open(BytesIO(binary))
            recognised = pytesseract.image_to_string(picture)
            return recognised.strip()
        except Exception as exc:
            _logger.warning("Image OCR extraction failed: %s", exc)
    return ""
def _extract_text_from_docx(self, binary):
    """Join the non-empty paragraphs of a DOCX file with newlines.

    :param binary: DOCX bytes
    :return: paragraph text, or an empty string when the ``docx``
        dependency is missing or the file cannot be read
    """
    if Document:
        try:
            parsed = Document(BytesIO(binary))
            lines = [para.text for para in parsed.paragraphs if para.text]
            return "\n".join(lines).strip()
        except Exception as exc:
            _logger.warning("DOCX extraction failed: %s", exc)
    return ""
def _send_to_ai(self, text_content, schema_text, extra_instructions=None):
    """Ask the configured providers to extract JSON from the document.

    Together is tried first, then OpenRouter. A provider is skipped, and
    an error recorded, when its API key is not configured.

    :param text_content: document text to analyse
    :param schema_text: textual description of the expected JSON
    :param extra_instructions: optional free text added to the prompt
    :return: tuple ``(result, provider, errors)`` where ``provider`` is
        ``"together"``, ``"openrouter"`` or ``False``, and ``errors``
        lists every failure met on the way, including failed models of
        the provider that eventually succeeded
    """
    prompt = self._build_prompt(text_content, schema_text, extra_instructions)
    errors = []
    # Both historical parameter names are accepted for each provider key.
    together_key = (
        self._get_param("document_parser.together_ai_key")
        or self._get_param("document_parser.together_api_key")
    )
    openrouter_key = (
        self._get_param("document_parser.openrouter_ai_key")
        or self._get_param("document_parser.openrouter_api_key")
    )

    if together_key:
        result, provider_errors = self._call_provider(
            provider_name="Together",
            endpoint=self.TOGETHER_ENDPOINT,
            headers={
                "Authorization": f"Bearer {together_key}",
                "Content-Type": "application/json",
            },
            models=self.TOGETHER_MODELS,
            prompt=prompt,
        )
        # Record failures before the success check so that models which
        # failed ahead of the successful one are still reported.
        errors.extend(provider_errors)
        if result:
            return result, "together", errors
    else:
        errors.append(_("Together AI key is not configured."))

    if openrouter_key:
        result, provider_errors = self._call_provider(
            provider_name="OpenRouter",
            endpoint=self.OPENROUTER_ENDPOINT,
            headers={
                "Authorization": f"Bearer {openrouter_key}",
                "Content-Type": "application/json",
                "HTTP-Referer": self._get_param("web.base.url") or "odoo.local",
                "X-Title": "Document Parser",
            },
            models=self.OPENROUTER_MODELS,
            prompt=prompt,
        )
        errors.extend(provider_errors)
        if result:
            return result, "openrouter", errors
    else:
        errors.append(_("OpenRouter key is not configured."))

    return {}, False, errors
def _build_prompt(self, text_content, schema_text, extra_instructions=None):
return f"""
You are a strict JSON generator.
RULES:
- Output ONLY valid raw JSON.
- No explanation.
- No markdown.
- No backticks.
- No extra text.
- Follow schema strictly.
- If a field is missing in text, return null.
- Scan the entire document carefully before answering.
- Extract ONLY what exists in text.
- FOR ANY DATES CHANGE FORMAT TO %Y-%m-%d
FIELD RULES:
- If "skills" exists, extract only explicit technical skills written in the document.
- Do NOT infer similar skills from role names, responsibilities, or projects.
- Normalize names like "Expert Python" to "Python".
- Exclude soft skills and business phrases.
- Exclude responsibility-style phrases like Cross-Functional Collaboration, Cost Saving, Resource Utilization, Documentation, Reporting, and Team Handling.
- Prefer concrete tools, methods, technologies, platforms, certifications, engineering/process methods, and domain techniques explicitly written in the resume.
- If the resume explicitly mentions items like AutoCAD, Root Cause Analysis, Project Management, Manufacturing Processes, Lean, Six Sigma, or Quality Control, include them.
- Remove duplicates and return each skill only once.
- If "email" exists, return one valid normalized email.
- If "name" exists, prefer the full name at the top and exclude titles, companies, and addresses.
- If "phone" exists, return the most complete phone number found.
- If "experience" exists, return only clearly supported numeric values.
Schema:
{schema_text}
Instructions:
{extra_instructions or "None"}
Document:
{text_content}
"""
def _call_provider(self, provider_name, endpoint, headers, models, prompt):
errors = []
for model in models:
payload = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0,
"max_tokens": 1500,
}
try:
response = requests.post(endpoint, headers=headers, json=payload, timeout=90)
if response.status_code != 200:
message = _("%(provider)s model %(model)s failed with %(status)s: %(body)s") % {
"provider": provider_name,
"model": model,
"status": response.status_code,
"body": (response.text or "")[:300],
}
_logger.warning(message)
errors.append(message)
continue
body = response.json()
content = self._extract_message_content(body)
parsed = self._safe_json_load(content)
if parsed:
return parsed, errors
message = _("%(provider)s model %(model)s returned invalid JSON.") % {
"provider": provider_name,
"model": model,
}
_logger.warning(message)
errors.append(message)
except Exception as exc:
message = _("%(provider)s model %(model)s error: %(error)s") % {
"provider": provider_name,
"model": model,
"error": str(exc),
}
_logger.warning(message)
errors.append(message)
return {}, errors
def _extract_message_content(self, response_body):
try:
content = response_body["choices"][0]["message"]["content"]
except Exception:
return ""
if isinstance(content, list):
parts = []
for item in content:
if isinstance(item, dict):
if item.get("type") == "text":
parts.append(item.get("text", ""))
elif item.get("text"):
parts.append(item.get("text"))
else:
parts.append(str(item))
return "\n".join(part for part in parts if part)
if isinstance(content, dict):
return content.get("text", "")
return content or ""
def _safe_json_load(self, content):
if not content:
return {}
content = content.strip().replace("```json", "").replace("```", "").strip()
try:
return json.loads(content)
except Exception:
pass
match = re.search(r"\{[\s\S]*\}", content)
if match:
try:
return json.loads(match.group(0))
except Exception:
pass
_logger.warning("JSON parse failed for provider response: %s", content[:500])
return {}
def _extract_with_heuristics(self, text_content, fields):
result = {}
email_match = re.search(r"([A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,})", text_content or "", re.I)
phone_match = re.search(r"(\+?\d[\d\-\s()]{7,}\d)", text_content or "")
linkedin_match = re.search(r"(https?://(?:www\.)?linkedin\.com/[^\s]+)", text_content or "", re.I)
name_guess = self._guess_name(text_content or "")
skills_guess = self._guess_skills(text_content or "")
for field_name, field_spec in fields.items():
field_type = field_spec.get("type", "string")
if field_name in {"email", "email_from"}:
result[field_name] = email_match.group(1).lower() if email_match else None
elif field_name in {"phone", "mobile", "partner_phone"}:
result[field_name] = phone_match.group(1).strip() if phone_match else None
elif field_name in {"linkedin_profile", "linkedin"}:
result[field_name] = linkedin_match.group(1).strip() if linkedin_match else None
elif field_name in {"name", "full_name", "partner_name"}:
result[field_name] = name_guess
elif field_name == "skills" and field_type == "list":
result[field_name] = skills_guess
else:
result[field_name] = None
return result
def _guess_name(self, text_content):
for line in [line.strip() for line in (text_content or "").splitlines() if line.strip()][:12]:
cleaned = re.sub(r"[^A-Za-z .'-]", "", line).strip()
if len(cleaned.split()) in {2, 3, 4} and not re.search(r"(resume|cv|email|phone|linkedin|skills|experience)", cleaned, re.I):
return cleaned
return None
def _guess_skills(self, text_content):
section = re.search(r"(skills|technical skills|core competencies)(.*?)(experience|education|projects|certifications|$)", text_content or "", re.I | re.S)
if not section:
return []
parts = re.split(r"[,;\n|•]", section.group(2))
cleaned = []
for part in parts:
value = re.sub(r"\s+", " ", part).strip(" -:\t\r\n")
if value and 1 < len(value) < 50 and not re.search(r"^(skills?|experience|education)$", value, re.I):
cleaned.append(value)
return list(dict.fromkeys(cleaned[:25]))
def _get_param(self, key):
    """Read one ``ir.config_parameter`` value with elevated rights.

    :param key: parameter key
    :return: whatever ``get_param`` returns for that key
    """
    parameters = self.env["ir.config_parameter"].sudo()
    return parameters.get_param(key)
def _normalize_required_fields(self, fields):
if isinstance(fields, dict):
normalized = {}
for field_name, field_value in fields.items():
if isinstance(field_value, dict):
normalized[field_name] = {
"type": field_value.get("type", "string"),
"description": field_value.get("description", field_name.replace("_", " ").title()),
}
else:
normalized[field_name] = {
"type": "string",
"description": str(field_value or field_name.replace("_", " ").title()),
}
return normalized
if isinstance(fields, list):
return {field_name: {"type": "string", "description": field_name.replace("_", " ").title()} for field_name in fields}
return {}
def _build_json_schema_text(self, fields):
return json.dumps(fields, ensure_ascii=True)