# -*- coding: utf-8 -*- import base64 import contextlib import io import logging import re import uuid from ast import literal_eval from collections import Counter, OrderedDict, defaultdict import requests from dateutil.relativedelta import relativedelta from markupsafe import Markup from werkzeug.urls import url_encode import odoo from odoo import _, api, Command, fields, models from odoo.exceptions import AccessError, UserError, ValidationError from odoo.osv import expression from odoo.tools import groupby, image_process, SQL, create_index from odoo.tools.mimetypes import get_extension from odoo.tools.misc import clean_context from odoo.tools.pdf import PdfFileReader from odoo.addons.mail.tools import link_preview _logger = logging.getLogger(__name__) def _sanitize_file_extension(extension): """ Remove leading and trailing spacing + Remove leading "." """ return re.sub(r'^[\s.]+|\s+$', '', extension) class Document(models.Model): _name = 'documents.document' _description = 'Document' _inherit = ['mail.thread.cc', 'mail.activity.mixin', 'mail.alias.mixin'] _order = 'id desc' _parent_name = 'folder_id' _parent_store = True _systray_view = 'activity' # Attachment attachment_id = fields.Many2one('ir.attachment', ondelete='cascade', auto_join=True, copy=False) attachment_name = fields.Char('Attachment Name', related='attachment_id.name', readonly=False) attachment_type = fields.Selection(string='Attachment Type', related='attachment_id.type', readonly=False) is_editable_attachment = fields.Boolean(default=False, help='True if we can edit the link attachment.') is_multipage = fields.Boolean( 'Is considered multipage', compute='_compute_is_multipage', store=True, readonly=False) datas = fields.Binary(related='attachment_id.datas', related_sudo=True, readonly=False, prefetch=False) raw = fields.Binary(related='attachment_id.raw', related_sudo=True, readonly=False, prefetch=False) file_extension = fields.Char('File Extension', copy=True, store=True, readonly=False, 
compute='_compute_file_extension', inverse='_inverse_file_extension') file_size = fields.Integer(related='attachment_id.file_size', store=True) checksum = fields.Char(related='attachment_id.checksum') mimetype = fields.Char(related='attachment_id.mimetype') res_model = fields.Char('Resource Model', compute="_compute_res_record", recursive=True, inverse="_inverse_res_model", store=True) res_id = fields.Many2oneReference('Resource ID', compute="_compute_res_record", recursive=True, inverse="_inverse_res_model", store=True, model_field="res_model") res_name = fields.Char('Resource Name', compute="_compute_res_name", compute_sudo=True) index_content = fields.Text(related='attachment_id.index_content') description = fields.Text('Attachment Description', related='attachment_id.description', readonly=False) # Versioning previous_attachment_ids = fields.Many2many('ir.attachment', string="History") # Document name = fields.Char('Name', copy=True, store=True, compute='_compute_name_and_preview', readonly=False) active = fields.Boolean(default=True, string="Active") thumbnail = fields.Binary( readonly=False, store=True, attachment=True, compute='_compute_thumbnail', recursive=True) thumbnail_status = fields.Selection([ ('present', 'Present'), # Document has a thumbnail ('error', 'Error'), # Error when generating the thumbnail ('client_generated', 'Client Generated'), # The PDF thumbnail is generated by the user browser ('restricted', 'Inaccessible'), # Shortcut to no-permission source ], compute="_compute_thumbnail", store=True, readonly=False, recursive=True, ) url = fields.Char('Link URL', index=True, size=1024, tracking=True) url_preview_image = fields.Char( 'URL Preview Image', store=True, compute='_compute_name_and_preview', readonly=False) res_model_name = fields.Char(compute='_compute_res_model_name', index=True) type = fields.Selection([('url', 'URL'), ('binary', 'File'), ('folder', 'Folder')], default='binary', string='Type', required=True, readonly=True) 
shortcut_document_id = fields.Many2one('documents.document', 'Source Document', ondelete='cascade', index='btree_not_null') shortcut_ids = fields.One2many('documents.document', 'shortcut_document_id') favorited_ids = fields.Many2many('res.users', string="Favorite of") is_favorited = fields.Boolean(compute='_compute_is_favorited', inverse='_inverse_is_favorited') tag_ids = fields.Many2many('documents.tag', 'document_tag_rel', string="Tags") partner_id = fields.Many2one('res.partner', string="Contact", tracking=True) owner_id = fields.Many2one('res.users', default=lambda self: self.env.user.id, string="Owner", required=True, tracking=True, index=True) lock_uid = fields.Many2one('res.users', string="Locked by") is_locked = fields.Boolean(compute="_compute_is_locked", string="Locked") request_activity_id = fields.Many2one('mail.activity') requestee_partner_id = fields.Many2one('res.partner') # Access document_token = fields.Char( required=True, copy=False, index='btree', default=lambda __: base64.urlsafe_b64encode(uuid.uuid4().bytes).decode().removesuffix('==')) access_token = fields.Char(compute='_compute_access_token') access_url = fields.Char(string='Access url', compute='_compute_access_url') is_access_via_link_hidden = fields.Boolean( 'Link Access Hidden', index=True, help='If "True", only people given direct access to this document will be able to view it. ' 'If "False", access with the link also given to all who can access the parent folder.' 
) access_via_link = fields.Selection( [('view', 'Viewer'), ('edit', 'Editor'), ('none', 'None')], string='Link Access Rights', default='none', required=True, index=True) access_internal = fields.Selection( [('view', 'Viewer'), ('edit', 'Editor'), ('none', 'None')], string='Internal Users Rights', default='none', required=True, index=True) access_ids = fields.One2many('documents.access', 'document_id', string="Allowed Access") user_permission = fields.Selection( [('edit', 'Editor'), ('view', 'Viewer'), ('none', 'None')], string='User permission', compute='_compute_user_permission', search='_search_user_permission', compute_sudo=True) # Folder = parent document parent_path = fields.Char(index=True) # see '_parent_store' implementation in the ORM for details folder_id = fields.Many2one('documents.document', string="Parent Folder", ondelete="set null", tracking=True, domain="[('type', '=', 'folder'), ('shortcut_document_id', '=', False)]", required=False, index=True) children_ids = fields.One2many('documents.document', 'folder_id') deletion_delay = fields.Integer("Deletion delay", compute="_compute_deletion_delay", help="Delay after permanent deletion of the document in the trash (days)") company_id = fields.Many2one('res.company', string='Company', store=True, readonly=False, index=True) # TODO: remove in master is_pinned_folder = fields.Boolean("Pinned to Company roots", compute='_compute_is_pinned_folder', store=True) # Stat buttons document_count = fields.Integer('Document Count', compute='_compute_document_count') # Activity create_activity_option = fields.Boolean(string='Create a new activity') create_activity_type_id = fields.Many2one('mail.activity.type', string="Activity type") create_activity_summary = fields.Char('Summary') create_activity_date_deadline_range = fields.Integer(string='Due Date In') create_activity_date_deadline_range_type = fields.Selection([ ('days', 'Days'), ('weeks', 'Weeks'), ('months', 'Months'), ], string='Due type', default='days') 
create_activity_note = fields.Html(string="Note") create_activity_user_id = fields.Many2one('res.users', string='Responsible') # Actions that we can do on the document available_embedded_actions_ids = fields.Many2many( 'ir.embedded.actions', 'Available Actions', compute='_compute_available_embedded_actions_ids', groups='base.group_user') # Alias alias_tag_ids = fields.Many2many('documents.tag', 'document_alias_tag_rel', string="Alias Tags") # UI fields last_access_date_group = fields.Selection(selection=[ ('0_older', 'Older'), ('1_month', 'This Month'), ('2_week', 'This Week'), ('3_day', 'Today'), ], string="Last Accessed On", compute='_compute_last_access_date_group', search='_search_last_access_date_group') _sql_constraints = [ ('attachment_unique', 'unique (attachment_id)', "This attachment is already a document"), ('document_token_unique', 'unique (document_token)', "Access tokens already used."), ('folder_id_not_id', 'check(folder_id <> id)', "A folder cannot be included in itself"), ('shortcut_document_id_not_id', 'check(shortcut_document_id <> id)', "A shortcut cannot point to itself"), ] def init(self): super().init() create_index(self.env.cr, indexname='documents_document_res_model_res_id_idx', tablename=self._table, expressions=['res_model', 'res_id']) @api.depends('document_token') def _compute_access_token(self): for document in self: document.access_token = f"{document.document_token}o{document.id or 0:x}" @api.depends('access_token') def _compute_access_url(self): for document in self: document.access_url = f'{document.sudo().get_base_url()}/odoo/documents/{document.access_token}' @api.depends("folder_id", "company_id") @api.depends_context("uid", "allowed_company_ids") def _compute_display_name(self): accessible_records = self._filtered_access('read') not_accessible_records = self - accessible_records not_accessible_records.display_name = _("Restricted") folders = accessible_records.filtered(lambda d: d.type == 'folder') for record in folders: if 
record.user_permission != 'none': record.display_name = record.name else: record.display_name = _("Restricted Folder") for record in accessible_records - folders: record.display_name = record.name @api.depends('name', 'type') def _compute_file_extension(self): for record in self: if record.type != 'binary': record.file_extension = False elif record.name: record.file_extension = _sanitize_file_extension(get_extension(record.name.strip())) or False def _inverse_file_extension(self): for record in self: record.file_extension = _sanitize_file_extension(record.file_extension) if record.file_extension else False @api.constrains('shortcut_document_id', 'shortcut_ids', 'type', 'folder_id', 'children_ids', 'company_id') def _check_shortcut_fields(self): errors = [] wrong_types, wrong_companies = self.browse(), self.browse() # Access rights can allow for a document to be edited without having access to its parent folder wrong_parents_sudo = self.folder_id.sudo().filtered('shortcut_document_id') for target in self.filtered('shortcut_ids'): for shortcut in target.shortcut_ids: if shortcut.type != target.type: wrong_types |= shortcut for shortcut in self.filtered('shortcut_document_id'): if shortcut.type != shortcut.shortcut_document_id.type: wrong_types |= shortcut if shortcut.children_ids: wrong_parents_sudo |= shortcut if (shortcut.shortcut_document_id.company_id and shortcut.shortcut_document_id.company_id != shortcut.company_id): wrong_companies |= shortcut if wrong_types: message = _("The following documents/shortcuts have a type mismatch: \n") documents_list = "\n- ".join(wrong_types.mapped('name')) errors.append(f'{message}\n- {documents_list}') if wrong_parents_sudo: message = _("The following shortcuts cannot be set as documents parents: \n") shortcuts_list = "\n- ".join(wrong_parents_sudo.mapped('name')) errors.append(f'{message}\n- {shortcuts_list}') if wrong_companies: message = _("The following documents/shortcuts have a company mismatch: \n") shortcuts_list = 
"\n- ".join(wrong_companies.mapped('name')) errors.append(f'{message}\n- {shortcuts_list}') if errors: raise ValidationError('\n\n'.join(errors)) @api.constrains('owner_id', 'folder_id') def _check_portal_cant_own_root_document(self): wrong_records = self.filtered(lambda d: d.owner_id.sudo().share and not self.folder_id) if wrong_records: raise ValidationError(_( "The following documents/folders can't be owned by a Portal User: \n- %(partners)s", partners="\n-".join(wrong_records.mapped('name')))) @api.constrains('type', 'alias_name') def _check_alias(self): wrong_records = self.filtered(lambda d: (d.type != 'folder' or d.shortcut_document_id) and d.alias_name) if wrong_records: raise ValidationError(_( "The following documents can't have alias: \n- %(records)s", records="\n-".join(wrong_records.mapped('name')))) @api.depends('folder_id', 'owner_id', 'type') def _compute_is_pinned_folder(self): # TODO: remove in master, for stable force the field to reflect owner / folder value # because it's used in access rule `documents_document_write_base_rule` for document in self: document.is_pinned_folder = ( document.type == 'folder' and not document.folder_id and document.owner_id == self.env.ref('base.user_root') ) @api.depends('attachment_id', 'url', 'shortcut_document_id') def _compute_name_and_preview(self): request_session = requests.Session() shortcuts = self.filtered('shortcut_document_id') for record in self - shortcuts: if record.attachment_id: record.name = record.attachment_id.name record.url_preview_image = False elif record.url: preview = link_preview.get_link_preview_from_url(record.url, request_session) if not preview: continue if preview.get('og_title'): record.name = preview['og_title'] if preview.get('og_image'): record.url_preview_image = preview['og_image'] for shortcut in shortcuts: shortcut.name = shortcut.name or shortcut.shortcut_document_id.name shortcut.url_preview_image = shortcut.url_preview_image or 
shortcut.shortcut_document_id.url_preview_image @api.depends_context('uid', 'allowed_company_ids') @api.depends('access_ids', 'access_internal', 'access_via_link', 'owner_id', 'is_access_via_link_hidden', 'company_id', 'folder_id.access_ids', 'folder_id.access_internal', 'folder_id.access_via_link', 'folder_id.owner_id', 'folder_id.company_id') def _compute_user_permission(self): for document in self: if self.env.user.has_group('documents.group_documents_system'): document.user_permission = ( 'edit' if not (company := document.company_id) or company in self.env.companies or company not in self.env.user.company_ids else 'none') continue document.user_permission = document._get_permission_without_token() if document.user_permission == 'view' and document.access_via_link == 'edit': document.user_permission = 'edit' elif document.user_permission == 'none' and document.folder_id and document.access_via_link != 'none' \ and not document.is_access_via_link_hidden: # If the user can access the parent, they have the link. # This only works one level up, as it mimics accessing through the interface. 
with contextlib.suppress(AccessError): if document.folder_id._get_permission_without_token() != 'none': document.user_permission = document.access_via_link def _get_permission_without_token(self): self.ensure_one() is_user_company = self.company_id and self.company_id in self.env.user.company_ids is_disabled_company = is_user_company and self.company_id not in self.env.companies if is_disabled_company: return 'none' # own documents if self.owner_id == self.env.user: return 'edit' user_permission = 'none' # access with if access := self.access_ids.filtered( lambda a: a.partner_id == self.env.user.partner_id and (not a.expiration_date or a.expiration_date > fields.Datetime.now()) ): user_permission = access.role or self.access_via_link # access as internal if not self.env.user.share and user_permission != "edit" and self.access_internal != 'none': if not self.company_id or self.company_id in self.env.companies: user_permission = ( 'edit' if self.env.user.has_group('documents.group_documents_manager') else self.access_internal ) return user_permission def _search_user_permission(self, operator, value): if operator not in ('=', '!='): raise NotImplementedError("Unsupported search operator") if self.env.user._is_public() or value not in {'view', 'edit', 'none'}: return expression.FALSE_DOMAIN if (operator, value) in (("=", "edit"), ("!=", "view")): # access but no view => edit searched_roles = ['edit'] elif (operator, value) in (('=', 'view'), ("!=", "edit")): # access without edit => view searched_roles = ['view'] elif (operator, value) == ("!=", "none"): # any access searched_roles = ['edit', 'view'] else: return expression.FALSE_DOMAIN # ("=", "none") = not allowed, so no records other_company = [('company_id', '!=', False), ('company_id', 'not in', self.env.user.company_ids.ids)] allowed_or_no_company = [('company_id', 'in', [False] + self.env.companies.ids)] any_except_disabled_company = expression.OR([ [('company_id', 'in', self.env.companies.ids)], 
[('company_id', 'not in', self.env.user.company_ids.ids)] ]) if self.env.user.has_group('documents.group_documents_system'): if searched_roles == ['view']: return expression.FALSE_DOMAIN # System Administrator has "edit" on all documents, so finds none with "view" only. return any_except_disabled_company # Access from membership if searched_roles == ['view']: access_level_domain = expression.OR([ [('role', '=', 'view'), ('document_id.access_via_link', 'in', ('none', 'view'))], [('role', '=', False), ('document_id.access_via_link', '=', 'view')], ]) elif searched_roles == ['edit']: access_level_domain = expression.OR([ [('role', '=', 'edit')], [('document_id.access_via_link', '=', 'edit')] ]) else: access_level_domain = expression.OR([ [('role', 'in', ('view', 'edit'))], [('document_id.access_via_link', '!=', 'none')] ]) access_domain = [('access_ids', 'any', expression.AND([ access_level_domain, expression.AND([ [('partner_id', '=', self.env.user.partner_id.id)], ['|', ('expiration_date', '=', False), ('expiration_date', '>', fields.Datetime.now())], ]), ]))] # Access from ownership owner_domain = [('owner_id', '=', self.env.user.id)] direct_domain = expression.AND([ any_except_disabled_company, access_domain if 'edit' not in searched_roles else expression.OR([access_domain, owner_domain]), ]) # Access form access_internal if self.env.user.has_group('documents.group_documents_manager'): if searched_roles == ['view']: direct_domain = expression.AND([ direct_domain, expression.OR([[('access_internal', '=', 'none')], other_company]) ]) else: direct_domain = expression.OR([ direct_domain, expression.AND([[('access_internal', 'in', ('view', 'edit'))], allowed_or_no_company]), ]) elif not self.env.user.share: if searched_roles == ['view']: internal_domain = [('access_internal', '=', 'view'), ('access_via_link', 'in', ('none', 'view'))] elif searched_roles == ['edit']: internal_domain = expression.OR([ [('access_internal', '=', 'edit')], 
expression.AND([[('access_internal', '=', 'view')], [('access_via_link', '=', 'edit')]]), ]) else: internal_domain = [('access_internal', 'in', ('view', 'edit'))] direct_domain = expression.OR([direct_domain, expression.AND([internal_domain, allowed_or_no_company])]) # Look one level up for links unless hidden link_via_parent_domain = expression.AND([ [('access_via_link', 'in', searched_roles)], [('is_access_via_link_hidden', '=', False)], [('folder_id', 'any', direct_domain)], ]) return expression.OR([direct_domain, link_via_parent_domain]) @api.depends('datas', 'mimetype') def _compute_is_multipage(self): for document in self: # external computation to be extended document.is_multipage = bool(document._get_is_multipage()) # None => False @api.depends('attachment_id', 'attachment_id.res_model', 'attachment_id.res_id', 'shortcut_document_id.res_model', 'shortcut_document_id.res_id') def _compute_res_record(self): for record in self: attachment = record.attachment_id if attachment: record.res_model = attachment.res_model record.res_id = attachment.res_id if record.shortcut_document_id: record.res_model = record.shortcut_document_id.res_model record.res_id = record.shortcut_document_id.res_id @api.depends('attachment_id', 'res_model', 'res_id') def _compute_res_name(self): for record in self: if record.attachment_id: record.res_name = record.attachment_id.res_name elif record.res_id and record.res_model: record.res_name = self.env[record.res_model].browse(record.res_id).display_name else: record.res_name = False def _inverse_res_model(self): for record in self: attachment = record.attachment_id.with_context(no_document=True) if attachment: # Avoid inconsistency in the data, write both at the same time. # In case a check_access is done between res_id and res_model modification, # an access error can be received. 
(Mail causes this check_access) attachment.sudo().write({'res_model': record.res_model, 'res_id': record.res_id}) @api.depends('checksum', 'shortcut_document_id.thumbnail', 'shortcut_document_id.thumbnail_status', 'shortcut_document_id.user_permission') def _compute_thumbnail(self): for document in self: if document.shortcut_document_id: if document.shortcut_document_id.user_permission != 'none': document.thumbnail = document.shortcut_document_id.thumbnail document.thumbnail_status = document.shortcut_document_id.thumbnail_status else: document.thumbnail = False document.thumbnail_status = 'restricted' elif document.mimetype and document.mimetype.startswith('application/pdf'): # Thumbnails of pdfs are generated by the client. To force the generation, we invalidate the thumbnail. document.thumbnail = False document.thumbnail_status = 'client_generated' elif document.mimetype and document.mimetype.startswith('image/'): try: document.thumbnail = base64.b64encode(image_process(document.raw, size=(200, 140), crop='center')) document.thumbnail_status = 'present' except (UserError, TypeError): document.thumbnail = False document.thumbnail_status = 'error' else: document.thumbnail = False document.thumbnail_status = False @api.depends('type') def _compute_deletion_delay(self): folders = self.filtered(lambda d: d.type == 'folder') folders.deletion_delay = self.get_deletion_delay() (self - folders).deletion_delay = False @api.depends_context('uid') @api.depends('type', 'children_ids', 'shortcut_document_id') def _compute_document_count(self): folders = (self | self.shortcut_document_id).filtered( lambda d: d.type == 'folder' and not d.shortcut_document_id) children_counts = Counter(dict(self._read_group( [('folder_id', 'in', folders.ids)], groupby=['folder_id'], aggregates=['__count']))) for doc in self: doc.document_count = children_counts[doc.shortcut_document_id or doc] def _get_folder_embedded_actions(self, folder_ids): """Return the enabled actions for the given 
folder.""" embedded_actions = self.env['ir.embedded.actions'].sudo().search( domain=[ ('parent_action_id', '=', self.env.ref("documents.document_action").id), ('action_id', '!=', False), ('action_id.type', '=', 'ir.actions.server'), ('parent_res_model', '=', 'documents.document'), ('parent_res_id', 'in', folder_ids), ('groups_ids', 'in', [False] + self.env.user.groups_id.ids), ], order='sequence', ) # group after ordering by `ir.embedded.actions` sequence return embedded_actions.grouped('parent_res_id') @api.depends_context('uid') @api.depends('folder_id') def _compute_available_embedded_actions_ids(self): embedded_actions = self._get_folder_embedded_actions(self.folder_id.ids) embedded_actions_per_folder = { folder_id: actions.ids for folder_id, actions in embedded_actions.items() } self.available_embedded_actions_ids = False for document in self.filtered(lambda d: d.type != 'folder' and not d.shortcut_document_id): document.available_embedded_actions_ids = embedded_actions_per_folder.get(document.folder_id.id, False) def _get_last_access_date_group_cte(self): return SQL(""" WITH last_access_date AS ( SELECT (CASE WHEN last_access_date > NOW() - INTERVAL '1 days' THEN '3_day' WHEN last_access_date > NOW() - INTERVAL '7 days' THEN '2_week' WHEN last_access_date > NOW() - INTERVAL '1 months' THEN '1_month' ELSE '0_older' END) AS date, document_id FROM documents_access WHERE partner_id = %s ) """, self.env.user.partner_id.id) @api.depends('access_ids') def _compute_last_access_date_group(self): self.env.cr.execute(SQL( """(%s SELECT document_id, date FROM last_access_date WHERE document_id = ANY(%s))""", self._get_last_access_date_group_cte(), self.ids, )) values = {line['document_id']: line['date'] for line in self.env.cr.dictfetchall()} for document in self: document.last_access_date_group = values.get(document.id) def _search_last_access_date_group(self, operator, operand): if operator != '=': raise NotImplementedError("Unsupported search operator or value") query 
= SQL( """(%s SELECT document_id FROM last_access_date WHERE date = %s)""", self._get_last_access_date_group_cte(), operand) return [('id', 'in', query)] def _field_to_sql(self, alias, fname, query=None, flush: bool = True) -> SQL: if fname == 'last_access_date_group': # Allow to group on the field return SQL("""(%s SELECT date FROM last_access_date WHERE document_id = %s)""", self._get_last_access_date_group_cte(), SQL.identifier(alias, 'id')) return super()._field_to_sql(alias, fname, query, flush) def _order_field_to_sql(self, alias, field_name, direction, nulls, query): if field_name == 'last_access_date_group': sql_field = SQL( "SELECT last_access_date FROM documents_access WHERE partner_id = %s AND document_id = %s", self.env.user.partner_id.id, SQL.identifier(alias, 'id') ) return SQL("(%s) %s %s", sql_field, direction, nulls) if field_name == 'is_folder': sql_field = SQL("%s != 'folder'", SQL.identifier(alias, 'type')) return SQL("(%s) %s %s", sql_field, direction, nulls) return super()._order_field_to_sql(alias, field_name, direction, nulls, query) @api.model def get_previewable_file_extensions(self): return {'bmp', 'mp4', 'mp3', 'png', 'jpg', 'jpeg', 'pdf', 'gif', 'txt', 'wav'} def action_move_documents(self, folder_id): """Move document to new parent folder or none (my drive or company) :param int|bool folder_id: new parent folder id """ target_folder = self.browse(folder_id) if target_folder.shortcut_document_id: return self.action_move_documents(target_folder.shortcut_document_id.id) documents_to_move = self.filtered(lambda d: d.folder_id != target_folder and d != target_folder) if not documents_to_move: return try: (documents_to_move | target_folder).check_access('write') except UserError: raise AccessError(_("You are not allowed to perform this operation.")) if target_folder and (cyclic_move := documents_to_move.filtered( lambda d: target_folder.parent_path.startswith(d.parent_path))): raise UserError(_( "Impossible to move the following items as 
this would create a recursive hierarchy:\n" "- %(documents)s", documents='\n- '.join(cyclic_move.mapped('name')) )) documents_to_move.folder_id = target_folder if target_folder and (documents_to_sync := documents_to_move.filtered(lambda d: not d.shortcut_document_id)): documents_to_sync.sudo().action_update_access_rights( access_internal=target_folder.access_internal, access_via_link=target_folder.access_via_link, is_access_via_link_hidden=target_folder.is_access_via_link_hidden, # Simply add partners of destination partners={access.partner_id: (access.role, access.expiration_date) for access in target_folder.access_ids}, ) def action_change_owner(self, new_user_id): if not self.env.user._is_admin() and not self.env.user.has_group('documents.group_documents_manager'): if any(document.owner_id != self.env.user for document in self): raise AccessError(_("You are not allowed to change ownerships of documents you do not own.")) self.owner_id = new_user_id def action_create_shortcut(self, location_folder_id=None): """Create a shortcut to self in a specific folder or as sibling :param int | None location_folder_id: Optional: where to create the shortcut. 
""" if not self.ids: return if len(self.folder_id.ids) > 1 and location_folder_id is None: raise UserError(_("A destination is required when creating multiple shortcuts at once.")) if self.shortcut_document_id: targets = self.filtered(lambda d: not d.shortcut_document_id) | self.shortcut_document_id return targets.action_create_shortcut(location_folder_id or self.folder_id.id) location = self.browse(location_folder_id) if location_folder_id is not None else self.folder_id if location_folder_id and location.shortcut_document_id: return self.action_create_shortcut(location.shortcut_document_id.id) if location: try: location.check_access('write') except UserError as exc: if self.env.user.share: raise AccessError(_("You are not allowed to write in this folder.")) from exc location = self.browse() # My Drive self.env.user._bus_send('simple_notification', { 'type': 'info', 'message': _("Shortcut created in My Drive"), }) if location.shortcut_document_id: raise AccessError(_("Shortcuts cannot contain documents.")) self.check_access('read') return self.sudo().create([{ "folder_id": location.id, "shortcut_document_id": document.id, "name": document.name, "type": document.type, "access_internal": document.access_internal or 'view', "access_via_link": document.access_via_link or 'none', "company_id": document.company_id.id, "access_ids": [ Command.create({ "partner_id": access.partner_id.id, "role": access.role, }) for access in document.access_ids if access.role ], "is_multipage": document.is_multipage, "is_access_via_link_hidden": document.is_access_via_link_hidden, } for document in self]).sudo(False) def action_update_access_rights(self, access_internal=None, access_via_link=None, is_access_via_link_hidden=None, partners=None, notify=False, message=""): """Update access to a document and propagate if applicable. 
This method can be called to update the access of internal users, with the link, as well as a set of partners and roles to the records in self and their children (except shortcuts), and shortcuts pointing to them as they are kept synchronized. Modifications to internal users and link access are propagated down to children until the new value is already present. For partners, all changes are applied to all children regardless of the existing rights structure. :param str | None access_internal: optional new permission level for internal users :param str | None access_via_link: optional new permission level for partners with the link :param bool|None is_access_via_link_hidden: optional new value for discoverability :param dict[str | int | res.partner(), tuple[str | bool | None, str | datetime | bool | None] partners: Mapping of partner(_id) to the tuple: role: 'edit', 'view', False (=>delete), expiration: datetime string, False (removed/None) :param bool notify: whether to send an email :param str message: message to add to the email """ if len(self.ids) == 0: return try: self.check_access('write') except UserError: raise AccessError("You are not allowed to update these access rights.") if len(self.ids) > 1 and notify: raise UserError(_("Impossible to invite partners on multiple documents at once.")) if self.shortcut_document_id: raise UserError(_("You can not update the access of a shortcut, update its target instead.")) # Check inputs as we are going to bypass the ORM in the private method(s) access_options = {'view', 'edit', 'none', None} hidden_options = {None, True, False} role_options = {'edit', 'view', False, None} incorrect_fields_to_options = { **({'is_access_via_link_hidden': hidden_options} if is_access_via_link_hidden not in hidden_options else {}), **({'access_via_link': access_options} if access_via_link not in access_options else {}), **({'access_internal': access_options} if access_internal not in access_options else {}), **({'partners.role': 
def _action_update_access(self, access_internal, access_via_link, is_access_via_link_hidden):
    """Write access settings on ``self`` and push them down to the descendants.

    Every argument that is not ``None`` is processed on its own with a single
    recursive SQL ``UPDATE``. The walk starts at the records of ``self`` and only
    follows children that still hold a different value, so the propagation stops
    at any document that already has the requested value. Shortcuts targeting an
    updated document are given the same value.

    :param str | None access_internal: new `access_internal`, untouched if None
    :param str | None access_via_link: new `access_via_link`, untouched if None
    :param bool | None is_access_via_link_hidden: new `is_access_via_link_hidden`,
        untouched if None
    """
    # The database is modified with raw SQL below: flush pending ORM values first.
    self.flush_model()
    document_model = self.env['documents.document']
    shortcuts_to_recheck = self.browse()
    requested_values = {
        'access_internal': access_internal,
        'access_via_link': access_via_link,
        'is_access_via_link_hidden': is_access_via_link_hidden,
    }

    for column, new_value in requested_values.items():
        if new_value is None:
            continue

        # Outside superuser mode, only documents the user can edit are considered.
        edit_only = [] if self.env.su else [('user_permission', '=', 'edit')]
        # records that we might need to update
        outdated_domain = [
            (column, '!=', new_value),
            *edit_only,
            # the update is done only "target -> shortcut",
            # but not "shortcut -> target"
            ('shortcut_document_id', '=', False),
            ('id', 'child_of', self.ids),
        ]
        outdated_query = document_model._search(outdated_domain).select(
            'id', 'folder_id', 'shortcut_document_id', column)
        shortcuts_to_recheck |= self.search([('shortcut_document_id', 'any', outdated_domain)])

        update_query = SQL("""
            WITH RECURSIVE candidates AS (%(candidates)s),
            -- explore the folders
            documents_to_update AS (
                SELECT id
                  FROM candidates
                 WHERE id = ANY(%(root_ids)s)
                 UNION
                SELECT child.id
                  FROM candidates AS child
                  JOIN documents_to_update AS parent
                    ON child.folder_id = parent.id
            ),
            documents_and_shortcuts AS (
                SELECT id
                  FROM documents_to_update
                 UNION
                    -- document.shortcut_ids
                    -- update in "SUDO" to keep them synchronized
                SELECT shortcut.id
                  FROM documents_document AS shortcut
                  JOIN documents_to_update
                    ON documents_to_update.id = shortcut.shortcut_document_id
            )
            UPDATE documents_document
               SET %(field)s = %(value)s
              FROM documents_and_shortcuts AS doc  -- document | document.children_ids | document.shortcut_ids
             WHERE documents_document.id = doc.id
        """, field=SQL(column), value=new_value, root_ids=self.ids, candidates=outdated_query)
        self.env.cr.execute(update_query)

    # The cache no longer reflects the rows that were updated behind the ORM's back.
    self.invalidate_model([
        'access_internal',
        'access_via_link',
        'is_access_via_link_hidden',
        'user_permission',
    ])
    shortcuts_to_recheck._unlink_shortcut_if_target_inaccessible()
def action_see_documents(self):
    """Return a window action listing the documents directly inside this folder.

    :raises UserError: if the current record is not of type ``folder``
    :return: an ``ir.actions.act_window`` dictionary filtered on ``folder_id``
    :rtype: dict
    """
    if self.type == "folder":
        return {
            'name': _('Documents'),
            'domain': [('folder_id', '=', self.id)],
            'res_model': 'documents.document',
            'type': 'ir.actions.act_window',
            'views': [(False, 'list'), (False, 'form')],
            'view_mode': 'list,form',
            # pre-select the folder in the search panel of the opened view
            'context': {'searchpanel_default_folder_id': self.id}
        }
    raise UserError(_("Not a folder."))
@api.model
def action_folder_embed_action(self, folder_id, action_id, groups_ids=None):
    """Enable / disable (pin / unpin) a server action for the given folder.

    If an embedded action matching the folder, the action and the groups
    already exists it is removed, otherwise a new one is created after the
    currently last embedded action.

    :param int folder_id: The folder on which we pin the actions
    :param int action_id: The id of the action to enable
    :param list[int] groups_ids: ids of the groups the action is available to,
        defaults to the ids of `base.group_user`
    :raises AccessError: if the user is not in `documents.group_documents_user`
    :raises UserError: if the action does not exist or is not a server action,
        or if the target is missing or is not a folder
    :return: the result of :meth:`get_documents_actions` for the folder
    """
    if not self.env.user.has_group('documents.group_documents_user'):
        raise AccessError(_("You are not allowed to pin/unpin embedded Actions."))
    if not groups_ids:
        groups_ids = self.env.ref('base.group_user').ids

    action = self.env['ir.actions.server'].browse(action_id).sudo().exists()
    if not action:
        raise UserError(_('This action does not exist.'))
    if action.type != 'ir.actions.server':
        raise UserError(_('You can not pin that type of action.'))

    folder = self.env['documents.document'].browse(folder_id).sudo().exists()
    if not folder or folder.type != 'folder':
        raise UserError(_('You can not pin an action on that document.'))

    # Used both to look up and to create the embedded action: resolve it once.
    parent_action_id = self.env.ref("documents.document_action").id
    embedded = self.env['ir.embedded.actions'].sudo().search([
        ('parent_action_id', '=', parent_action_id),
        ('action_id', '=', action_id),
        ('action_id.type', '=', 'ir.actions.server'),
        ('parent_res_model', '=', 'documents.document'),
        ('parent_res_id', '=', folder_id),
        ('groups_ids', 'in', groups_ids),
    ]).sudo(False)  # found in sudo, but unlinked with the rights of the user

    if embedded:
        embedded.unlink()
    else:
        # first pinned action should be displayed first
        last_action = self.env['ir.embedded.actions'].search(
            [], order='sequence DESC', limit=1)
        self.env['ir.embedded.actions'].create({
            'name': action.name,
            'parent_action_id': parent_action_id,
            'action_id': action.id,
            'parent_res_model': 'documents.document',
            'parent_res_id': folder_id,
            'groups_ids': groups_ids,
            'sequence': last_action.sequence + 1 if last_action else 1,
        })

    return self.get_documents_actions(folder_id)
""" context = { 'default_document_ids': self.ids, 'default_resource_ref': False, 'default_is_readonly_model': False, 'default_model_ref': False, } if documents_link_record := self.filtered(lambda d: d.res_model != 'documents.document'): return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'type': 'warning', 'message': _( "Already linked Documents: %s", ", ".join(documents_link_record.mapped('name')) ), } } if model: self.env[model].check_access('write') context['default_is_readonly_model'] = True context['default_model_id'] = self.env['ir.model']._get_id(model) first_valid_id = self.env[model].search([], limit=1).id context['default_resource_ref'] = f'{model},{first_valid_id}' return { 'name': _('Choose a record to link'), 'type': 'ir.actions.act_window', 'res_model': 'documents.link_to_record_wizard', 'view_mode': 'form', 'target': 'new', 'views': [(False, "form")], 'context': context, } def _send_access_by_mail(self, partners, message=""): """Send a notification email to contacts granted with a new document/folder access. :param dict[res.partner(), str] partners: Mapping of partner to new_role: 'edit', 'view'. 
:param message: message to add to the email """ self.ensure_one() subject = _('%s shared with you', self.display_name) if self.display_name else _('Access to a folder or a document') formatted_msg = Markup(message) if message else "" roles_info = { (role, lang): { "body": self.env['ir.qweb'].with_context(lang=lang)._render( 'documents.mail_template_document_share', {'record': self, 'user': self.env.user, 'message': formatted_msg} ), "role_label": _('Editor') if role == 'edit' else _('Viewer'), } for role, lang in {(role, partner.lang) for partner, role in partners.items()} } for partner, role in partners.items(): self.with_context(lang=partner.lang).message_notify( body=roles_info[role, partner.lang]['body'], email_layout_xmlid='mail.mail_notification_layout', partner_ids=partner.ids, subject=subject, subtitles=[self.display_name, _('Your Role: %s', roles_info[role, partner.lang]['role_label'])], ) def _notify_get_recipients_groups(self, message, model_description, msg_vals=None): groups = super()._notify_get_recipients_groups( message, model_description, msg_vals=msg_vals ) if len(self.ids) != 1: return groups group_values = { 'active': True, 'button_access': {'url': self.access_url}, 'has_button_access': True, } return [ ('group_documents_document_people_with_access', lambda pdata: (pdata['uid'] and self.with_user(pdata['uid']).user_permission != 'none') or (pdata['id'] and self.access_via_link != 'none' and self.access_ids.filtered(lambda a: a.partner_id.id == pdata['id'] and a.role)), group_values) ] + groups def get_deletion_delay(self): return int(self.env['ir.config_parameter'].sudo().get_param('documents.deletion_delay', '30')) def _get_is_multipage(self): """Whether the document can be considered multipage, if able to determine. :return: `None` if mimetype not handled, `False` if single page or error occurred, `True` otherwise. 
def _get_models(self, domain):
    """Return the models the documents matching ``domain`` are attached to.

    :param domain: the domain of the _read_group on documents.
    :return: a list of dicts with the keys 'id' (technical name of the model),
        'display_name' (label) and '__count' (number of documents). Real models
        come first, sorted by label, followed by the 'Not attached' entry
        (``documents.document``) and the 'Not a file' entry (no model).
    """
    linked, unattached, fileless = [], [], []
    for model_name, total in self._read_group(domain, ['res_model'], ['__count']):
        if model_name and model_name != 'documents.document':
            bucket = linked
            label = self.env['ir.model']._get(model_name).display_name
        elif model_name:
            bucket = unattached
            label = _('Not attached')
        else:
            bucket = fileless
            label = _('Not a file')
        bucket.append({
            'id': model_name,
            'display_name': label,
            '__count': total,
        })
    linked.sort(key=lambda entry: entry['display_name'])
    return linked + unattached + fileless
@api.model
def message_new(self, msg_dict, custom_values=None):
    """Create the (archived) document representing an incoming email.

    The document is created with the default values, then
    `_message_post_after_hook` creates one document per attachment.

    Tags come from the folder alias unless ``tag_ids`` is given in
    ``custom_values``, in which case plain ids, a list of LINK commands or a
    single SET command are accepted; any other command list results in no tag.
    """
    custom_values = custom_values or {}
    folder = self.env['documents.document'].browse(custom_values.get('folder_id'))
    custom_values['name'] = _('Mail: %s', msg_dict.get('subject'))

    if 'tag_ids' in custom_values:
        tag_ids = custom_values['tag_ids']
        # we have a list of m2m commands
        if tag_ids and isinstance(tag_ids[0], list | tuple):
            if all(len(command) >= 2 and command[0] == Command.LINK for command in tag_ids):
                tag_ids = [command[1] for command in tag_ids]
            elif len(tag_ids) == 1 and len(tag_ids[0]) == 3 and tag_ids[0][0] == Command.SET:
                tag_ids = tag_ids[0][2]
            else:
                # do not support other commands
                tag_ids = []
        # drop the ids of tags that do not exist
        custom_values['tag_ids'] = self.env['documents.tag'].browse(tag_ids).exists().ids
    else:
        custom_values['tag_ids'] = folder.alias_tag_ids.ids

    custom_values['active'] = False
    return super().message_new(msg_dict, custom_values)
def documents_set_activity(self, settings_record=None):
    """Schedule an activity on each record, configured from ``settings_record``.

    Nothing is done when ``settings_record`` is empty or has no
    ``create_activity_type_id``. The assigned user is, in order of priority: the
    owner of the record (if ``create_has_owner_activity`` is set), the
    ``create_activity_user_id`` of the settings, the ``user_id`` of the settings,
    and finally the current user.

    :param settings_record: the record that contains the activity fields.
        settings_record.create_activity_type_id (required)
        settings_record.create_activity_summary
        settings_record.create_activity_note
        settings_record.create_activity_date_deadline_range
        settings_record.create_activity_date_deadline_range_type
        settings_record.create_activity_user_id
    """
    if not (settings_record and settings_record.create_activity_type_id):
        return

    # Optional fields are only read if they exist on the settings model.
    available_fields = settings_record._fields
    for document in self:
        schedule_values = {
            'activity_type_id': settings_record.create_activity_type_id.id,
            'summary': settings_record.create_activity_summary or '',
            'note': settings_record.create_activity_note or '',
        }

        if settings_record.create_activity_date_deadline_range > 0:
            delay = relativedelta(**{
                settings_record.create_activity_date_deadline_range_type:
                    settings_record.create_activity_date_deadline_range,
            })
            schedule_values['date_deadline'] = fields.Date.context_today(settings_record) + delay

        if (
            available_fields.get('create_has_owner_activity')
            and settings_record.create_has_owner_activity
            and document.owner_id
        ):
            assignee = document.owner_id
        elif available_fields.get('create_activity_user_id') and settings_record.create_activity_user_id:
            assignee = settings_record.create_activity_user_id
        elif available_fields.get('user_id') and settings_record.user_id:
            assignee = settings_record.user_id
        else:
            assignee = self.env.user

        if assignee:
            schedule_values['user_id'] = assignee.id
        document.activity_schedule(**schedule_values)
cannot duplicate document(s) in the Trash.')) # As we avoid to propagate the folder permission by setting access_ids to False (see copy_data), user has no # right to create the document. So after checking permission, we execute the copy in sudo. self.check_access("create") self.check_access('read') if not self.env.su and self.folder_id and self.folder_id.user_permission != 'edit': # do not check access to allow copying in root company folders raise AccessError(_('You cannot copy in that folder')) documents_order = {doc.id: idx for idx, doc in enumerate(self)} new_documents = [self.browse()] * len(self) skip_documents = self.env.context.get('documents_copy_folders_only') shortcuts = self.filtered('shortcut_document_id') if not skip_documents: for destination, targets in shortcuts.grouped('folder_id').items(): new_shortcuts = targets.action_create_shortcut(destination.id) for new_shortcut, target in zip(new_shortcuts, targets): new_documents[documents_order[target.id]] = new_shortcut folders = (self - shortcuts).filtered(lambda d: d.type == 'folder') if folders: embedded_actions = self._get_folder_embedded_actions(folders.ids) new_folders = folders.sudo()._copy_with_access(default=default).sudo(False) for old_folder, new_folder in zip(folders, new_folders): if folder_embedded_actions := embedded_actions.get(old_folder.id): embedded_actions_copies = folder_embedded_actions.copy() embedded_actions_copies.parent_res_id = new_folder.id # no need to check for permission as user is owner of copies (see copy_data). 
old_folder.children_ids.copy({'folder_id': new_folder.id}) new_documents[documents_order[old_folder.id]] = new_folder if not skip_documents and (documents_sudo := (self - shortcuts - folders).sudo()): new_binaries_sudo = documents_sudo._copy_with_access(default=default) for old_document_sudo, new_binary_sudo in zip(documents_sudo, new_binaries_sudo): new_documents[documents_order[old_document_sudo.id]] = new_binary_sudo.sudo(False) if to_copy_attachment_sudo := documents_sudo._copy_attachment_filter(default): new_attachments_iterator = iter(to_copy_attachment_sudo.attachment_id.with_context(no_document=True).copy()) for old_document_sudo, new_binary_sudo in zip(documents_sudo, new_binaries_sudo): if old_document_sudo._copy_attachment_filter(default): new_attachment = next(new_attachments_iterator) new_binary_sudo.write({ 'attachment_id': new_attachment.id, # Avoid recompute based on attachment_id 'name': new_binary_sudo.name, 'url_preview_image': False, }) return self.browse([new_document.id for new_document in new_documents]) def _copy_attachment_filter(self, default): if default and 'attachment_id' in default: return self.env['documents.document'] return self.filtered('attachment_id') def _copy_with_access(self, default): """Copy documents with their access. !Assumes that access rights were checked before! 
""" if not self: return self res = super().copy(default=default) if default and 'access_ids' in default: return res access_vals_list = [] for doc, doc_copied in zip(self, res): access_vals_list += doc.access_ids.filtered('role').copy_data(default={'document_id': doc_copied.id}) self.env['documents.access'].sudo().create(access_vals_list) return res def toggle_favorited(self): self.ensure_one() self.sudo().write({'favorited_ids': [(3 if self.env.user in self[0].favorited_ids else 4, self.env.user.id)]}) def access_content(self): self.ensure_one() action = { 'type': "ir.actions.act_url", 'target': "new", } if self.url: action['url'] = self.url elif self.type == 'binary': action['url'] = f'/documents/content/{self.access_token}' return action def open_resource(self): self.ensure_one() if self.res_model and self.res_id: view_id = self.env[self.res_model].get_formview_id(self.res_id) return { 'res_id': self.res_id, 'res_model': self.res_model, 'type': 'ir.actions.act_window', 'views': [[view_id, 'form']], } def toggle_lock(self): """ sets a lock user, the lock user is the user who locks a file for themselves, preventing data replacement and archive (therefore deletion) for any user but himself. Members of the group documents.group_documents_manager and the superuser can unlock the file regardless. 
""" self.ensure_one() if self.lock_uid: if self.env.user == self.lock_uid or self.env.is_admin() or self.env.user.has_group('documents.group_documents_manager'): self.lock_uid = False else: self.lock_uid = self.env.uid @api.depends_context('uid') @api.depends('lock_uid') def _compute_is_locked(self): for record in self: record.is_locked = record.lock_uid and not ( self.env.user == record.lock_uid or self.env.is_admin() or self.env.user.has_group('documents.group_documents_manager')) def action_archive(self): if not self: return to_archive_sudo = self.sudo().with_context(active_test=False).search([('id', 'child_of', self.ids)]) active_documents = to_archive_sudo.filtered(self._active_name).sudo(False) if not active_documents: return # As document archiving leads to deletion message = _("You do not have sufficient access rights to delete these documents.") try: active_documents.check_access('unlink') except UserError as e: raise AccessError(message) from e # As edit on parent is required to restore (and as removing a file is also somehow modifying the folder) if folder_ids := self.folder_id: if any(permission != 'edit' for permission in folder_ids.mapped('user_permission')): raise UserError(message) active_documents._raise_if_used_folder() deletion_date = fields.Date.to_string(fields.Date.today() + relativedelta(days=self.get_deletion_delay())) log_message = _("This file has been sent to the trash and will be deleted forever on the %s", deletion_date) active_documents._message_log_batch(bodies={doc.id: log_message for doc in active_documents}) return super(Document, active_documents).action_archive() def action_unarchive(self): self_archived = self.filtered(lambda d: not d.active) if not self_archived: return archived_top_parent_documents = self.env["documents.document"].sudo().search( expression.AND([ [('id', 'parent_of', self_archived.ids)], [('id', 'not in', self_archived.ids)], [('active', '=', False)], expression.OR([ [('folder_id', '=', False)], 
[('folder_id.active', '=', True)], ]) ]) ).sudo(False) if archived_top_parent_documents: raise UserError(_( "Item(s) you wish to restore are included in archived folders. " "To restore these items, you must restore the following including folders instead:\n" "- %(folders_list)s", folders_list="\n-".join(archived_top_parent_documents.mapped('name')))) # "Restricted" if not allowed # Leave archived children (and descendants) the current user doesn't have access to. to_unarchive_candidate_documents = self.env['documents.document'].with_context(active_test=False).search( [('id', 'child_of', self_archived.ids)]) seen_documents, to_unarchive_ids = set(), set() def add_if_can_be_restored(doc): if doc in seen_documents or seen_documents.add(doc): return doc.id in to_unarchive_ids if not doc.folder_id or doc.folder_id.sudo().active or add_if_can_be_restored(doc.folder_id): to_unarchive_ids.add(doc.id) return True return False for document in to_unarchive_candidate_documents: add_if_can_be_restored(document) to_unarchive_documents = to_unarchive_candidate_documents.filtered(lambda d: d.id in to_unarchive_ids) log_message = _("This document has been restored.") to_unarchive_documents._message_log_batch(bodies={doc.id: log_message for doc in to_unarchive_documents}) return super(Document, to_unarchive_documents).action_unarchive() @api.model_create_multi def create(self, vals_list): """Access rights fields (access_ids), access_internal, and access_via_link are inherited from containing folder unless specified in vals or context defaults. 
""" attachments = [] for vals in vals_list: keys = [key for key in vals if self._fields[key].related and self._fields[key].related.split('.')[0] == 'attachment_id'] attachment_dict = {key: vals.pop(key) for key in keys if key in vals} attachment = self.env['ir.attachment'].browse(vals.get('attachment_id')) if attachment and attachment_dict: attachment.write(attachment_dict) elif attachment_dict: attachment_dict.setdefault('name', vals.get('name', 'unnamed')) # default_res_model and default_res_id will cause unique constraints to trigger. attachment = self.env['ir.attachment'].with_context(clean_context(self.env.context)).create(attachment_dict) vals['attachment_id'] = attachment.id vals['name'] = vals.get('name', attachment.name) attachments.append(attachment) # don't allow using default_access_ids documents = super(Document, self.with_context(default_access_ids=None)).create(vals_list) is_manager = self.env.is_admin() or self.env.user.has_group('documents.group_documents_manager') if not is_manager: if any(d.alias_name for d in documents): raise AccessError(_('Only Documents Managers can set aliases.')) if any(d.is_pinned_folder for d in documents): raise AccessError(_('Only Documents Managers can create in company folder.')) for document, attachment in zip(documents, attachments): if attachment and not attachment.res_id and ( not attachment.res_model or attachment.res_model == 'documents.document'): attachment.with_context(no_document=True).write({ 'res_model': 'documents.document', 'res_id': document.id}) return documents def _prepare_create_values(self, vals_list): old_vals_list = [vals.copy() for vals in vals_list] vals_list = super()._prepare_create_values(vals_list) folders = self.env['documents.document'].browse(v['folder_id'] for v in vals_list if v.get('folder_id')) users = self.env['res.users'].browse(v['owner_id'] for v in vals_list if v.get('owner_id')) folders.fetch(('access_internal', 'access_via_link', 'access_ids', 'active', 'owner_id')) (users | 
folders.owner_id).fetch(['partner_id']) odoobot = self.env.ref('base.user_root') vals_list_to_update_linked_record = [] for vals, old_vals in zip(vals_list, old_vals_list): owner = self.env['res.users'].browse(vals.get('owner_id', self.env.user.id)) owner_values = {'partner_id': owner.partner_id.id, 'role': False, 'last_access_date': fields.Datetime.now()} vals_values = { 'owner_id': owner.id, 'access_ids': [Command.create(owner_values)] if owner != odoobot else [] } if vals.get('folder_id'): folder = self.env['documents.document'].browse(vals['folder_id']) if not folder.active: raise UserError('It is not possible to create documents in an archived folder.') vals_values.update({ 'access_via_link': folder.access_via_link, 'access_internal': folder.access_internal, 'access_ids': vals_values['access_ids'] + [ Command.create({'partner_id': access.partner_id.id, 'role': access.role}) for access in folder.access_ids if access.role and access.partner_id != owner.partner_id ] + ([Command.create({'partner_id': folder.owner_id.partner_id.id, 'role': 'edit'})] if folder.owner_id != odoobot and folder.owner_id != owner else [] ), }) vals.update((k, v) for k, v in vals_values.items() if k not in old_vals) # If res_model and res_id are not set, we must get it from the related attachment if set (prepare list) if 'res_model' not in vals and 'res_id' not in vals and isinstance(vals.get('attachment_id'), int): vals_list_to_update_linked_record.append(vals) # For the next step, we need to ensure the related ref is present by getting it from attachment if needed if vals_list_to_update_linked_record: attachment_by_id = self.env['ir.attachment'].browse( [vals['attachment_id'] for vals in vals_list_to_update_linked_record]).grouped('id') for vals in vals_list_to_update_linked_record: attachment = attachment_by_id[vals['attachment_id']] vals['res_model'] = attachment.res_model vals['res_id'] = attachment.res_id # Delegate vals_list update to _prepare_create_values_for_model to add values 
depending on related record updated_vals_list = [] for res_model, model_vals_tuple_list in groupby(zip(vals_list, old_vals_list), lambda v: v[0].get('res_model')): updated_vals_list += self._prepare_create_values_for_model( res_model, [vals_tuple[0] for vals_tuple in model_vals_tuple_list], [vals_tuple[1] for vals_tuple in model_vals_tuple_list], ) return updated_vals_list def _prepare_create_values_for_model(self, res_model, vals_list, pre_vals_list): """Override to add values depending on related model/record""" if ( res_model and issubclass(self.pool[res_model], self.pool['documents.mixin']) and not self._context.get('no_document') ): return self.env[res_model]._prepare_document_create_values_for_linked_records( res_model, vals_list, pre_vals_list) return vals_list def write(self, vals): if 'shortcut_document_id' in vals: raise UserError(_("Shortcuts cannot change target document.")) is_manager = self.env.is_admin() or self.env.user.has_group('documents.group_documents_manager') pinned_folders_start = self.filtered('is_pinned_folder') shortcuts_to_check_owner_target_access = self.browse() if (owner_id := vals.get('owner_id')) is not None: if not isinstance(owner_id, int): # recordset owner_id = owner_id.id if not is_manager and any(d.owner_id != self.env.user for d in self): raise AccessError(_("You cannot change the owner of documents you do not own.")) targets_changing_owner = self.filtered(lambda d: d.owner_id.id != owner_id) shortcuts_to_check_owner_target_access |= targets_changing_owner.shortcut_ids.filtered( lambda d: d.owner_id == d.shortcut_document_id.owner_id) if folder_id := vals.get('folder_id'): folder = self.env['documents.document'].browse(folder_id) if not self.env.su and folder.user_permission != 'edit': raise AccessError(_("You can't access that folder_id.")) if folder.type != 'folder' or folder.shortcut_document_id: raise UserError(_("Invalid folder id")) if any(not d.active or not folder.active or d.folder_id and not d.folder_id.active for d 
def write(self, vals):
    """Update the documents, enforcing access rules and attachment versioning.

    Besides the regular write, this override:

    * refuses to change `shortcut_document_id`, and restricts owner changes,
      moves to another folder and archiving;
    * keeps the replaced attachments in `previous_attachment_ids` when the
      attachment or its `datas` changes, and creates an attachment when
      `datas` is written on a document that has none;
    * writes `datas` / `mimetype` directly on the attachments, in batch;
    * gives feedback on the request activity of documents that just received
      their attachment;
    * propagates a new `company_id` to the shortcuts and asks the shortcuts
      impacted by an owner change to remove themselves if their target is
      no longer accessible.

    Note that the `datas` and `mimetype` keys are popped from ``vals``.

    :param dict vals: values to write
    :return: result of the parent ``write``
    :raises UserError: on shortcut target change, invalid or archived
        destination folder, forbidden or unsupported (un)archiving
    :raises AccessError: on forbidden owner change, destination folder,
        alias modification or pinned folder change
    """
    if 'shortcut_document_id' in vals:
        raise UserError(_("Shortcuts cannot change target document."))

    is_manager = self.env.is_admin() or self.env.user.has_group('documents.group_documents_manager')
    # Snapshot taken before writing, compared again after the write.
    pinned_folders_start = self.filtered('is_pinned_folder')

    shortcuts_to_check_owner_target_access = self.browse()
    if (owner_id := vals.get('owner_id')) is not None:
        if not isinstance(owner_id, int):  # recordset
            owner_id = owner_id.id
        if not is_manager and any(d.owner_id != self.env.user for d in self):
            raise AccessError(_("You cannot change the owner of documents you do not own."))
        targets_changing_owner = self.filtered(lambda d: d.owner_id.id != owner_id)
        # Only the shortcuts owned by the (current) owner of their target are checked after the write.
        shortcuts_to_check_owner_target_access |= targets_changing_owner.shortcut_ids.filtered(
            lambda d: d.owner_id == d.shortcut_document_id.owner_id)
    if folder_id := vals.get('folder_id'):
        folder = self.env['documents.document'].browse(folder_id)
        if not self.env.su and folder.user_permission != 'edit':
            raise AccessError(_("You can't access that folder_id."))
        if folder.type != 'folder' or folder.shortcut_document_id:
            raise UserError(_("Invalid folder id"))
        if any(not d.active or not folder.active or d.folder_id and not d.folder_id.active for d in self):
            raise UserError(_("It is not possible to move archived documents or documents to archived folders."))

    if vals.get('active') is False:
        if self.env.user.share:
            raise UserError(_("You are not allowed to (un)archive documents."))
        self.check_access('unlink')  # As archived gc leads to unlink after `deletion_delay` days.

    attachment_id = vals.get('attachment_id')
    if attachment_id:
        # A given attachment can only be set on a single document.
        self.ensure_one()

    # Per-record flags, aligned with `self`, used after the write to detect newly added attachments.
    attachments_was_present = []
    for record in self:
        attachments_was_present.append(bool(record.attachment_id))
        if record.type == 'binary' and not record.datas and ('datas' in vals or 'url' in vals):
            body = _("Document Request: %(name)s Uploaded by: %(user)s", name=record.name, user=self.env.user.name)
            record.with_context(no_document=True).message_post(body=body)

        if record.attachment_id:
            # versioning
            if attachment_id and attachment_id != record.attachment_id.id:
                # Link the new attachment to the related record and link the previous one
                # to the document.
                self.env["ir.attachment"].browse(attachment_id).with_context(
                    no_document=True
                ).write(
                    {
                        "res_model": record.res_model or "documents.document",
                        "res_id": record.res_id if record.res_model else record.id,
                    }
                )
                related_record = self.env[record.res_model].browse(record.res_id)
                if (
                    not hasattr(related_record, "message_main_attachment_id")
                    or related_record.message_main_attachment_id != record.attachment_id
                ):
                    record.attachment_id.with_context(no_document=True).write(
                        {"res_model": "documents.document", "res_id": record.id}
                    )
                if attachment_id in record.previous_attachment_ids.ids:
                    record.previous_attachment_ids = [(3, attachment_id, False)]
                record.previous_attachment_ids = [(4, record.attachment_id.id, False)]
            if 'datas' in vals:
                old_attachment = record.attachment_id.with_context(no_document=True).copy()
                # removes the link between the old attachment and the record.
                old_attachment.write({
                    'res_model': 'documents.document',
                    'res_id': record.id,
                })
                record.previous_attachment_ids = [(4, old_attachment.id, False)]
        elif vals.get('datas') and not vals.get('attachment_id'):
            res_model = vals.get('res_model', record.res_model or 'documents.document')
            res_id = vals.get('res_id') if vals.get('res_model') else record.res_id if record.res_model else record.id
            # Fall back on the document itself when the linked record does not exist.
            if res_model and res_model != 'documents.document' and not self.env[res_model].browse(res_id).exists():
                record.res_model = res_model = 'documents.document'
                record.res_id = res_id = record.id

            attachment = self.env['ir.attachment'].with_context(no_document=True).create({
                'name': vals.get('name', record.name),
                'res_model': res_model,
                'res_id': res_id
            })
            record.attachment_id = attachment.id

    # pops the datas and/or the mimetype key(s) to explicitly write them in batch on the ir.attachment
    # so the mimetype is properly set. The reason was because the related keys are not written in batch
    # and because mimetype is readonly on `ir.attachment` (which prevents writing through the related).
    attachment_dict = {key: vals.pop(key) for key in ['datas', 'mimetype'] if key in vals}

    if not is_manager and set(vals) & set(self.env['mail.alias.mixin']._fields):
        raise AccessError(_('Only Documents managers can set an alias.'))
    write_result = super().write(vals)
    if attachment_dict:
        self.mapped('attachment_id').write(attachment_dict)

    if 'attachment_id' in vals:
        self.attachment_id.check('read')

    if (new_active := vals.get('active')) is not None:
        # Archiving must leave no active descendant, restoring no archived ancestor.
        if not new_active and self.sudo().search([('id', 'child_of', self.ids), ('active', '=', True)]):
            raise UserError(_('Operation not supported. Please use "Move to Trash" / `action_archive` instead.'))
        if new_active and self.sudo().search([('id', 'parent_of', self.ids), ('active', '=', False)]):
            raise UserError(_('Operation not supported. Please use "Restore" / `action_unarchive` instead.'))

    if not is_manager and self.filtered('is_pinned_folder') != pinned_folders_start:
        raise AccessError(_("Only Documents Managers can create in company folder."))

    for document, attachment_was_present in zip(self, attachments_was_present):
        if document.request_activity_id and document.attachment_id and not attachment_was_present:
            # Use the name of the looped document: `self` may hold several records,
            # in which case reading `self.name` raises a singleton error.
            feedback = _("Document Request: %(name)s Uploaded by: %(user)s", name=document.name, user=self.env.user.name)
            document.with_context(no_document=True).request_activity_id.action_feedback(
                feedback=feedback, attachment_ids=[document.attachment_id.id])

    if (company_id := vals.get('company_id')) and self.shortcut_ids:
        # no need if resetting company_id to False
        self.shortcut_ids.sudo().write({'company_id': company_id})
    if shortcuts_to_check_owner_target_access:
        shortcuts_to_check_owner_target_access._unlink_shortcut_if_target_inaccessible()
    return write_result
@api.model
def _pdf_split(self, new_files=None, open_files=None, vals=None):
    """Split PDF files into attachments and create one document per attachment.

    :param new_files: forwarded to ``ir.attachment._pdf_split``
    :param open_files: forwarded to ``ir.attachment._pdf_split``
    :param vals: optional values shared by all the created documents
    :return: the created documents
    """
    base_vals = vals or {}
    attachments = self.env['ir.attachment']._pdf_split(new_files=new_files, open_files=open_files)
    create_vals_list = []
    for attachment in attachments:
        create_vals_list.append(dict(base_vals, attachment_id=attachment.id))
    new_documents = self.create(create_vals_list)

    # Prevent concurrent update error on accessing these documents for the first time on exiting the split tool
    current_partner = self.env.user.partner_id
    access_vals_list = [
        {
            'document_id': document.id,
            'partner_id': current_partner.id,
            'last_access_date': fields.Datetime.now(),
        }
        for document in new_documents
        if current_partner not in document.access_ids.partner_id
    ]
    self.env['documents.access'].sudo().create(access_vals_list)
    return new_documents
@api.model
def search_panel_select_range(self, field_name, **kwargs):
    """Return the search panel values, with a dedicated folder tree for `folder_id`.

    For `folder_id`, the folders readable by the user are returned and the
    root ones are attached to virtual roots ("COMPANY", "MY", "SHARED").
    Internal users additionally get the special roots themselves (including
    "RECENT" and "TRASH") appended to the values. When the context key
    `documents_unique_folder_id` is set, only the folders under that folder
    are returned, without virtual roots.

    For any other field, the call is delegated to the parent implementation
    together with all the received options.

    :param str field_name: field the search panel category is based on
    :param kwargs: search panel options; `enable_counters`, `search_domain`,
        `category_domain` and `filter_domain` are read here
    :return: dict with the `parent_field` and `values` keys for `folder_id`,
        else the result of the parent implementation
    """
    if field_name != 'folder_id':
        # Forward the options as well, otherwise counters and domains are lost.
        return super().search_panel_select_range(field_name, **kwargs)

    enable_counters = kwargs.get('enable_counters', False)
    search_panel_fields = ['access_token', 'company_id', 'description', 'display_name', 'folder_id',
                           'is_favorited', 'is_pinned_folder', 'owner_id', 'shortcut_document_id',
                           'user_permission']
    domain = [('type', '=', 'folder')]

    if unique_folder_id := self.env.context.get('documents_unique_folder_id'):
        values = self.env['documents.document'].search_read(
            expression.AND([domain, [('folder_id', 'child_of', unique_folder_id)]]),
            search_panel_fields,
            load=False,
        )
        accessible_folder_ids = {rec['id'] for rec in values}
        for record in values:
            if record['folder_id'] not in accessible_folder_ids:
                record['folder_id'] = False  # consider them as roots
        return {
            'parent_field': 'folder_id',
            'values': values,
        }

    records = self.env['documents.document'].search_read(domain, search_panel_fields)
    accessible_folder_ids = {rec['id'] for rec in records}
    domain_image = {}
    if enable_counters:
        model_domain = expression.AND([
            kwargs.get('search_domain', []),
            kwargs.get('category_domain', []),
            kwargs.get('filter_domain', []),
            [(field_name, '!=', False)]
        ])
        domain_image = self._search_panel_domain_image(field_name, model_domain, enable_counters)

    # Loop invariants, looked up once instead of once per folder.
    current_user = self.env.user
    current_user_id = current_user.id
    is_share_user = current_user.share
    odoobot_id = self.env.ref('base.user_root').id

    values_range = OrderedDict()
    shared_root_id = False if is_share_user else "SHARED"
    for record in records:
        record_id = record['id']
        if enable_counters:
            image_element = domain_image.get(record_id)
            record['__count'] = image_element['__count'] if image_element else 0
        folder_id = record['folder_id']
        if folder_id:
            folder_id = folder_id[0]
            if folder_id not in accessible_folder_ids:
                # The parent is not readable: skip shortcuts, show the rest as shared.
                if record['shortcut_document_id']:
                    continue
                folder_id = shared_root_id
        elif record['owner_id'][0] == current_user_id:
            folder_id = "MY"
        elif record['owner_id'][0] != odoobot_id or is_share_user:
            if record['shortcut_document_id']:
                continue
            folder_id = shared_root_id
        else:
            folder_id = "COMPANY"
        record['folder_id'] = folder_id
        values_range[record_id] = record

    if enable_counters:
        self._search_panel_global_counters(values_range, 'folder_id')

    special_roots = []
    if not is_share_user:
        # The values of each root override the common defaults (e.g. `user_permission`).
        special_roots = [
            {'bold': True, 'childrenIds': [], 'parentId': False, 'user_permission': 'edit'} | values
            for values in [
                {
                    'display_name': _("Company"),
                    'id': 'COMPANY',
                    'description': _("Common roots for all company users."),
                    'user_permission': 'view',
                }, {
                    'display_name': _("My Drive"),
                    'id': 'MY',
                    'user_permission': 'edit',
                    'description': _("Your individual space."),
                }, {
                    'display_name': _("Shared with me"),
                    'id': 'SHARED',
                    'description': _("Additional documents you have access to."),
                }, {
                    'display_name': _("Recent"),
                    'id': 'RECENT',
                    'description': _("Recently accessed documents."),
                }, {
                    'display_name': _("Trash"),
                    'id': 'TRASH',
                    'description': _("Items in trash will be deleted forever after %s days.",
                                     self.get_deletion_delay()),
                }]
        ]

    return {
        'parent_field': 'folder_id',
        'values': list(values_range.values()) + special_roots,
    }
@api.model
def get_document_max_upload_limit(self):
    """Return the maximum upload size configured for documents.

    The `document.max_fileupload_size` system parameter is read first, then
    `web.max_file_upload_size`. The first one that is set and holds a valid
    integer wins; a value of 0 is returned as ``None``. An invalid value is
    logged and the next parameter is tried.

    :return: the configured limit as an int, ``None`` when it is 0, or
        ``odoo.http.DEFAULT_MAX_CONTENT_LENGTH`` when no valid parameter is set
    """
    ICP = self.env['ir.config_parameter'].sudo()
    for key in ('document.max_fileupload_size', 'web.max_file_upload_size'):
        value = ICP.get_param(key, default=None)
        if value is None:
            continue
        try:
            return int(value) or None
        except ValueError:
            _logger.error("invalid %s: %r", key, value)
    return odoo.http.DEFAULT_MAX_CONTENT_LENGTH

@api.model
def can_upload_traceback(self):
    """Tell whether the current user may upload a traceback.

    :return: True when the current user is internal and the
        `documents.document_support_folder` record exists
    """
    # `_is_internal` is a method: without the call, the bound method is always
    # truthy and the internal-user check would never filter anyone out.
    return self.env.user._is_internal() and \
        bool(self.env.ref('documents.document_support_folder', raise_if_not_found=False))

def unlink(self):
    """Clean unused linked records too.

    This applies to:
    * Children documents when deleting the parent folder
    * Parent folder if it is archived and has no other children and user is allowed to
    * Attachment if document-related record is deleted
    """
    # The descendants are searched in sudo (archived ones included), but deleted
    # without sudo so that the access checks of the current user still apply.
    to_delete = self.sudo().with_context(active_test=False).search([('id', 'child_of', self.ids)]).sudo(False)
    removable_parent_folders = self.with_context(active_test=False).folder_id.filtered(
        lambda folder: len(folder.children_ids - self) == 0 and not folder.active)
    # Attachments of documents linked to a record of another model.
    removable_attachments = self.filtered(lambda d: d.res_model != d._name).attachment_id
    res = super(Document, to_delete).unlink()
    if removable_attachments:
        removable_attachments.unlink()
    if removable_parent_folders:
        # Best effort: keep the parent folders the user is not allowed to delete.
        with contextlib.suppress(AccessError):
            removable_parent_folders.unlink()
    return res
@api.ondelete(at_uninstall=False)
def _unlink_except_unauthorized(self):
    """Refuse the deletion when the unlink access check fails, with a generic message."""
    try:
        self.check_access('unlink')
    except UserError as access_error:
        # Hide potentially unknown inaccessible content's name.
        generic_message = _("You are not allowed to delete all these items.")
        raise UserError(generic_message) from access_error

@api.ondelete(at_uninstall=False)
def _unlink_except_company_folders(self):
    """Refuse the deletion of folders referenced by a company."""
    self._raise_if_used_folder()

def _raise_if_used_folder(self):
    """Raise when one of the folders in ``self`` is referenced by a company.

    Every field of `res.company` whose comodel is `documents.document` is
    checked.

    :raises ValidationError: if at least one company references one of the folders
    """
    folder_ids = self.filtered(lambda document: document.type == 'folder').ids
    if not folder_ids:
        return
    Company = self.env['res.company']
    usage_domains = [
        [(company_field_name, 'in', folder_ids)]
        for company_field_name, field in Company._fields.items()
        if field.comodel_name == "documents.document"
    ]
    if Company.search_count(expression.OR(usage_domains), limit=1):
        raise ValidationError(_("Impossible to delete folders used by other applications."))

def _unlink_shortcut_if_target_inaccessible(self):
    """As a fix in stable version, delete shortcuts when target is no longer accessible to the owner."""
    shortcuts_by_owner = self.filtered('shortcut_document_id').grouped("owner_id")
    for owner, owner_shortcuts in shortcuts_by_owner.items():
        # Evaluate the permission on the target from the point of view of the shortcut owner.
        as_owner_sudo = owner_shortcuts.with_user(owner).sudo()
        inaccessible = as_owner_sudo.filtered(
            lambda shortcut: shortcut.shortcut_document_id.user_permission == 'none')
        inaccessible.unlink()
@api.autovacuum
def _gc_clear_bin(self):
    """Files are deleted automatically from the trash bin after the configured remaining days."""
    retention_days = self.get_deletion_delay()
    expiry_limit = fields.Datetime.now() - relativedelta(days=retention_days)
    trashed_domain = [
        ('active', '=', False),
        ('write_date', '<=', expiry_limit),
    ]
    # At most 1000 documents are removed per run.
    self.search(trashed_domain, limit=1000).unlink()

def _get_access_action(self, access_uid=None, force_website=False):
    """Open active documents in the Documents action for Documents users.

    The URL action is only returned when `access_uid` is given, the website
    is not forced, the document is active and the current user belongs to
    `documents.group_documents_user`; otherwise the parent action is used.
    """
    self.ensure_one()
    opens_in_documents_app = (
        access_uid
        and not force_website
        and self.active
        and self.env.user.has_group("documents.group_documents_user")
    )
    if not opens_in_documents_app:
        return super()._get_access_action(access_uid=access_uid, force_website=force_website)
    query_string = url_encode({
        'preview_id': self.id,
        'view_id': self.env.ref("documents.document_view_kanban").id,
        'menu_id': self.env.ref("documents.menu_root").id,
        'folder_id': self.folder_id.id,
    })
    return {
        "type": "ir.actions.act_url",
        "url": f"/odoo/action-documents.document_action?{query_string}"
    }

@api.readonly
def permission_panel_data(self):
    """Provide access related data for a given document/folder"""
    specification = self._permission_specification()
    self.check_access('read')
    # Read access was checked above; the data itself is read in sudo, archived records included.
    reader = self.sudo().with_context(active_test=False)
    record = reader.web_search_read([('id', '=', self.id)], specification)['records'][0]
    selections = {'access_via_link': self._fields.get('access_via_link').selection}
    if not self.env.user.has_group('base.group_user'):
        return {'record': record, 'selections': selections}

    # Only keep the access records that have a role.
    record['access_ids'] = [access for access in record['access_ids'] if access['role']]
    if record['owner_id']['id'] == self.env.ref('base.user_root').id:
        record['owner_id'] = False  # Only a real user should be shown in the panel
    selections['access_internal'] = self._fields.get('access_internal').selection
    selections['doc_access_roles'] = self.env['documents.access']._fields.get('role').selection
    return {'record': record, 'selections': selections}
def _permission_specification(self):
    """Build the read specification used to collect the permission data.

    Users of `base.group_user` additionally get the access records and the
    owner, each with the email, name and user of the related partner.

    :return: specification dict, keyed by field name
    """
    base_field_names = (
        'access_internal',
        'access_via_link',
        'access_url',
        'active',
        'display_name',
        'folder_id',
        'is_access_via_link_hidden',
        'type',
        'user_permission',
    )
    specification = {field_name: {} for field_name in base_field_names}
    if not self.env.user.has_group('base.group_user'):
        return specification

    partner_id_spec = {'fields': {'email': {}, 'name': {}, 'user_id': {}}}
    specification['access_ids'] = {
        'fields': {
            'document_id': {},
            'partner_id': partner_id_spec,
            'role': {},
            'expiration_date': {},
        },
    }
    specification['owner_id'] = {
        'fields': {
            'partner_id': partner_id_spec,
        },
    }
    return specification