2061 lines
100 KiB
Python
2061 lines
100 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import base64
|
|
import contextlib
|
|
import io
|
|
import logging
|
|
import re
|
|
import uuid
|
|
from ast import literal_eval
|
|
from collections import Counter, OrderedDict, defaultdict
|
|
|
|
import requests
|
|
from dateutil.relativedelta import relativedelta
|
|
from markupsafe import Markup
|
|
from werkzeug.urls import url_encode
|
|
|
|
import odoo
|
|
from odoo import _, api, Command, fields, models
|
|
from odoo.exceptions import AccessError, UserError, ValidationError
|
|
from odoo.osv import expression
|
|
from odoo.tools import groupby, image_process, SQL, create_index
|
|
from odoo.tools.mimetypes import get_extension
|
|
from odoo.tools.misc import clean_context
|
|
from odoo.tools.pdf import PdfFileReader
|
|
from odoo.addons.mail.tools import link_preview
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _sanitize_file_extension(extension):
|
|
""" Remove leading and trailing spacing + Remove leading "." """
|
|
return re.sub(r'^[\s.]+|\s+$', '', extension)
|
|
|
|
|
|
class Document(models.Model):
|
|
_name = 'documents.document'
|
|
_description = 'Document'
|
|
_inherit = ['mail.thread.cc', 'mail.activity.mixin', 'mail.alias.mixin']
|
|
_order = 'id desc'
|
|
_parent_name = 'folder_id'
|
|
_parent_store = True
|
|
_systray_view = 'activity'
|
|
|
|
# Attachment
|
|
attachment_id = fields.Many2one('ir.attachment', ondelete='cascade', auto_join=True, copy=False)
|
|
attachment_name = fields.Char('Attachment Name', related='attachment_id.name', readonly=False)
|
|
attachment_type = fields.Selection(string='Attachment Type', related='attachment_id.type', readonly=False)
|
|
is_editable_attachment = fields.Boolean(default=False, help='True if we can edit the link attachment.')
|
|
is_multipage = fields.Boolean(
|
|
'Is considered multipage', compute='_compute_is_multipage', store=True, readonly=False)
|
|
datas = fields.Binary(related='attachment_id.datas', related_sudo=True, readonly=False, prefetch=False)
|
|
raw = fields.Binary(related='attachment_id.raw', related_sudo=True, readonly=False, prefetch=False)
|
|
file_extension = fields.Char('File Extension', copy=True, store=True, readonly=False,
|
|
compute='_compute_file_extension', inverse='_inverse_file_extension')
|
|
file_size = fields.Integer(related='attachment_id.file_size', store=True)
|
|
checksum = fields.Char(related='attachment_id.checksum')
|
|
mimetype = fields.Char(related='attachment_id.mimetype')
|
|
res_model = fields.Char('Resource Model', compute="_compute_res_record", recursive=True,
|
|
inverse="_inverse_res_model", store=True)
|
|
res_id = fields.Many2oneReference('Resource ID', compute="_compute_res_record", recursive=True,
|
|
inverse="_inverse_res_model", store=True, model_field="res_model")
|
|
res_name = fields.Char('Resource Name', compute="_compute_res_name", compute_sudo=True)
|
|
index_content = fields.Text(related='attachment_id.index_content')
|
|
description = fields.Text('Attachment Description', related='attachment_id.description', readonly=False)
|
|
|
|
# Versioning
|
|
previous_attachment_ids = fields.Many2many('ir.attachment', string="History")
|
|
|
|
# Document
|
|
name = fields.Char('Name', copy=True, store=True, compute='_compute_name_and_preview', readonly=False)
|
|
active = fields.Boolean(default=True, string="Active")
|
|
thumbnail = fields.Binary(
|
|
readonly=False, store=True, attachment=True, compute='_compute_thumbnail', recursive=True)
|
|
thumbnail_status = fields.Selection([
|
|
('present', 'Present'), # Document has a thumbnail
|
|
('error', 'Error'), # Error when generating the thumbnail
|
|
('client_generated', 'Client Generated'), # The PDF thumbnail is generated by the user browser
|
|
('restricted', 'Inaccessible'), # Shortcut to no-permission source
|
|
], compute="_compute_thumbnail", store=True, readonly=False, recursive=True,
|
|
)
|
|
url = fields.Char('Link URL', index=True, size=1024, tracking=True)
|
|
url_preview_image = fields.Char(
|
|
'URL Preview Image', store=True, compute='_compute_name_and_preview', readonly=False)
|
|
res_model_name = fields.Char(compute='_compute_res_model_name', index=True)
|
|
type = fields.Selection([('url', 'URL'), ('binary', 'File'), ('folder', 'Folder')],
|
|
default='binary', string='Type', required=True, readonly=True)
|
|
shortcut_document_id = fields.Many2one('documents.document', 'Source Document', ondelete='cascade',
|
|
index='btree_not_null')
|
|
shortcut_ids = fields.One2many('documents.document', 'shortcut_document_id')
|
|
|
|
favorited_ids = fields.Many2many('res.users', string="Favorite of")
|
|
is_favorited = fields.Boolean(compute='_compute_is_favorited', inverse='_inverse_is_favorited')
|
|
tag_ids = fields.Many2many('documents.tag', 'document_tag_rel', string="Tags")
|
|
partner_id = fields.Many2one('res.partner', string="Contact", tracking=True)
|
|
owner_id = fields.Many2one('res.users', default=lambda self: self.env.user.id, string="Owner",
|
|
required=True, tracking=True, index=True)
|
|
lock_uid = fields.Many2one('res.users', string="Locked by")
|
|
is_locked = fields.Boolean(compute="_compute_is_locked", string="Locked")
|
|
request_activity_id = fields.Many2one('mail.activity')
|
|
requestee_partner_id = fields.Many2one('res.partner')
|
|
|
|
# Access
|
|
document_token = fields.Char(
|
|
required=True, copy=False, index='btree',
|
|
default=lambda __: base64.urlsafe_b64encode(uuid.uuid4().bytes).decode().removesuffix('=='))
|
|
access_token = fields.Char(compute='_compute_access_token')
|
|
|
|
access_url = fields.Char(string='Access url', compute='_compute_access_url')
|
|
is_access_via_link_hidden = fields.Boolean(
|
|
'Link Access Hidden', index=True,
|
|
help='If "True", only people given direct access to this document will be able to view it. '
|
|
'If "False", access with the link also given to all who can access the parent folder.'
|
|
)
|
|
access_via_link = fields.Selection(
|
|
[('view', 'Viewer'), ('edit', 'Editor'), ('none', 'None')],
|
|
string='Link Access Rights', default='none', required=True, index=True)
|
|
access_internal = fields.Selection(
|
|
[('view', 'Viewer'), ('edit', 'Editor'), ('none', 'None')],
|
|
string='Internal Users Rights', default='none', required=True, index=True)
|
|
|
|
access_ids = fields.One2many('documents.access', 'document_id', string="Allowed Access")
|
|
|
|
user_permission = fields.Selection(
|
|
[('edit', 'Editor'), ('view', 'Viewer'), ('none', 'None')], string='User permission',
|
|
compute='_compute_user_permission', search='_search_user_permission', compute_sudo=True)
|
|
|
|
# Folder = parent document
|
|
parent_path = fields.Char(index=True) # see '_parent_store' implementation in the ORM for details
|
|
folder_id = fields.Many2one('documents.document', string="Parent Folder", ondelete="set null", tracking=True,
|
|
domain="[('type', '=', 'folder'), ('shortcut_document_id', '=', False)]",
|
|
required=False, index=True)
|
|
children_ids = fields.One2many('documents.document', 'folder_id')
|
|
|
|
deletion_delay = fields.Integer("Deletion delay", compute="_compute_deletion_delay",
|
|
help="Delay after permanent deletion of the document in the trash (days)")
|
|
company_id = fields.Many2one('res.company', string='Company', store=True, readonly=False, index=True)
|
|
|
|
# TODO: remove in master
|
|
is_pinned_folder = fields.Boolean("Pinned to Company roots", compute='_compute_is_pinned_folder', store=True)
|
|
|
|
# Stat buttons
|
|
document_count = fields.Integer('Document Count', compute='_compute_document_count')
|
|
|
|
# Activity
|
|
create_activity_option = fields.Boolean(string='Create a new activity')
|
|
create_activity_type_id = fields.Many2one('mail.activity.type', string="Activity type")
|
|
create_activity_summary = fields.Char('Summary')
|
|
create_activity_date_deadline_range = fields.Integer(string='Due Date In')
|
|
create_activity_date_deadline_range_type = fields.Selection([
|
|
('days', 'Days'),
|
|
('weeks', 'Weeks'),
|
|
('months', 'Months'),
|
|
], string='Due type', default='days')
|
|
create_activity_note = fields.Html(string="Note")
|
|
create_activity_user_id = fields.Many2one('res.users', string='Responsible')
|
|
|
|
# Actions that we can do on the document
|
|
available_embedded_actions_ids = fields.Many2many(
|
|
'ir.embedded.actions', 'Available Actions', compute='_compute_available_embedded_actions_ids',
|
|
groups='base.group_user')
|
|
|
|
# Alias
|
|
alias_tag_ids = fields.Many2many('documents.tag', 'document_alias_tag_rel', string="Alias Tags")
|
|
|
|
# UI fields
|
|
last_access_date_group = fields.Selection(selection=[
|
|
('0_older', 'Older'),
|
|
('1_month', 'This Month'),
|
|
('2_week', 'This Week'),
|
|
('3_day', 'Today'),
|
|
], string="Last Accessed On", compute='_compute_last_access_date_group', search='_search_last_access_date_group')
|
|
|
|
_sql_constraints = [
|
|
('attachment_unique', 'unique (attachment_id)', "This attachment is already a document"),
|
|
('document_token_unique', 'unique (document_token)', "Access tokens already used."),
|
|
('folder_id_not_id', 'check(folder_id <> id)', "A folder cannot be included in itself"),
|
|
('shortcut_document_id_not_id', 'check(shortcut_document_id <> id)', "A shortcut cannot point to itself"),
|
|
]
|
|
|
|
def init(self):
|
|
super().init()
|
|
create_index(self.env.cr,
|
|
indexname='documents_document_res_model_res_id_idx',
|
|
tablename=self._table,
|
|
expressions=['res_model', 'res_id'])
|
|
|
|
@api.depends('document_token')
|
|
def _compute_access_token(self):
|
|
for document in self:
|
|
document.access_token = f"{document.document_token}o{document.id or 0:x}"
|
|
|
|
@api.depends('access_token')
|
|
def _compute_access_url(self):
|
|
for document in self:
|
|
document.access_url = f'{document.sudo().get_base_url()}/odoo/documents/{document.access_token}'
|
|
|
|
@api.depends("folder_id", "company_id")
|
|
@api.depends_context("uid", "allowed_company_ids")
|
|
def _compute_display_name(self):
|
|
accessible_records = self._filtered_access('read')
|
|
not_accessible_records = self - accessible_records
|
|
not_accessible_records.display_name = _("Restricted")
|
|
folders = accessible_records.filtered(lambda d: d.type == 'folder')
|
|
for record in folders:
|
|
if record.user_permission != 'none':
|
|
record.display_name = record.name
|
|
else:
|
|
record.display_name = _("Restricted Folder")
|
|
|
|
for record in accessible_records - folders:
|
|
record.display_name = record.name
|
|
|
|
@api.depends('name', 'type')
|
|
def _compute_file_extension(self):
|
|
for record in self:
|
|
if record.type != 'binary':
|
|
record.file_extension = False
|
|
elif record.name:
|
|
record.file_extension = _sanitize_file_extension(get_extension(record.name.strip())) or False
|
|
|
|
def _inverse_file_extension(self):
|
|
for record in self:
|
|
record.file_extension = _sanitize_file_extension(record.file_extension) if record.file_extension else False
|
|
|
|
@api.constrains('shortcut_document_id', 'shortcut_ids', 'type', 'folder_id', 'children_ids', 'company_id')
|
|
def _check_shortcut_fields(self):
|
|
errors = []
|
|
wrong_types, wrong_companies = self.browse(), self.browse()
|
|
# Access rights can allow for a document to be edited without having access to its parent folder
|
|
wrong_parents_sudo = self.folder_id.sudo().filtered('shortcut_document_id')
|
|
for target in self.filtered('shortcut_ids'):
|
|
for shortcut in target.shortcut_ids:
|
|
if shortcut.type != target.type:
|
|
wrong_types |= shortcut
|
|
for shortcut in self.filtered('shortcut_document_id'):
|
|
if shortcut.type != shortcut.shortcut_document_id.type:
|
|
wrong_types |= shortcut
|
|
if shortcut.children_ids:
|
|
wrong_parents_sudo |= shortcut
|
|
if (shortcut.shortcut_document_id.company_id
|
|
and shortcut.shortcut_document_id.company_id != shortcut.company_id):
|
|
wrong_companies |= shortcut
|
|
if wrong_types:
|
|
message = _("The following documents/shortcuts have a type mismatch: \n")
|
|
documents_list = "\n- ".join(wrong_types.mapped('name'))
|
|
errors.append(f'{message}\n- {documents_list}')
|
|
if wrong_parents_sudo:
|
|
message = _("The following shortcuts cannot be set as documents parents: \n")
|
|
shortcuts_list = "\n- ".join(wrong_parents_sudo.mapped('name'))
|
|
errors.append(f'{message}\n- {shortcuts_list}')
|
|
if wrong_companies:
|
|
message = _("The following documents/shortcuts have a company mismatch: \n")
|
|
shortcuts_list = "\n- ".join(wrong_companies.mapped('name'))
|
|
errors.append(f'{message}\n- {shortcuts_list}')
|
|
if errors:
|
|
raise ValidationError('\n\n'.join(errors))
|
|
|
|
@api.constrains('owner_id', 'folder_id')
|
|
def _check_portal_cant_own_root_document(self):
|
|
wrong_records = self.filtered(lambda d: d.owner_id.sudo().share and not self.folder_id)
|
|
if wrong_records:
|
|
raise ValidationError(_(
|
|
"The following documents/folders can't be owned by a Portal User: \n- %(partners)s",
|
|
partners="\n-".join(wrong_records.mapped('name'))))
|
|
|
|
@api.constrains('type', 'alias_name')
|
|
def _check_alias(self):
|
|
wrong_records = self.filtered(lambda d: (d.type != 'folder' or d.shortcut_document_id) and d.alias_name)
|
|
if wrong_records:
|
|
raise ValidationError(_(
|
|
"The following documents can't have alias: \n- %(records)s",
|
|
records="\n-".join(wrong_records.mapped('name'))))
|
|
|
|
@api.depends('folder_id', 'owner_id', 'type')
|
|
def _compute_is_pinned_folder(self):
|
|
# TODO: remove in master, for stable force the field to reflect owner / folder value
|
|
# because it's used in access rule `documents_document_write_base_rule`
|
|
for document in self:
|
|
document.is_pinned_folder = (
|
|
document.type == 'folder'
|
|
and not document.folder_id
|
|
and document.owner_id == self.env.ref('base.user_root')
|
|
)
|
|
|
|
@api.depends('attachment_id', 'url', 'shortcut_document_id')
|
|
def _compute_name_and_preview(self):
|
|
request_session = requests.Session()
|
|
shortcuts = self.filtered('shortcut_document_id')
|
|
for record in self - shortcuts:
|
|
if record.attachment_id:
|
|
record.name = record.attachment_id.name
|
|
record.url_preview_image = False
|
|
elif record.url:
|
|
preview = link_preview.get_link_preview_from_url(record.url, request_session)
|
|
if not preview:
|
|
continue
|
|
if preview.get('og_title'):
|
|
record.name = preview['og_title']
|
|
if preview.get('og_image'):
|
|
record.url_preview_image = preview['og_image']
|
|
|
|
for shortcut in shortcuts:
|
|
shortcut.name = shortcut.name or shortcut.shortcut_document_id.name
|
|
shortcut.url_preview_image = shortcut.url_preview_image or shortcut.shortcut_document_id.url_preview_image
|
|
|
|
@api.depends_context('uid', 'allowed_company_ids')
|
|
@api.depends('access_ids', 'access_internal', 'access_via_link', 'owner_id', 'is_access_via_link_hidden',
|
|
'company_id', 'folder_id.access_ids', 'folder_id.access_internal', 'folder_id.access_via_link',
|
|
'folder_id.owner_id', 'folder_id.company_id')
|
|
def _compute_user_permission(self):
|
|
for document in self:
|
|
if self.env.user.has_group('documents.group_documents_system'):
|
|
document.user_permission = (
|
|
'edit' if not (company := document.company_id)
|
|
or company in self.env.companies or company not in self.env.user.company_ids
|
|
else 'none')
|
|
continue
|
|
document.user_permission = document._get_permission_without_token()
|
|
if document.user_permission == 'view' and document.access_via_link == 'edit':
|
|
document.user_permission = 'edit'
|
|
elif document.user_permission == 'none' and document.folder_id and document.access_via_link != 'none' \
|
|
and not document.is_access_via_link_hidden:
|
|
# If the user can access the parent, they have the link.
|
|
# This only works one level up, as it mimics accessing through the interface.
|
|
with contextlib.suppress(AccessError):
|
|
if document.folder_id._get_permission_without_token() != 'none':
|
|
document.user_permission = document.access_via_link
|
|
|
|
def _get_permission_without_token(self):
|
|
self.ensure_one()
|
|
is_user_company = self.company_id and self.company_id in self.env.user.company_ids
|
|
is_disabled_company = is_user_company and self.company_id not in self.env.companies
|
|
if is_disabled_company:
|
|
return 'none'
|
|
|
|
# own documents
|
|
if self.owner_id == self.env.user:
|
|
return 'edit'
|
|
|
|
user_permission = 'none'
|
|
# access with <documents.access>
|
|
if access := self.access_ids.filtered(
|
|
lambda a: a.partner_id == self.env.user.partner_id
|
|
and (not a.expiration_date or a.expiration_date > fields.Datetime.now())
|
|
):
|
|
user_permission = access.role or self.access_via_link
|
|
|
|
# access as internal
|
|
if not self.env.user.share and user_permission != "edit" and self.access_internal != 'none':
|
|
if not self.company_id or self.company_id in self.env.companies:
|
|
user_permission = (
|
|
'edit' if self.env.user.has_group('documents.group_documents_manager')
|
|
else self.access_internal
|
|
)
|
|
|
|
return user_permission
|
|
|
|
def _search_user_permission(self, operator, value):
|
|
if operator not in ('=', '!='):
|
|
raise NotImplementedError("Unsupported search operator")
|
|
if self.env.user._is_public() or value not in {'view', 'edit', 'none'}:
|
|
return expression.FALSE_DOMAIN
|
|
|
|
if (operator, value) in (("=", "edit"), ("!=", "view")): # access but no view => edit
|
|
searched_roles = ['edit']
|
|
elif (operator, value) in (('=', 'view'), ("!=", "edit")): # access without edit => view
|
|
searched_roles = ['view']
|
|
elif (operator, value) == ("!=", "none"): # any access
|
|
searched_roles = ['edit', 'view']
|
|
else:
|
|
return expression.FALSE_DOMAIN # ("=", "none") = not allowed, so no records
|
|
|
|
other_company = [('company_id', '!=', False), ('company_id', 'not in', self.env.user.company_ids.ids)]
|
|
allowed_or_no_company = [('company_id', 'in', [False] + self.env.companies.ids)]
|
|
any_except_disabled_company = expression.OR([
|
|
[('company_id', 'in', self.env.companies.ids)], [('company_id', 'not in', self.env.user.company_ids.ids)]
|
|
])
|
|
|
|
if self.env.user.has_group('documents.group_documents_system'):
|
|
if searched_roles == ['view']:
|
|
return expression.FALSE_DOMAIN # System Administrator has "edit" on all documents, so finds none with "view" only.
|
|
return any_except_disabled_company
|
|
|
|
# Access from membership
|
|
if searched_roles == ['view']:
|
|
access_level_domain = expression.OR([
|
|
[('role', '=', 'view'), ('document_id.access_via_link', 'in', ('none', 'view'))],
|
|
[('role', '=', False), ('document_id.access_via_link', '=', 'view')],
|
|
])
|
|
elif searched_roles == ['edit']:
|
|
access_level_domain = expression.OR([
|
|
[('role', '=', 'edit')], [('document_id.access_via_link', '=', 'edit')]
|
|
])
|
|
else:
|
|
access_level_domain = expression.OR([
|
|
[('role', 'in', ('view', 'edit'))], [('document_id.access_via_link', '!=', 'none')]
|
|
])
|
|
access_domain = [('access_ids', 'any', expression.AND([
|
|
access_level_domain,
|
|
expression.AND([
|
|
[('partner_id', '=', self.env.user.partner_id.id)],
|
|
['|', ('expiration_date', '=', False), ('expiration_date', '>', fields.Datetime.now())],
|
|
]),
|
|
]))]
|
|
|
|
# Access from ownership
|
|
owner_domain = [('owner_id', '=', self.env.user.id)]
|
|
direct_domain = expression.AND([
|
|
any_except_disabled_company,
|
|
access_domain if 'edit' not in searched_roles else expression.OR([access_domain, owner_domain]),
|
|
])
|
|
|
|
# Access form access_internal
|
|
if self.env.user.has_group('documents.group_documents_manager'):
|
|
if searched_roles == ['view']:
|
|
direct_domain = expression.AND([
|
|
direct_domain,
|
|
expression.OR([[('access_internal', '=', 'none')], other_company])
|
|
])
|
|
else:
|
|
direct_domain = expression.OR([
|
|
direct_domain,
|
|
expression.AND([[('access_internal', 'in', ('view', 'edit'))], allowed_or_no_company]),
|
|
])
|
|
elif not self.env.user.share:
|
|
if searched_roles == ['view']:
|
|
internal_domain = [('access_internal', '=', 'view'), ('access_via_link', 'in', ('none', 'view'))]
|
|
elif searched_roles == ['edit']:
|
|
internal_domain = expression.OR([
|
|
[('access_internal', '=', 'edit')],
|
|
expression.AND([[('access_internal', '=', 'view')], [('access_via_link', '=', 'edit')]]),
|
|
])
|
|
else:
|
|
internal_domain = [('access_internal', 'in', ('view', 'edit'))]
|
|
direct_domain = expression.OR([direct_domain, expression.AND([internal_domain, allowed_or_no_company])])
|
|
|
|
# Look one level up for links unless hidden
|
|
link_via_parent_domain = expression.AND([
|
|
[('access_via_link', 'in', searched_roles)],
|
|
[('is_access_via_link_hidden', '=', False)],
|
|
[('folder_id', 'any', direct_domain)],
|
|
])
|
|
|
|
return expression.OR([direct_domain, link_via_parent_domain])
|
|
|
|
@api.depends('datas', 'mimetype')
|
|
def _compute_is_multipage(self):
|
|
for document in self:
|
|
# external computation to be extended
|
|
document.is_multipage = bool(document._get_is_multipage()) # None => False
|
|
|
|
@api.depends('attachment_id', 'attachment_id.res_model', 'attachment_id.res_id',
|
|
'shortcut_document_id.res_model', 'shortcut_document_id.res_id')
|
|
def _compute_res_record(self):
|
|
for record in self:
|
|
attachment = record.attachment_id
|
|
if attachment:
|
|
record.res_model = attachment.res_model
|
|
record.res_id = attachment.res_id
|
|
if record.shortcut_document_id:
|
|
record.res_model = record.shortcut_document_id.res_model
|
|
record.res_id = record.shortcut_document_id.res_id
|
|
|
|
@api.depends('attachment_id', 'res_model', 'res_id')
|
|
def _compute_res_name(self):
|
|
for record in self:
|
|
if record.attachment_id:
|
|
record.res_name = record.attachment_id.res_name
|
|
elif record.res_id and record.res_model:
|
|
record.res_name = self.env[record.res_model].browse(record.res_id).display_name
|
|
else:
|
|
record.res_name = False
|
|
|
|
def _inverse_res_model(self):
|
|
for record in self:
|
|
attachment = record.attachment_id.with_context(no_document=True)
|
|
if attachment:
|
|
# Avoid inconsistency in the data, write both at the same time.
|
|
# In case a check_access is done between res_id and res_model modification,
|
|
# an access error can be received. (Mail causes this check_access)
|
|
attachment.sudo().write({'res_model': record.res_model, 'res_id': record.res_id})
|
|
|
|
@api.depends('checksum', 'shortcut_document_id.thumbnail', 'shortcut_document_id.thumbnail_status',
|
|
'shortcut_document_id.user_permission')
|
|
def _compute_thumbnail(self):
|
|
for document in self:
|
|
if document.shortcut_document_id:
|
|
if document.shortcut_document_id.user_permission != 'none':
|
|
document.thumbnail = document.shortcut_document_id.thumbnail
|
|
document.thumbnail_status = document.shortcut_document_id.thumbnail_status
|
|
else:
|
|
document.thumbnail = False
|
|
document.thumbnail_status = 'restricted'
|
|
elif document.mimetype and document.mimetype.startswith('application/pdf'):
|
|
# Thumbnails of pdfs are generated by the client. To force the generation, we invalidate the thumbnail.
|
|
document.thumbnail = False
|
|
document.thumbnail_status = 'client_generated'
|
|
elif document.mimetype and document.mimetype.startswith('image/'):
|
|
try:
|
|
document.thumbnail = base64.b64encode(image_process(document.raw, size=(200, 140), crop='center'))
|
|
document.thumbnail_status = 'present'
|
|
except (UserError, TypeError):
|
|
document.thumbnail = False
|
|
document.thumbnail_status = 'error'
|
|
else:
|
|
document.thumbnail = False
|
|
document.thumbnail_status = False
|
|
|
|
@api.depends('type')
|
|
def _compute_deletion_delay(self):
|
|
folders = self.filtered(lambda d: d.type == 'folder')
|
|
folders.deletion_delay = self.get_deletion_delay()
|
|
(self - folders).deletion_delay = False
|
|
|
|
@api.depends_context('uid')
|
|
@api.depends('type', 'children_ids', 'shortcut_document_id')
|
|
def _compute_document_count(self):
|
|
folders = (self | self.shortcut_document_id).filtered(
|
|
lambda d: d.type == 'folder' and not d.shortcut_document_id)
|
|
|
|
children_counts = Counter(dict(self._read_group(
|
|
[('folder_id', 'in', folders.ids)],
|
|
groupby=['folder_id'],
|
|
aggregates=['__count'])))
|
|
|
|
for doc in self:
|
|
doc.document_count = children_counts[doc.shortcut_document_id or doc]
|
|
|
|
def _get_folder_embedded_actions(self, folder_ids):
|
|
"""Return the enabled actions for the given folder."""
|
|
embedded_actions = self.env['ir.embedded.actions'].sudo().search(
|
|
domain=[
|
|
('parent_action_id', '=', self.env.ref("documents.document_action").id),
|
|
('action_id', '!=', False),
|
|
('action_id.type', '=', 'ir.actions.server'),
|
|
('parent_res_model', '=', 'documents.document'),
|
|
('parent_res_id', 'in', folder_ids),
|
|
('groups_ids', 'in', [False] + self.env.user.groups_id.ids),
|
|
],
|
|
order='sequence',
|
|
)
|
|
# group after ordering by `ir.embedded.actions` sequence
|
|
return embedded_actions.grouped('parent_res_id')
|
|
|
|
@api.depends_context('uid')
|
|
@api.depends('folder_id')
|
|
def _compute_available_embedded_actions_ids(self):
|
|
embedded_actions = self._get_folder_embedded_actions(self.folder_id.ids)
|
|
embedded_actions_per_folder = {
|
|
folder_id: actions.ids
|
|
for folder_id, actions in embedded_actions.items()
|
|
}
|
|
self.available_embedded_actions_ids = False
|
|
for document in self.filtered(lambda d: d.type != 'folder' and not d.shortcut_document_id):
|
|
document.available_embedded_actions_ids = embedded_actions_per_folder.get(document.folder_id.id, False)
|
|
|
|
def _get_last_access_date_group_cte(self):
|
|
return SQL("""
|
|
WITH last_access_date AS (
|
|
SELECT (CASE
|
|
WHEN last_access_date > NOW() - INTERVAL '1 days' THEN '3_day'
|
|
WHEN last_access_date > NOW() - INTERVAL '7 days' THEN '2_week'
|
|
WHEN last_access_date > NOW() - INTERVAL '1 months' THEN '1_month'
|
|
ELSE '0_older'
|
|
END) AS date,
|
|
document_id
|
|
FROM documents_access
|
|
WHERE partner_id = %s
|
|
)
|
|
""", self.env.user.partner_id.id)
|
|
|
|
@api.depends('access_ids')
|
|
def _compute_last_access_date_group(self):
|
|
self.env.cr.execute(SQL(
|
|
"""(%s SELECT document_id, date FROM last_access_date WHERE document_id = ANY(%s))""",
|
|
self._get_last_access_date_group_cte(),
|
|
self.ids,
|
|
))
|
|
values = {line['document_id']: line['date'] for line in self.env.cr.dictfetchall()}
|
|
for document in self:
|
|
document.last_access_date_group = values.get(document.id)
|
|
|
|
def _search_last_access_date_group(self, operator, operand):
|
|
if operator != '=':
|
|
raise NotImplementedError("Unsupported search operator or value")
|
|
query = SQL(
|
|
"""(%s SELECT document_id FROM last_access_date WHERE date = %s)""",
|
|
self._get_last_access_date_group_cte(), operand)
|
|
return [('id', 'in', query)]
|
|
|
|
def _field_to_sql(self, alias, fname, query=None, flush: bool = True) -> SQL:
|
|
if fname == 'last_access_date_group':
|
|
# Allow to group on the field
|
|
return SQL("""(%s SELECT date FROM last_access_date WHERE document_id = %s)""",
|
|
self._get_last_access_date_group_cte(), SQL.identifier(alias, 'id'))
|
|
|
|
return super()._field_to_sql(alias, fname, query, flush)
|
|
|
|
def _order_field_to_sql(self, alias, field_name, direction, nulls, query):
|
|
if field_name == 'last_access_date_group':
|
|
sql_field = SQL(
|
|
"SELECT last_access_date FROM documents_access WHERE partner_id = %s AND document_id = %s",
|
|
self.env.user.partner_id.id,
|
|
SQL.identifier(alias, 'id')
|
|
)
|
|
return SQL("(%s) %s %s", sql_field, direction, nulls)
|
|
|
|
if field_name == 'is_folder':
|
|
sql_field = SQL("%s != 'folder'", SQL.identifier(alias, 'type'))
|
|
return SQL("(%s) %s %s", sql_field, direction, nulls)
|
|
|
|
return super()._order_field_to_sql(alias, field_name, direction, nulls, query)
|
|
|
|
@api.model
|
|
def get_previewable_file_extensions(self):
|
|
return {'bmp', 'mp4', 'mp3', 'png', 'jpg', 'jpeg', 'pdf', 'gif', 'txt', 'wav'}
|
|
|
|
def action_move_documents(self, folder_id):
|
|
"""Move document to new parent folder or none (my drive or company)
|
|
|
|
:param int|bool folder_id: new parent folder id
|
|
"""
|
|
target_folder = self.browse(folder_id)
|
|
if target_folder.shortcut_document_id:
|
|
return self.action_move_documents(target_folder.shortcut_document_id.id)
|
|
|
|
documents_to_move = self.filtered(lambda d: d.folder_id != target_folder and d != target_folder)
|
|
if not documents_to_move:
|
|
return
|
|
|
|
try:
|
|
(documents_to_move | target_folder).check_access('write')
|
|
except UserError:
|
|
raise AccessError(_("You are not allowed to perform this operation."))
|
|
if target_folder and (cyclic_move := documents_to_move.filtered(
|
|
lambda d: target_folder.parent_path.startswith(d.parent_path))):
|
|
raise UserError(_(
|
|
"Impossible to move the following items as this would create a recursive hierarchy:\n"
|
|
"- %(documents)s", documents='\n- '.join(cyclic_move.mapped('name'))
|
|
))
|
|
documents_to_move.folder_id = target_folder
|
|
|
|
if target_folder and (documents_to_sync := documents_to_move.filtered(lambda d: not d.shortcut_document_id)):
|
|
documents_to_sync.sudo().action_update_access_rights(
|
|
access_internal=target_folder.access_internal,
|
|
access_via_link=target_folder.access_via_link,
|
|
is_access_via_link_hidden=target_folder.is_access_via_link_hidden,
|
|
# Simply add partners of destination
|
|
partners={access.partner_id: (access.role, access.expiration_date)
|
|
for access in target_folder.access_ids},
|
|
)
|
|
|
|
def action_change_owner(self, new_user_id):
|
|
if not self.env.user._is_admin() and not self.env.user.has_group('documents.group_documents_manager'):
|
|
if any(document.owner_id != self.env.user for document in self):
|
|
raise AccessError(_("You are not allowed to change ownerships of documents you do not own."))
|
|
self.owner_id = new_user_id
|
|
|
|
def action_create_shortcut(self, location_folder_id=None):
|
|
"""Create a shortcut to self in a specific folder or as sibling
|
|
|
|
:param int | None location_folder_id: Optional: where to create the shortcut.
|
|
"""
|
|
if not self.ids:
|
|
return
|
|
|
|
if len(self.folder_id.ids) > 1 and location_folder_id is None:
|
|
raise UserError(_("A destination is required when creating multiple shortcuts at once."))
|
|
if self.shortcut_document_id:
|
|
targets = self.filtered(lambda d: not d.shortcut_document_id) | self.shortcut_document_id
|
|
return targets.action_create_shortcut(location_folder_id or self.folder_id.id)
|
|
|
|
location = self.browse(location_folder_id) if location_folder_id is not None else self.folder_id
|
|
if location_folder_id and location.shortcut_document_id:
|
|
return self.action_create_shortcut(location.shortcut_document_id.id)
|
|
|
|
if location:
|
|
try:
|
|
location.check_access('write')
|
|
except UserError as exc:
|
|
if self.env.user.share:
|
|
raise AccessError(_("You are not allowed to write in this folder.")) from exc
|
|
location = self.browse() # My Drive
|
|
self.env.user._bus_send('simple_notification', {
|
|
'type': 'info',
|
|
'message': _("Shortcut created in My Drive"),
|
|
})
|
|
if location.shortcut_document_id:
|
|
raise AccessError(_("Shortcuts cannot contain documents."))
|
|
self.check_access('read')
|
|
|
|
return self.sudo().create([{
|
|
"folder_id": location.id,
|
|
"shortcut_document_id": document.id,
|
|
"name": document.name,
|
|
"type": document.type,
|
|
"access_internal": document.access_internal or 'view',
|
|
"access_via_link": document.access_via_link or 'none',
|
|
"company_id": document.company_id.id,
|
|
"access_ids": [
|
|
Command.create({
|
|
"partner_id": access.partner_id.id,
|
|
"role": access.role,
|
|
})
|
|
for access in document.access_ids if access.role
|
|
],
|
|
"is_multipage": document.is_multipage,
|
|
"is_access_via_link_hidden": document.is_access_via_link_hidden,
|
|
} for document in self]).sudo(False)
|
|
|
|
def action_update_access_rights(self, access_internal=None, access_via_link=None, is_access_via_link_hidden=None,
|
|
partners=None, notify=False, message=""):
|
|
"""Update access to a document and propagate if applicable.
|
|
|
|
This method can be called to update the access of internal users, with
|
|
the link, as well as a set of partners and roles to the records in self
|
|
and their children (except shortcuts), and shortcuts pointing to them
|
|
as they are kept synchronized.
|
|
|
|
Modifications to internal users and link access are propagated down to
|
|
children until the new value is already present.
|
|
For partners, all changes are applied to all children regardless of the
|
|
existing rights structure.
|
|
|
|
:param str | None access_internal: optional new permission level for internal users
|
|
:param str | None access_via_link: optional new permission level for partners with the link
|
|
:param bool|None is_access_via_link_hidden: optional new value for discoverability
|
|
:param dict[str | int | res.partner(), tuple[str | bool | None, str | datetime | bool | None] partners:
|
|
Mapping of partner(_id) to the tuple:
|
|
role: 'edit', 'view', False (=>delete),
|
|
expiration: datetime string, False (removed/None)
|
|
:param bool notify: whether to send an email
|
|
:param str message: message to add to the email
|
|
"""
|
|
if len(self.ids) == 0:
|
|
return
|
|
try:
|
|
self.check_access('write')
|
|
except UserError:
|
|
raise AccessError("You are not allowed to update these access rights.")
|
|
|
|
if len(self.ids) > 1 and notify:
|
|
raise UserError(_("Impossible to invite partners on multiple documents at once."))
|
|
|
|
if self.shortcut_document_id:
|
|
raise UserError(_("You can not update the access of a shortcut, update its target instead."))
|
|
|
|
# Check inputs as we are going to bypass the ORM in the private method(s)
|
|
access_options = {'view', 'edit', 'none', None}
|
|
hidden_options = {None, True, False}
|
|
role_options = {'edit', 'view', False, None}
|
|
incorrect_fields_to_options = {
|
|
**({'is_access_via_link_hidden': hidden_options} if is_access_via_link_hidden not in hidden_options else {}),
|
|
**({'access_via_link': access_options} if access_via_link not in access_options else {}),
|
|
**({'access_internal': access_options} if access_internal not in access_options else {}),
|
|
**({'partners.role': role_options}
|
|
if any(role not in role_options for (__, (role, __)) in (partners or {}).items()) else {})
|
|
}
|
|
if incorrect_fields_to_options:
|
|
hints = "\n- " + "\n- ".join(f'{name}: {options}' for name, options in incorrect_fields_to_options.items())
|
|
raise UserError(_(
|
|
"Incorrect values. Use one of the following for the following fields: %(hints)s.)", hints=hints
|
|
))
|
|
|
|
self._action_update_access(access_internal, access_via_link, is_access_via_link_hidden)
|
|
if partners:
|
|
partners = {
|
|
self.env['res.partner'].browse(int(partner)) if isinstance(partner, str | int) else partner:
|
|
(role, fields.Datetime.to_datetime(exp) if exp and isinstance(exp, str) else exp)
|
|
for partner, (role, exp) in (partners or {}).items()
|
|
}
|
|
root_access_partners = self.access_ids.partner_id
|
|
self._action_update_members(partners)
|
|
if notify:
|
|
self._send_access_by_mail(
|
|
{p: role for p, (role, __) in partners.items() if role and p not in root_access_partners},
|
|
message=message
|
|
)
|
|
|
|
return self.mapped('user_permission')
|
|
|
|
def _action_update_access(self, access_internal, access_via_link, is_access_via_link_hidden):
|
|
"""Update the access on self and children.
|
|
|
|
Stop the propagation when the value is already the right one.
|
|
|
|
:param str | None access_internal: change the `access_internal` if not None
|
|
:param str | None access_via_link: change the `access_via_link` if not None
|
|
:param bool | None is_access_via_link_hidden: change the `is_access_via_link_hidden` if not None
|
|
"""
|
|
self.flush_model()
|
|
shortcuts_to_check_owner_target_access = self.browse()
|
|
for field, value in (
|
|
('access_internal', access_internal),
|
|
('access_via_link', access_via_link),
|
|
('is_access_via_link_hidden', is_access_via_link_hidden),
|
|
):
|
|
if value is None:
|
|
continue
|
|
|
|
# records that we might need to update
|
|
candidates_domain = [
|
|
(field, '!=', value),
|
|
*([] if self.env.su else [('user_permission', '=', 'edit')]),
|
|
# the update is done only "target -> shortcut",
|
|
# but not "shortcut -> target"
|
|
('shortcut_document_id', '=', False),
|
|
('id', 'child_of', self.ids),
|
|
]
|
|
candidates = self.env['documents.document']._search(
|
|
candidates_domain).select('id', 'folder_id', 'shortcut_document_id', field)
|
|
shortcuts_to_check_owner_target_access |= self.search([('shortcut_document_id', 'any', candidates_domain)])
|
|
|
|
self.env.cr.execute(SQL("""
|
|
WITH RECURSIVE candidates AS (%(candidates)s),
|
|
-- explore the folders
|
|
documents_to_update AS (
|
|
SELECT id
|
|
FROM candidates
|
|
WHERE id = ANY(%(root_ids)s)
|
|
UNION
|
|
SELECT child.id
|
|
FROM candidates AS child
|
|
JOIN documents_to_update AS parent
|
|
ON child.folder_id = parent.id
|
|
),
|
|
documents_and_shortcuts AS (
|
|
SELECT id FROM documents_to_update
|
|
UNION
|
|
-- document.shortcut_ids
|
|
-- update in "SUDO" to keep them synchronized
|
|
SELECT shortcut.id
|
|
FROM documents_document AS shortcut
|
|
JOIN documents_to_update
|
|
ON documents_to_update.id = shortcut.shortcut_document_id
|
|
)
|
|
UPDATE documents_document
|
|
SET %(field)s = %(value)s
|
|
FROM documents_and_shortcuts AS doc
|
|
-- document | document.children_ids | document.shortcut_ids
|
|
WHERE documents_document.id = doc.id
|
|
""", field=SQL(field), value=value, root_ids=self.ids, candidates=candidates))
|
|
|
|
self.invalidate_model([
|
|
'access_internal',
|
|
'access_via_link',
|
|
'is_access_via_link_hidden',
|
|
'user_permission',
|
|
])
|
|
|
|
shortcuts_to_check_owner_target_access._unlink_shortcut_if_target_inaccessible()
|
|
|
|
def _action_update_members(self, partners):
|
|
"""Update the members access on all files bellow the current folder.
|
|
|
|
:param partners: Partners to add as members / change access
|
|
"""
|
|
self.env['documents.access'].flush_model()
|
|
|
|
partners_to_remove = self.env['res.partner']
|
|
# {(role, expiration_date): partners}
|
|
values_to_update = defaultdict(lambda: self.env['res.partner'])
|
|
|
|
for partner, (role, expiration_date) in partners.items():
|
|
if role is False:
|
|
# remove the members
|
|
partners_to_remove |= partner
|
|
elif role is not None or expiration_date is not None:
|
|
values_to_update[role, expiration_date] |= partner
|
|
|
|
# use `_search` to respect access rules and to use `_search_user_permission`
|
|
to_update_domain = [
|
|
*([] if self.env.su else [('user_permission', '=', 'edit')]),
|
|
('shortcut_document_id', '=', False), # update "target -> shortcuts" but not "shortcut -> target"
|
|
('id', 'child_of', self.ids),
|
|
]
|
|
documents = self.env['documents.document']._search(to_update_domain).select('id')
|
|
|
|
for (role, expiration_date), partners in values_to_update.items():
|
|
update_fields = []
|
|
if role not in ('edit', 'view'):
|
|
raise UserError(_("Invalid role.")) # The public method would have returned a more insightful message
|
|
else:
|
|
update_fields.append(SQL('role = %(role)s', role=role))
|
|
if expiration_date is not None:
|
|
update_fields.append(SQL(
|
|
'expiration_date = %(expiration_date)s',
|
|
expiration_date=expiration_date or None,
|
|
))
|
|
update_fields = SQL(',').join(update_fields)
|
|
|
|
self.env.cr.execute(SQL(
|
|
"""
|
|
WITH documents AS (%(documents)s),
|
|
documents_and_shortcuts AS (
|
|
SELECT * FROM documents
|
|
UNION
|
|
-- document.shortcut_ids
|
|
SELECT shortcut.id
|
|
FROM documents_document AS shortcut
|
|
JOIN documents AS document
|
|
ON document.id = shortcut.shortcut_document_id
|
|
)
|
|
INSERT INTO documents_access (
|
|
document_id,
|
|
partner_id,
|
|
role,
|
|
expiration_date
|
|
) (
|
|
SELECT DISTINCT ON (doc.id, partner_id) doc.id,
|
|
partner_id,
|
|
%(role)s,
|
|
%(expiration_date)s
|
|
FROM documents_and_shortcuts AS doc
|
|
JOIN LATERAL UNNEST(%(partner_ids)s) AS partner_id
|
|
ON 1=1
|
|
)
|
|
ON CONFLICT (document_id, partner_id) DO UPDATE SET
|
|
%(update_fields)s
|
|
""",
|
|
documents=documents,
|
|
partner_ids=partners.ids,
|
|
expiration_date=expiration_date or None,
|
|
role=role,
|
|
update_fields=update_fields,
|
|
))
|
|
shortcuts_to_check_owner_target_access = self.browse()
|
|
if partners_to_remove:
|
|
shortcuts_to_check_owner_target_access = self.search(expression.AND([
|
|
[('shortcut_document_id', 'any', to_update_domain)],
|
|
[('owner_id.partner_id', 'in', partners_to_remove.ids)],
|
|
]))
|
|
|
|
self.env.cr.execute(SQL("""
|
|
WITH documents AS (%(documents)s),
|
|
docs_and_shortcuts AS (
|
|
SELECT id
|
|
FROM documents
|
|
UNION
|
|
SELECT shortcut.id
|
|
FROM documents AS doc
|
|
JOIN documents_document AS shortcut
|
|
ON doc.id = shortcut.shortcut_document_id
|
|
)
|
|
DELETE FROM documents_access AS access
|
|
USING docs_and_shortcuts AS doc
|
|
WHERE access.document_id = doc.id
|
|
AND access.partner_id = ANY(%(partner_ids)s)
|
|
""", documents=documents, partner_ids=partners_to_remove.ids))
|
|
|
|
self.env['documents.document'].invalidate_model([
|
|
'access_ids',
|
|
'user_permission',
|
|
])
|
|
|
|
shortcuts_to_check_owner_target_access._unlink_shortcut_if_target_inaccessible()
|
|
|
|
def action_see_documents(self):
|
|
if self.type != "folder":
|
|
raise UserError(_("Not a folder."))
|
|
|
|
domain = [('folder_id', '=', self.id)]
|
|
return {
|
|
'name': _('Documents'),
|
|
'domain': domain,
|
|
'res_model': 'documents.document',
|
|
'type': 'ir.actions.act_window',
|
|
'views': [(False, 'list'), (False, 'form')],
|
|
'view_mode': 'list,form',
|
|
'context': {'searchpanel_default_folder_id': self.id}
|
|
}
|
|
|
|
def toggle_is_pinned_folder(self):
|
|
# TODO: remove in master
|
|
self.ensure_one()
|
|
|
|
@api.model
|
|
def get_documents_actions(self, folder_id):
|
|
"""Return the available actions and a key to know if the action is embedded on the folder."""
|
|
folder = self.env['documents.document'].browse(folder_id).exists()
|
|
if not folder:
|
|
raise UserError(_('This folder does not exist.'))
|
|
|
|
embedded_actions = self._get_folder_embedded_actions(folder.ids)
|
|
embedded_actions = embedded_actions[folder.id].action_id.ids if embedded_actions else []
|
|
|
|
actions = self.env['ir.actions.server'].sudo().search([
|
|
('model_id', '=', self.env['ir.model']._get_id('documents.document')),
|
|
('usage', '=', 'ir_actions_server'),
|
|
])
|
|
# Do not show an action if it's a child of a different action
|
|
actions -= actions.child_ids
|
|
return [{
|
|
"id": action.id,
|
|
"name": action.display_name,
|
|
"is_embedded": action.id in embedded_actions
|
|
} for action in actions]
|
|
|
|
@api.model
|
|
def action_folder_embed_action(self, folder_id, action_id, groups_ids=None):
|
|
"""Enable / disable the action for the given folder
|
|
|
|
:param int folder_id: The folder on which we pin the actions
|
|
:param int action_id: The id of the action to enable
|
|
:param list[int] groups_ids: ids of the groups the action is available to
|
|
"""
|
|
if not self.env.user.has_group('documents.group_documents_user'):
|
|
raise AccessError(_("You are not allowed to pin/unpin embedded Actions."))
|
|
|
|
if not groups_ids:
|
|
groups_ids = self.env.ref('base.group_user').ids
|
|
|
|
action = self.env['ir.actions.server'].browse(action_id).sudo().exists()
|
|
if not action:
|
|
raise UserError(_('This action does not exist.'))
|
|
if action.type != 'ir.actions.server':
|
|
raise UserError(_('You can not ping that type of action.'))
|
|
folder = self.env['documents.document'].browse(folder_id).sudo().exists()
|
|
if not folder or folder.type != 'folder':
|
|
raise UserError(_('You can not ping an action on that document.'))
|
|
|
|
embedded = self.env['ir.embedded.actions'].sudo().search([
|
|
('parent_action_id', '=', self.env.ref("documents.document_action").id),
|
|
('action_id', '=', action_id),
|
|
('action_id.type', '=', 'ir.actions.server'),
|
|
('parent_res_model', '=', 'documents.document'),
|
|
('parent_res_id', '=', folder_id),
|
|
('groups_ids', 'in', groups_ids),
|
|
]).sudo(False)
|
|
if embedded:
|
|
embedded.unlink()
|
|
else:
|
|
# first pinned action should be displayed first
|
|
last_action = self.env['ir.embedded.actions'].search(
|
|
[], order='sequence DESC', limit=1)
|
|
self.env['ir.embedded.actions'].create({
|
|
'name': action.name,
|
|
'parent_action_id': self.env.ref("documents.document_action").id,
|
|
'action_id': action.id,
|
|
'parent_res_model': 'documents.document',
|
|
'parent_res_id': folder_id,
|
|
'groups_ids': groups_ids,
|
|
'sequence': last_action.sequence + 1 if last_action else 1,
|
|
})
|
|
|
|
return self.get_documents_actions(folder_id)
|
|
|
|
@api.model
|
|
def action_execute_embedded_action(self, action_id):
|
|
"""Execute an embedded action on context records.
|
|
|
|
:param int action_id: id of embedded action to be run on context provided records.
|
|
"""
|
|
if self.env.user.share:
|
|
raise AccessError(_("You are not allowed to execute embedded actions."))
|
|
if self.env.context.get('active_model') != 'documents.document':
|
|
raise UserError(_("Unavailable action."))
|
|
ids = self.env.context.get('active_ids', self.env.context.get('active_id'))
|
|
if not ids:
|
|
raise UserError(_("Missing documents reference."))
|
|
|
|
embedded_action = self.env['ir.embedded.actions'].browse([action_id])
|
|
if all(action_id in document.available_embedded_actions_ids.ids for document in self.browse(ids)):
|
|
return self.env['ir.actions.server'].browse(embedded_action.action_id.id).run()
|
|
|
|
raise UserError(_("Unavailable action."))
|
|
|
|
def action_link_to_record(self, model=False):
|
|
"""Open the `link_to_record_wizard` to choose a record to link to the current documents.
|
|
|
|
This method can be used inside server actions.
|
|
"""
|
|
context = {
|
|
'default_document_ids': self.ids,
|
|
'default_resource_ref': False,
|
|
'default_is_readonly_model': False,
|
|
'default_model_ref': False,
|
|
}
|
|
|
|
if documents_link_record := self.filtered(lambda d: d.res_model != 'documents.document'):
|
|
return {
|
|
'type': 'ir.actions.client',
|
|
'tag': 'display_notification',
|
|
'params': {
|
|
'type': 'warning',
|
|
'message': _(
|
|
"Already linked Documents: %s",
|
|
", ".join(documents_link_record.mapped('name'))
|
|
),
|
|
}
|
|
}
|
|
|
|
if model:
|
|
self.env[model].check_access('write')
|
|
context['default_is_readonly_model'] = True
|
|
context['default_model_id'] = self.env['ir.model']._get_id(model)
|
|
first_valid_id = self.env[model].search([], limit=1).id
|
|
context['default_resource_ref'] = f'{model},{first_valid_id}'
|
|
|
|
return {
|
|
'name': _('Choose a record to link'),
|
|
'type': 'ir.actions.act_window',
|
|
'res_model': 'documents.link_to_record_wizard',
|
|
'view_mode': 'form',
|
|
'target': 'new',
|
|
'views': [(False, "form")],
|
|
'context': context,
|
|
}
|
|
|
|
def _send_access_by_mail(self, partners, message=""):
|
|
"""Send a notification email to contacts granted with a new document/folder access.
|
|
|
|
:param dict[res.partner(), str] partners: Mapping of partner to new_role: 'edit', 'view'.
|
|
:param message: message to add to the email
|
|
"""
|
|
self.ensure_one()
|
|
subject = _('%s shared with you', self.display_name) if self.display_name else _('Access to a folder or a document')
|
|
formatted_msg = Markup(message) if message else ""
|
|
roles_info = {
|
|
(role, lang): {
|
|
"body": self.env['ir.qweb'].with_context(lang=lang)._render(
|
|
'documents.mail_template_document_share',
|
|
{'record': self, 'user': self.env.user, 'message': formatted_msg}
|
|
),
|
|
"role_label": _('Editor') if role == 'edit' else _('Viewer'),
|
|
} for role, lang in {(role, partner.lang) for partner, role in partners.items()}
|
|
}
|
|
|
|
for partner, role in partners.items():
|
|
self.with_context(lang=partner.lang).message_notify(
|
|
body=roles_info[role, partner.lang]['body'],
|
|
email_layout_xmlid='mail.mail_notification_layout',
|
|
partner_ids=partner.ids,
|
|
subject=subject,
|
|
subtitles=[self.display_name, _('Your Role: %s', roles_info[role, partner.lang]['role_label'])],
|
|
)
|
|
|
|
def _notify_get_recipients_groups(self, message, model_description, msg_vals=None):
|
|
groups = super()._notify_get_recipients_groups(
|
|
message, model_description, msg_vals=msg_vals
|
|
)
|
|
if len(self.ids) != 1:
|
|
return groups
|
|
|
|
group_values = {
|
|
'active': True,
|
|
'button_access': {'url': self.access_url},
|
|
'has_button_access': True,
|
|
}
|
|
return [
|
|
('group_documents_document_people_with_access',
|
|
lambda pdata:
|
|
(pdata['uid'] and self.with_user(pdata['uid']).user_permission != 'none') or
|
|
(pdata['id'] and self.access_via_link != 'none'
|
|
and self.access_ids.filtered(lambda a: a.partner_id.id == pdata['id'] and a.role)),
|
|
group_values)
|
|
] + groups
|
|
|
|
def get_deletion_delay(self):
|
|
return int(self.env['ir.config_parameter'].sudo().get_param('documents.deletion_delay', '30'))
|
|
|
|
def _get_is_multipage(self):
|
|
"""Whether the document can be considered multipage, if able to determine.
|
|
|
|
:return: `None` if mimetype not handled, `False` if single page or error occurred, `True` otherwise.
|
|
:rtype: bool | None
|
|
"""
|
|
if self.mimetype not in ('application/pdf', 'application/pdf;base64'):
|
|
return None
|
|
stream = io.BytesIO(base64.b64decode(self.datas))
|
|
try:
|
|
return PdfFileReader(stream, strict=False).numPages > 1
|
|
except AttributeError:
|
|
raise # If PyPDF's API changes and the `numPages` property isn't there anymore, not if its computation fails.
|
|
except Exception: # noqa: BLE001
|
|
_logger.warning('Impossible to count pages in %r. It could be due to a malformed document or a '
|
|
'(possibly known) issue within PyPDF2.', self.name, exc_info=True)
|
|
return False
|
|
|
|
def _get_models(self, domain):
|
|
"""
|
|
Return the names of the models to which the attachments are attached.
|
|
|
|
:param domain: the domain of the _read_group on documents.
|
|
:return: a list of model data, the latter being a dict with the keys
|
|
'id' (technical name),
|
|
'name' (display name) and
|
|
'__count' (how many attachments with that domain).
|
|
"""
|
|
not_a_file = []
|
|
not_attached = []
|
|
models = []
|
|
groups = self._read_group(domain, ['res_model'], ['__count'])
|
|
for res_model, count in groups:
|
|
if not res_model:
|
|
not_a_file.append({
|
|
'id': res_model,
|
|
'display_name': _('Not a file'),
|
|
'__count': count,
|
|
})
|
|
elif res_model == 'documents.document':
|
|
not_attached.append({
|
|
'id': res_model,
|
|
'display_name': _('Not attached'),
|
|
'__count': count,
|
|
})
|
|
else:
|
|
models.append({
|
|
'id': res_model,
|
|
'display_name': self.env['ir.model']._get(res_model).display_name,
|
|
'__count': count,
|
|
})
|
|
return sorted(models, key=lambda m: m['display_name']) + not_attached + not_a_file
|
|
|
|
@api.depends('favorited_ids')
|
|
@api.depends_context('uid')
|
|
def _compute_is_favorited(self):
|
|
favorited = self._filtered_access('read').filtered(lambda d: self.env.user in d.favorited_ids)
|
|
favorited.is_favorited = True
|
|
(self - favorited).is_favorited = False
|
|
|
|
def _inverse_is_favorited(self):
|
|
unfavorited_documents = favorited_documents = self.env['documents.document'].sudo()
|
|
for document in self:
|
|
if self.env.user in document.favorited_ids:
|
|
unfavorited_documents |= document
|
|
else:
|
|
favorited_documents |= document
|
|
favorited_documents.write({'favorited_ids': [(4, self.env.uid)]})
|
|
unfavorited_documents.write({'favorited_ids': [(3, self.env.uid)]})
|
|
|
|
@api.depends('res_model')
|
|
def _compute_res_model_name(self):
|
|
for record in self:
|
|
if record.res_model:
|
|
record.res_model_name = self.env['ir.model']._get(record.res_model).display_name
|
|
else:
|
|
record.res_model_name = False
|
|
|
|
@api.constrains('url')
|
|
def _check_url(self):
|
|
for document in self.filtered("url"):
|
|
if not document.url.startswith(('https://', 'http://', 'ftp://')):
|
|
raise ValidationError(_('URL %s does not seem complete, as it does not begin with http(s):// or ftp://', document.url))
|
|
|
|
@api.model
|
|
def message_new(self, msg_dict, custom_values=None):
|
|
"""When an email comes, create a document with the default values,
|
|
then let `_message_post_after_hook` create one document per attachment."""
|
|
custom_values = custom_values or {}
|
|
|
|
folder = self.env['documents.document'].browse(custom_values.get('folder_id'))
|
|
|
|
custom_values['name'] = _('Mail: %s', msg_dict.get('subject'))
|
|
if 'tag_ids' not in custom_values:
|
|
custom_values['tag_ids'] = folder.alias_tag_ids.ids
|
|
|
|
else:
|
|
tags = custom_values['tag_ids']
|
|
if tags and isinstance(tags[0], list | tuple):
|
|
# we have a list of m2m commands
|
|
if all(len(t) >= 2 and t[0] == Command.LINK for t in tags):
|
|
tags = [t[1] for t in tags]
|
|
elif len(tags) == 1 and len(tags[0]) == 3 and tags[0][0] == Command.SET:
|
|
tags = tags[0][2]
|
|
else: # do not support other commands
|
|
tags = []
|
|
|
|
custom_values['tag_ids'] = self.env['documents.tag'].browse(tags).exists().ids
|
|
|
|
custom_values['active'] = False
|
|
return super().message_new(msg_dict, custom_values)
|
|
|
|
def _alias_get_creation_values(self):
|
|
values = super()._alias_get_creation_values()
|
|
values['alias_model_id'] = self.env['ir.model']._get('documents.document').id
|
|
if self.id:
|
|
values['alias_defaults'] = literal_eval(self.alias_defaults or "{}")
|
|
values['alias_defaults'] |= {'folder_id': self.id}
|
|
return values
|
|
|
|
def _message_post_after_hook(self, message, msg_vals):
|
|
""" If the res model was an attachment and a mail, adds all the custom values of the linked
|
|
document settings to the attachments of the mail.
|
|
"""
|
|
m2m_commands = msg_vals['attachment_ids']
|
|
attachments = self.env['ir.attachment'].browse([x[1] for x in m2m_commands])
|
|
if (not self.env.context.get("no_document") or message.message_type == 'email') and attachments:
|
|
self.attachment_id = False
|
|
documents = self.env['documents.document'].create([{
|
|
'name': attachment.name,
|
|
'attachment_id': attachment.id,
|
|
'folder_id': self.folder_id.id,
|
|
'owner_id': self.folder_id.owner_id.id or self.env.ref('base.user_root').id,
|
|
'partner_id': self.partner_id.id,
|
|
'tag_ids': self.tag_ids.ids,
|
|
} for attachment in attachments])
|
|
|
|
for attachment, document in zip(attachments, documents):
|
|
attachment.write({
|
|
'res_model': 'documents.document',
|
|
'res_id': document.id,
|
|
})
|
|
document.message_post(
|
|
message_type='email',
|
|
body=msg_vals.get('body', ''),
|
|
email_from=msg_vals.get('email_from'),
|
|
subject=msg_vals.get('subject') or self.name
|
|
)
|
|
# Activity settings set through alias_defaults values has precedence over the activity folder settings
|
|
if self.create_activity_option:
|
|
document.documents_set_activity(settings_record=self)
|
|
elif self.folder_id.create_activity_option:
|
|
document.documents_set_activity(settings_record=self.folder_id)
|
|
|
|
return super()._message_post_after_hook(message, msg_vals)
|
|
|
|
def documents_set_activity(self, settings_record=None):
|
|
"""
|
|
Generate an activity based on the fields of settings_record.
|
|
|
|
:param settings_record: the record that contains the activity fields.
|
|
settings_record.create_activity_type_id (required)
|
|
settings_record.create_activity_summary
|
|
settings_record.create_activity_note
|
|
settings_record.create_activity_date_deadline_range
|
|
settings_record.create_activity_date_deadline_range_type
|
|
settings_record.create_activity_user_id
|
|
"""
|
|
if settings_record and settings_record.create_activity_type_id:
|
|
for record in self:
|
|
activity_vals = {
|
|
'activity_type_id': settings_record.create_activity_type_id.id,
|
|
'summary': settings_record.create_activity_summary or '',
|
|
'note': settings_record.create_activity_note or '',
|
|
}
|
|
if settings_record.create_activity_date_deadline_range > 0:
|
|
activity_vals['date_deadline'] = fields.Date.context_today(settings_record) + relativedelta(
|
|
**{
|
|
settings_record.create_activity_date_deadline_range_type: settings_record.create_activity_date_deadline_range})
|
|
if settings_record._fields.get(
|
|
'create_has_owner_activity') and settings_record.create_has_owner_activity and record.owner_id:
|
|
user = record.owner_id
|
|
elif settings_record._fields.get('create_activity_user_id') and settings_record.create_activity_user_id:
|
|
user = settings_record.create_activity_user_id
|
|
elif settings_record._fields.get('user_id') and settings_record.user_id:
|
|
user = settings_record.user_id
|
|
else:
|
|
user = self.env.user
|
|
if user:
|
|
activity_vals['user_id'] = user.id
|
|
record.activity_schedule(**activity_vals)
|
|
|
|
def copy_data(self, default=None):
|
|
default = dict(default or {})
|
|
vals_list = super().copy_data(default=default)
|
|
if 'name' not in default:
|
|
for document, vals in zip(self, vals_list):
|
|
vals['name'] = document.name if document.type == 'folder' else _("%s (copy)", document.name)
|
|
for document, vals in zip(self, vals_list):
|
|
# Avoid to propagate folder access as we want to copy the document accesses alone
|
|
vals['access_ids'] = default.get('access_ids', False)
|
|
if 'owner_id' not in vals:
|
|
vals['owner_id'] = self.env.user.id
|
|
return vals_list
|
|
|
|
def copy(self, default=None):
|
|
if not self:
|
|
return self
|
|
if not all(self.mapped('active')):
|
|
raise UserError(_('You cannot duplicate document(s) in the Trash.'))
|
|
|
|
# As we avoid to propagate the folder permission by setting access_ids to False (see copy_data), user has no
|
|
# right to create the document. So after checking permission, we execute the copy in sudo.
|
|
self.check_access("create")
|
|
self.check_access('read')
|
|
if not self.env.su and self.folder_id and self.folder_id.user_permission != 'edit':
|
|
# do not check access to allow copying in root company folders
|
|
raise AccessError(_('You cannot copy in that folder'))
|
|
|
|
documents_order = {doc.id: idx for idx, doc in enumerate(self)}
|
|
new_documents = [self.browse()] * len(self)
|
|
|
|
skip_documents = self.env.context.get('documents_copy_folders_only')
|
|
|
|
shortcuts = self.filtered('shortcut_document_id')
|
|
if not skip_documents:
|
|
for destination, targets in shortcuts.grouped('folder_id').items():
|
|
new_shortcuts = targets.action_create_shortcut(destination.id)
|
|
for new_shortcut, target in zip(new_shortcuts, targets):
|
|
new_documents[documents_order[target.id]] = new_shortcut
|
|
|
|
folders = (self - shortcuts).filtered(lambda d: d.type == 'folder')
|
|
if folders:
|
|
embedded_actions = self._get_folder_embedded_actions(folders.ids)
|
|
new_folders = folders.sudo()._copy_with_access(default=default).sudo(False)
|
|
for old_folder, new_folder in zip(folders, new_folders):
|
|
if folder_embedded_actions := embedded_actions.get(old_folder.id):
|
|
embedded_actions_copies = folder_embedded_actions.copy()
|
|
embedded_actions_copies.parent_res_id = new_folder.id
|
|
# no need to check for permission as user is owner of copies (see copy_data).
|
|
old_folder.children_ids.copy({'folder_id': new_folder.id})
|
|
new_documents[documents_order[old_folder.id]] = new_folder
|
|
|
|
if not skip_documents and (documents_sudo := (self - shortcuts - folders).sudo()):
|
|
new_binaries_sudo = documents_sudo._copy_with_access(default=default)
|
|
for old_document_sudo, new_binary_sudo in zip(documents_sudo, new_binaries_sudo):
|
|
new_documents[documents_order[old_document_sudo.id]] = new_binary_sudo.sudo(False)
|
|
if to_copy_attachment_sudo := documents_sudo._copy_attachment_filter(default):
|
|
new_attachments_iterator = iter(to_copy_attachment_sudo.attachment_id.with_context(no_document=True).copy())
|
|
for old_document_sudo, new_binary_sudo in zip(documents_sudo, new_binaries_sudo):
|
|
if old_document_sudo._copy_attachment_filter(default):
|
|
new_attachment = next(new_attachments_iterator)
|
|
new_binary_sudo.write({
|
|
'attachment_id': new_attachment.id,
|
|
# Avoid recompute based on attachment_id
|
|
'name': new_binary_sudo.name,
|
|
'url_preview_image': False,
|
|
})
|
|
|
|
return self.browse([new_document.id for new_document in new_documents])
|
|
|
|
def _copy_attachment_filter(self, default):
|
|
if default and 'attachment_id' in default:
|
|
return self.env['documents.document']
|
|
return self.filtered('attachment_id')
|
|
|
|
def _copy_with_access(self, default):
|
|
"""Copy documents with their access. !Assumes that access rights were checked before! """
|
|
if not self:
|
|
return self
|
|
res = super().copy(default=default)
|
|
if default and 'access_ids' in default:
|
|
return res
|
|
access_vals_list = []
|
|
for doc, doc_copied in zip(self, res):
|
|
access_vals_list += doc.access_ids.filtered('role').copy_data(default={'document_id': doc_copied.id})
|
|
self.env['documents.access'].sudo().create(access_vals_list)
|
|
return res
|
|
|
|
def toggle_favorited(self):
|
|
self.ensure_one()
|
|
self.sudo().write({'favorited_ids': [(3 if self.env.user in self[0].favorited_ids else 4, self.env.user.id)]})
|
|
|
|
def access_content(self):
|
|
self.ensure_one()
|
|
action = {
|
|
'type': "ir.actions.act_url",
|
|
'target': "new",
|
|
}
|
|
if self.url:
|
|
action['url'] = self.url
|
|
elif self.type == 'binary':
|
|
action['url'] = f'/documents/content/{self.access_token}'
|
|
return action
|
|
|
|
def open_resource(self):
|
|
self.ensure_one()
|
|
if self.res_model and self.res_id:
|
|
view_id = self.env[self.res_model].get_formview_id(self.res_id)
|
|
return {
|
|
'res_id': self.res_id,
|
|
'res_model': self.res_model,
|
|
'type': 'ir.actions.act_window',
|
|
'views': [[view_id, 'form']],
|
|
}
|
|
|
|
def toggle_lock(self):
|
|
"""
|
|
sets a lock user, the lock user is the user who locks a file for themselves, preventing data replacement
|
|
and archive (therefore deletion) for any user but himself.
|
|
|
|
Members of the group documents.group_documents_manager and the superuser can unlock the file regardless.
|
|
"""
|
|
self.ensure_one()
|
|
if self.lock_uid:
|
|
if self.env.user == self.lock_uid or self.env.is_admin() or self.env.user.has_group('documents.group_documents_manager'):
|
|
self.lock_uid = False
|
|
else:
|
|
self.lock_uid = self.env.uid
|
|
|
|
@api.depends_context('uid')
|
|
@api.depends('lock_uid')
|
|
def _compute_is_locked(self):
|
|
for record in self:
|
|
record.is_locked = record.lock_uid and not (
|
|
self.env.user == record.lock_uid or
|
|
self.env.is_admin() or
|
|
self.env.user.has_group('documents.group_documents_manager'))
|
|
|
|
def action_archive(self):
|
|
if not self:
|
|
return
|
|
|
|
to_archive_sudo = self.sudo().with_context(active_test=False).search([('id', 'child_of', self.ids)])
|
|
active_documents = to_archive_sudo.filtered(self._active_name).sudo(False)
|
|
if not active_documents:
|
|
return
|
|
|
|
# As document archiving leads to deletion
|
|
message = _("You do not have sufficient access rights to delete these documents.")
|
|
try:
|
|
active_documents.check_access('unlink')
|
|
except UserError as e:
|
|
raise AccessError(message) from e
|
|
# As edit on parent is required to restore (and as removing a file is also somehow modifying the folder)
|
|
if folder_ids := self.folder_id:
|
|
if any(permission != 'edit' for permission in folder_ids.mapped('user_permission')):
|
|
raise UserError(message)
|
|
|
|
active_documents._raise_if_used_folder()
|
|
deletion_date = fields.Date.to_string(fields.Date.today() + relativedelta(days=self.get_deletion_delay()))
|
|
log_message = _("This file has been sent to the trash and will be deleted forever on the %s", deletion_date)
|
|
active_documents._message_log_batch(bodies={doc.id: log_message for doc in active_documents})
|
|
return super(Document, active_documents).action_archive()
|
|
|
|
def action_unarchive(self):
|
|
self_archived = self.filtered(lambda d: not d.active)
|
|
if not self_archived:
|
|
return
|
|
archived_top_parent_documents = self.env["documents.document"].sudo().search(
|
|
expression.AND([
|
|
[('id', 'parent_of', self_archived.ids)],
|
|
[('id', 'not in', self_archived.ids)],
|
|
[('active', '=', False)],
|
|
expression.OR([
|
|
[('folder_id', '=', False)],
|
|
[('folder_id.active', '=', True)],
|
|
])
|
|
])
|
|
).sudo(False)
|
|
if archived_top_parent_documents:
|
|
raise UserError(_(
|
|
"Item(s) you wish to restore are included in archived folders. "
|
|
"To restore these items, you must restore the following including folders instead:\n"
|
|
"- %(folders_list)s",
|
|
folders_list="\n-".join(archived_top_parent_documents.mapped('name')))) # "Restricted" if not allowed
|
|
|
|
# Leave archived children (and descendants) the current user doesn't have access to.
|
|
to_unarchive_candidate_documents = self.env['documents.document'].with_context(active_test=False).search(
|
|
[('id', 'child_of', self_archived.ids)])
|
|
|
|
seen_documents, to_unarchive_ids = set(), set()
|
|
|
|
def add_if_can_be_restored(doc):
|
|
if doc in seen_documents or seen_documents.add(doc):
|
|
return doc.id in to_unarchive_ids
|
|
if not doc.folder_id or doc.folder_id.sudo().active or add_if_can_be_restored(doc.folder_id):
|
|
to_unarchive_ids.add(doc.id)
|
|
return True
|
|
return False
|
|
|
|
for document in to_unarchive_candidate_documents:
|
|
add_if_can_be_restored(document)
|
|
to_unarchive_documents = to_unarchive_candidate_documents.filtered(lambda d: d.id in to_unarchive_ids)
|
|
log_message = _("This document has been restored.")
|
|
to_unarchive_documents._message_log_batch(bodies={doc.id: log_message for doc in to_unarchive_documents})
|
|
return super(Document, to_unarchive_documents).action_unarchive()
|
|
|
|
@api.model_create_multi
|
|
def create(self, vals_list):
|
|
"""Access rights fields (access_ids), access_internal, and access_via_link are inherited from containing folder
|
|
unless specified in vals or context defaults.
|
|
"""
|
|
attachments = []
|
|
for vals in vals_list:
|
|
keys = [key for key in vals if
|
|
self._fields[key].related and self._fields[key].related.split('.')[0] == 'attachment_id']
|
|
attachment_dict = {key: vals.pop(key) for key in keys if key in vals}
|
|
attachment = self.env['ir.attachment'].browse(vals.get('attachment_id'))
|
|
|
|
if attachment and attachment_dict:
|
|
attachment.write(attachment_dict)
|
|
elif attachment_dict:
|
|
attachment_dict.setdefault('name', vals.get('name', 'unnamed'))
|
|
# default_res_model and default_res_id will cause unique constraints to trigger.
|
|
attachment = self.env['ir.attachment'].with_context(clean_context(self.env.context)).create(attachment_dict)
|
|
vals['attachment_id'] = attachment.id
|
|
vals['name'] = vals.get('name', attachment.name)
|
|
attachments.append(attachment)
|
|
|
|
# don't allow using default_access_ids
|
|
documents = super(Document, self.with_context(default_access_ids=None)).create(vals_list)
|
|
|
|
is_manager = self.env.is_admin() or self.env.user.has_group('documents.group_documents_manager')
|
|
if not is_manager:
|
|
if any(d.alias_name for d in documents):
|
|
raise AccessError(_('Only Documents Managers can set aliases.'))
|
|
if any(d.is_pinned_folder for d in documents):
|
|
raise AccessError(_('Only Documents Managers can create in company folder.'))
|
|
|
|
for document, attachment in zip(documents, attachments):
|
|
if attachment and not attachment.res_id and (
|
|
not attachment.res_model or attachment.res_model == 'documents.document'):
|
|
attachment.with_context(no_document=True).write({
|
|
'res_model': 'documents.document',
|
|
'res_id': document.id})
|
|
return documents
|
|
|
|
def _prepare_create_values(self, vals_list):
|
|
old_vals_list = [vals.copy() for vals in vals_list]
|
|
vals_list = super()._prepare_create_values(vals_list)
|
|
folders = self.env['documents.document'].browse(v['folder_id'] for v in vals_list if v.get('folder_id'))
|
|
users = self.env['res.users'].browse(v['owner_id'] for v in vals_list if v.get('owner_id'))
|
|
folders.fetch(('access_internal', 'access_via_link', 'access_ids', 'active', 'owner_id'))
|
|
(users | folders.owner_id).fetch(['partner_id'])
|
|
odoobot = self.env.ref('base.user_root')
|
|
vals_list_to_update_linked_record = []
|
|
for vals, old_vals in zip(vals_list, old_vals_list):
|
|
owner = self.env['res.users'].browse(vals.get('owner_id', self.env.user.id))
|
|
owner_values = {'partner_id': owner.partner_id.id, 'role': False, 'last_access_date': fields.Datetime.now()}
|
|
vals_values = {
|
|
'owner_id': owner.id,
|
|
'access_ids': [Command.create(owner_values)] if owner != odoobot else []
|
|
}
|
|
if vals.get('folder_id'):
|
|
folder = self.env['documents.document'].browse(vals['folder_id'])
|
|
if not folder.active:
|
|
raise UserError('It is not possible to create documents in an archived folder.')
|
|
vals_values.update({
|
|
'access_via_link': folder.access_via_link,
|
|
'access_internal': folder.access_internal,
|
|
'access_ids':
|
|
vals_values['access_ids']
|
|
+ [
|
|
Command.create({'partner_id': access.partner_id.id, 'role': access.role})
|
|
for access in folder.access_ids
|
|
if access.role and access.partner_id != owner.partner_id
|
|
] + ([Command.create({'partner_id': folder.owner_id.partner_id.id, 'role': 'edit'})]
|
|
if folder.owner_id != odoobot and folder.owner_id != owner else []
|
|
),
|
|
})
|
|
vals.update((k, v) for k, v in vals_values.items() if k not in old_vals)
|
|
|
|
# If res_model and res_id are not set, we must get it from the related attachment if set (prepare list)
|
|
if 'res_model' not in vals and 'res_id' not in vals and isinstance(vals.get('attachment_id'), int):
|
|
vals_list_to_update_linked_record.append(vals)
|
|
|
|
# For the next step, we need to ensure the related ref is present by getting it from attachment if needed
|
|
if vals_list_to_update_linked_record:
|
|
attachment_by_id = self.env['ir.attachment'].browse(
|
|
[vals['attachment_id'] for vals in vals_list_to_update_linked_record]).grouped('id')
|
|
for vals in vals_list_to_update_linked_record:
|
|
attachment = attachment_by_id[vals['attachment_id']]
|
|
vals['res_model'] = attachment.res_model
|
|
vals['res_id'] = attachment.res_id
|
|
# Delegate vals_list update to _prepare_create_values_for_model to add values depending on related record
|
|
updated_vals_list = []
|
|
for res_model, model_vals_tuple_list in groupby(zip(vals_list, old_vals_list), lambda v: v[0].get('res_model')):
|
|
updated_vals_list += self._prepare_create_values_for_model(
|
|
res_model,
|
|
[vals_tuple[0] for vals_tuple in model_vals_tuple_list],
|
|
[vals_tuple[1] for vals_tuple in model_vals_tuple_list],
|
|
)
|
|
return updated_vals_list
|
|
|
|
def _prepare_create_values_for_model(self, res_model, vals_list, pre_vals_list):
|
|
"""Override to add values depending on related model/record"""
|
|
if (
|
|
res_model
|
|
and issubclass(self.pool[res_model], self.pool['documents.mixin'])
|
|
and not self._context.get('no_document')
|
|
):
|
|
return self.env[res_model]._prepare_document_create_values_for_linked_records(
|
|
res_model, vals_list, pre_vals_list)
|
|
return vals_list
|
|
|
|
def write(self, vals):
|
|
if 'shortcut_document_id' in vals:
|
|
raise UserError(_("Shortcuts cannot change target document."))
|
|
|
|
is_manager = self.env.is_admin() or self.env.user.has_group('documents.group_documents_manager')
|
|
pinned_folders_start = self.filtered('is_pinned_folder')
|
|
|
|
shortcuts_to_check_owner_target_access = self.browse()
|
|
|
|
if (owner_id := vals.get('owner_id')) is not None:
|
|
if not isinstance(owner_id, int): # recordset
|
|
owner_id = owner_id.id
|
|
if not is_manager and any(d.owner_id != self.env.user for d in self):
|
|
raise AccessError(_("You cannot change the owner of documents you do not own."))
|
|
|
|
targets_changing_owner = self.filtered(lambda d: d.owner_id.id != owner_id)
|
|
shortcuts_to_check_owner_target_access |= targets_changing_owner.shortcut_ids.filtered(
|
|
lambda d: d.owner_id == d.shortcut_document_id.owner_id)
|
|
|
|
if folder_id := vals.get('folder_id'):
|
|
folder = self.env['documents.document'].browse(folder_id)
|
|
if not self.env.su and folder.user_permission != 'edit':
|
|
raise AccessError(_("You can't access that folder_id."))
|
|
if folder.type != 'folder' or folder.shortcut_document_id:
|
|
raise UserError(_("Invalid folder id"))
|
|
if any(not d.active or not folder.active or d.folder_id and not d.folder_id.active for d in self):
|
|
raise UserError(_("It is not possible to move archived documents or documents to archived folders."))
|
|
|
|
if vals.get('active') is False:
|
|
if self.env.user.share:
|
|
raise UserError(_("You are not allowed to (un)archive documents."))
|
|
self.check_access('unlink') # As archived gc leads to unlink after `deletion_delay` days.
|
|
|
|
attachment_id = vals.get('attachment_id')
|
|
if attachment_id:
|
|
self.ensure_one()
|
|
|
|
attachments_was_present = []
|
|
for record in self:
|
|
attachments_was_present.append(bool(record.attachment_id))
|
|
if record.type == 'binary' and not record.datas and ('datas' in vals or 'url' in vals):
|
|
body = _("Document Request: %(name)s Uploaded by: %(user)s", name=record.name, user=self.env.user.name)
|
|
record.with_context(no_document=True).message_post(body=body)
|
|
|
|
if record.attachment_id:
|
|
# versioning
|
|
if attachment_id and attachment_id != record.attachment_id.id:
|
|
# Link the new attachment to the related record and link the previous one
|
|
# to the document.
|
|
self.env["ir.attachment"].browse(attachment_id).with_context(
|
|
no_document=True
|
|
).write(
|
|
{
|
|
"res_model": record.res_model or "documents.document",
|
|
"res_id": record.res_id if record.res_model else record.id,
|
|
}
|
|
)
|
|
related_record = self.env[record.res_model].browse(record.res_id)
|
|
if (
|
|
not hasattr(related_record, "message_main_attachment_id")
|
|
or related_record.message_main_attachment_id
|
|
!= record.attachment_id
|
|
):
|
|
record.attachment_id.with_context(no_document=True).write(
|
|
{"res_model": "documents.document", "res_id": record.id}
|
|
)
|
|
if attachment_id in record.previous_attachment_ids.ids:
|
|
record.previous_attachment_ids = [(3, attachment_id, False)]
|
|
record.previous_attachment_ids = [(4, record.attachment_id.id, False)]
|
|
if 'datas' in vals:
|
|
old_attachment = record.attachment_id.with_context(no_document=True).copy()
|
|
# removes the link between the old attachment and the record.
|
|
old_attachment.write({
|
|
'res_model': 'documents.document',
|
|
'res_id': record.id,
|
|
})
|
|
record.previous_attachment_ids = [(4, old_attachment.id, False)]
|
|
elif vals.get('datas') and not vals.get('attachment_id'):
|
|
res_model = vals.get('res_model', record.res_model or 'documents.document')
|
|
res_id = vals.get('res_id') if vals.get('res_model') else record.res_id if record.res_model else record.id
|
|
if res_model and res_model != 'documents.document' and not self.env[res_model].browse(res_id).exists():
|
|
record.res_model = res_model = 'documents.document'
|
|
record.res_id = res_id = record.id
|
|
attachment = self.env['ir.attachment'].with_context(no_document=True).create({
|
|
'name': vals.get('name', record.name),
|
|
'res_model': res_model,
|
|
'res_id': res_id
|
|
})
|
|
record.attachment_id = attachment.id
|
|
|
|
# pops the datas and/or the mimetype key(s) to explicitly write them in batch on the ir.attachment
|
|
# so the mimetype is properly set. The reason was because the related keys are not written in batch
|
|
# and because mimetype is readonly on `ir.attachment` (which prevents writing through the related).
|
|
attachment_dict = {key: vals.pop(key) for key in ['datas', 'mimetype'] if key in vals}
|
|
|
|
if not is_manager and set(vals) & set(self.env['mail.alias.mixin']._fields):
|
|
raise AccessError(_('Only Documents managers can set an alias.'))
|
|
|
|
write_result = super().write(vals)
|
|
if attachment_dict:
|
|
self.mapped('attachment_id').write(attachment_dict)
|
|
|
|
if 'attachment_id' in vals:
|
|
self.attachment_id.check('read')
|
|
|
|
if (new_active := vals.get('active')) is not None:
|
|
if not new_active and self.sudo().search([('id', 'child_of', self.ids), ('active', '=', True)]):
|
|
raise UserError(_('Operation not supported. Please use "Move to Trash" / `action_archive` instead.'))
|
|
if new_active and self.sudo().search([('id', 'parent_of', self.ids), ('active', '=', False)]):
|
|
raise UserError(_('Operation not supported. Please use "Restore" / `action_unarchive` instead.'))
|
|
|
|
if not is_manager and self.filtered('is_pinned_folder') != pinned_folders_start:
|
|
raise AccessError(_("Only Documents Managers can create in company folder."))
|
|
|
|
for document, attachment_was_present in zip(self, attachments_was_present):
|
|
if document.request_activity_id and document.attachment_id and not attachment_was_present:
|
|
feedback = _("Document Request: %(name)s Uploaded by: %(user)s",
|
|
name=self.name, user=self.env.user.name)
|
|
document.with_context(no_document=True).request_activity_id.action_feedback(
|
|
feedback=feedback, attachment_ids=[document.attachment_id.id])
|
|
|
|
if (company_id := vals.get('company_id')) and self.shortcut_ids: # no need if resetting company_id to False
|
|
self.shortcut_ids.sudo().write({'company_id': company_id})
|
|
|
|
if shortcuts_to_check_owner_target_access:
|
|
shortcuts_to_check_owner_target_access._unlink_shortcut_if_target_inaccessible()
|
|
|
|
return write_result
|
|
|
|
@api.model
|
|
def _pdf_split(self, new_files=None, open_files=None, vals=None):
|
|
vals = vals or {}
|
|
new_attachments = self.env['ir.attachment']._pdf_split(new_files=new_files, open_files=open_files)
|
|
new_documents = self.create([
|
|
dict(vals, attachment_id=attachment.id) for attachment in new_attachments
|
|
])
|
|
# Prevent concurrent update error on accessing these documents for the first time on exiting the split tool
|
|
env_partner = self.env.user.partner_id
|
|
documents_not_member = new_documents.filtered(lambda d: env_partner not in d.access_ids.partner_id)
|
|
self.env['documents.access'].sudo().create([
|
|
{'document_id': doc.id, 'partner_id': env_partner.id, 'last_access_date': fields.Datetime.now()}
|
|
for doc in documents_not_member
|
|
])
|
|
return new_documents
|
|
|
|
@api.model
|
|
def search_panel_select_range(self, field_name, **kwargs):
|
|
if field_name == 'folder_id':
|
|
enable_counters = kwargs.get('enable_counters', False)
|
|
search_panel_fields = ['access_token', 'company_id', 'description', 'display_name', 'folder_id',
|
|
'is_favorited', 'is_pinned_folder', 'owner_id', 'shortcut_document_id',
|
|
'user_permission']
|
|
domain = [('type', '=', 'folder')]
|
|
|
|
if unique_folder_id := self.env.context.get('documents_unique_folder_id'):
|
|
values = self.env['documents.document'].search_read(
|
|
expression.AND([domain, [('folder_id', 'child_of', unique_folder_id)]]),
|
|
search_panel_fields,
|
|
load=False,
|
|
)
|
|
accessible_folder_ids = {rec['id'] for rec in values}
|
|
for record in values:
|
|
if record['folder_id'] not in accessible_folder_ids:
|
|
record['folder_id'] = False # consider them as roots
|
|
return {
|
|
'parent_field': 'folder_id',
|
|
'values': values,
|
|
}
|
|
|
|
records = self.env['documents.document'].search_read(domain, search_panel_fields)
|
|
accessible_folder_ids = {rec['id'] for rec in records}
|
|
|
|
domain_image = {}
|
|
if enable_counters:
|
|
model_domain = expression.AND([
|
|
kwargs.get('search_domain', []),
|
|
kwargs.get('category_domain', []),
|
|
kwargs.get('filter_domain', []),
|
|
[(field_name, '!=', False)]
|
|
])
|
|
domain_image = self._search_panel_domain_image(field_name, model_domain, enable_counters)
|
|
|
|
values_range = OrderedDict()
|
|
shared_root_id = "SHARED" if not self.env.user.share else False
|
|
for record in records:
|
|
record_id = record['id']
|
|
if enable_counters:
|
|
image_element = domain_image.get(record_id)
|
|
record['__count'] = image_element['__count'] if image_element else 0
|
|
folder_id = record['folder_id']
|
|
if folder_id:
|
|
folder_id = folder_id[0]
|
|
if folder_id not in accessible_folder_ids:
|
|
if record['shortcut_document_id']:
|
|
continue
|
|
folder_id = shared_root_id
|
|
elif record['owner_id'][0] == self.env.user.id:
|
|
folder_id = "MY"
|
|
elif record['owner_id'][0] != self.env.ref('base.user_root').id or self.env.user.share:
|
|
if record['shortcut_document_id']:
|
|
continue
|
|
folder_id = shared_root_id
|
|
else:
|
|
folder_id = "COMPANY"
|
|
|
|
record['folder_id'] = folder_id
|
|
values_range[record_id] = record
|
|
|
|
if enable_counters:
|
|
self._search_panel_global_counters(values_range, 'folder_id')
|
|
|
|
special_roots = []
|
|
if not self.env.user.share:
|
|
special_roots = [
|
|
{'bold': True, 'childrenIds': [], 'parentId': False, 'user_permission': 'edit'} | values
|
|
for values in [
|
|
{
|
|
'display_name': _("Company"),
|
|
'id': 'COMPANY',
|
|
'description': _("Common roots for all company users."),
|
|
'user_permission': 'view',
|
|
}, {
|
|
'display_name': _("My Drive"),
|
|
'id': 'MY',
|
|
'user_permission': 'edit',
|
|
'description': _("Your individual space."),
|
|
}, {
|
|
'display_name': _("Shared with me"),
|
|
'id': 'SHARED',
|
|
'description': _("Additional documents you have access to."),
|
|
}, {
|
|
'display_name': _("Recent"),
|
|
'id': 'RECENT',
|
|
'description': _("Recently accessed documents."),
|
|
}, {
|
|
'display_name': _("Trash"),
|
|
'id': 'TRASH',
|
|
'description': _("Items in trash will be deleted forever after %s days.",
|
|
self.get_deletion_delay()),
|
|
}]
|
|
]
|
|
|
|
return {
|
|
'parent_field': 'folder_id',
|
|
'values': list(values_range.values()) + special_roots,
|
|
}
|
|
|
|
return super().search_panel_select_range(field_name)
|
|
|
|
@api.model
|
|
def get_document_max_upload_limit(self):
|
|
ICP = self.env['ir.config_parameter'].sudo()
|
|
for key in ('document.max_fileupload_size', 'web.max_file_upload_size'):
|
|
value = ICP.get_param(key, default=None)
|
|
if value is None:
|
|
continue
|
|
try:
|
|
return int(value) or None
|
|
except ValueError:
|
|
_logger.error("invalid %s: %r", key, value)
|
|
return odoo.http.DEFAULT_MAX_CONTENT_LENGTH
|
|
|
|
@api.model
|
|
def can_upload_traceback(self):
|
|
return self.env.user._is_internal and \
|
|
bool(self.env.ref('documents.document_support_folder', raise_if_not_found=False))
|
|
|
|
def unlink(self):
|
|
"""Clean unused linked records too.
|
|
|
|
This applies to:
|
|
* Children documents when deleting the parent folder
|
|
* Parent folder if it is archived and has no other children and user is allowed to
|
|
* Attachment if document-related record is deleted
|
|
"""
|
|
to_delete = self.sudo().with_context(active_test=False).search([('id', 'child_of', self.ids)]).sudo(False)
|
|
removable_parent_folders = self.with_context(active_test=False).folder_id.filtered(
|
|
lambda folder: len(folder.children_ids - self) == 0 and not folder.active)
|
|
removable_attachments = self.filtered(lambda d: d.res_model != d._name).attachment_id
|
|
|
|
res = super(Document, to_delete).unlink()
|
|
|
|
if removable_attachments:
|
|
removable_attachments.unlink()
|
|
if removable_parent_folders:
|
|
with contextlib.suppress(AccessError):
|
|
removable_parent_folders.unlink()
|
|
return res
|
|
|
|
@api.ondelete(at_uninstall=False)
|
|
def _unlink_except_unauthorized(self):
|
|
try:
|
|
self.check_access('unlink')
|
|
except UserError as e: # Hide potentially unknown inaccessible content's name.
|
|
raise UserError(_("You are not allowed to delete all these items.")) from e
|
|
|
|
@api.ondelete(at_uninstall=False)
|
|
def _unlink_except_company_folders(self):
|
|
self._raise_if_used_folder()
|
|
|
|
def _raise_if_used_folder(self):
|
|
if folder_ids := self.filtered(lambda d: d.type == 'folder').ids:
|
|
company_field_names = [
|
|
company_field_name
|
|
for company_field_name, field in self.env['res.company']._fields.items()
|
|
if field.comodel_name == "documents.document"
|
|
]
|
|
if self.env['res.company'].search_count(expression.OR([
|
|
[(field_name, 'in', folder_ids)] for field_name in company_field_names
|
|
]), limit=1):
|
|
raise ValidationError(_("Impossible to delete folders used by other applications."))
|
|
|
|
def _unlink_shortcut_if_target_inaccessible(self):
|
|
"""As a fix in stable version, delete shortcuts when target is no longer accessible to the owner."""
|
|
for owner, shortcuts in self.filtered('shortcut_document_id').grouped("owner_id").items():
|
|
shortcuts_as_owner_sudo = shortcuts.with_user(owner).sudo()
|
|
shortcuts_as_owner_sudo.filtered(lambda d: d.shortcut_document_id.user_permission == 'none').unlink()
|
|
|
|
@api.autovacuum
|
|
def _gc_clear_bin(self):
|
|
"""Files are deleted automatically from the trash bin after the configured remaining days."""
|
|
deletion_delay = self.get_deletion_delay()
|
|
self.search([
|
|
('active', '=', False),
|
|
('write_date', '<=', fields.Datetime.now() - relativedelta(days=deletion_delay)),
|
|
], limit=1000).unlink()
|
|
|
|
def _get_access_action(self, access_uid=None, force_website=False):
|
|
self.ensure_one()
|
|
if access_uid and not force_website and self.active and self.env.user.has_group("documents.group_documents_user"):
|
|
url_params = url_encode({
|
|
'preview_id': self.id,
|
|
'view_id': self.env.ref("documents.document_view_kanban").id,
|
|
'menu_id': self.env.ref("documents.menu_root").id,
|
|
'folder_id': self.folder_id.id,
|
|
})
|
|
|
|
return {
|
|
"type": "ir.actions.act_url",
|
|
"url": f"/odoo/action-documents.document_action?{url_params}"
|
|
}
|
|
return super()._get_access_action(access_uid=access_uid, force_website=force_website)
|
|
|
|
@api.readonly
|
|
def permission_panel_data(self):
|
|
"""Provide access related data for a given document/folder"""
|
|
specification = self._permission_specification()
|
|
self.check_access('read')
|
|
result = self.sudo().with_context(active_test=False).web_search_read([('id', '=', self.id)], specification)
|
|
record = result['records'][0]
|
|
selections = {'access_via_link': self._fields.get('access_via_link').selection}
|
|
if self.env.user.has_group('base.group_user'):
|
|
record['access_ids'] = [a for a in record['access_ids'] if a['role']]
|
|
if record['owner_id']['id'] == self.env.ref('base.user_root').id:
|
|
record['owner_id'] = False # Only a real user should be shown in the panel
|
|
selections.update({
|
|
'access_internal': self._fields.get('access_internal').selection,
|
|
'doc_access_roles': self.env['documents.access']._fields.get('role').selection,
|
|
})
|
|
return {'record': record, 'selections': selections}
|
|
|
|
def _permission_specification(self):
|
|
specification = {
|
|
'access_internal': {},
|
|
'access_via_link': {},
|
|
'access_url': {},
|
|
'active': {},
|
|
'display_name': {},
|
|
'folder_id': {},
|
|
'is_access_via_link_hidden': {},
|
|
'type': {},
|
|
'user_permission': {},
|
|
}
|
|
|
|
if self.env.user.has_group('base.group_user'):
|
|
partner_id_spec = {'fields': {'email': {}, 'name': {}, 'user_id': {}}}
|
|
specification.update({
|
|
'access_ids': {
|
|
'fields': {
|
|
'document_id': {},
|
|
'partner_id': partner_id_spec,
|
|
'role': {},
|
|
'expiration_date': {},
|
|
},
|
|
},
|
|
'owner_id': {
|
|
'fields': {
|
|
'partner_id': partner_id_spec,
|
|
},
|
|
}
|
|
})
|
|
|
|
return specification
|