odoo18/addons_extensions/documents/models/documents_document.py

2061 lines
100 KiB
Python

# -*- coding: utf-8 -*-
import base64
import contextlib
import io
import logging
import re
import uuid
from ast import literal_eval
from collections import Counter, OrderedDict, defaultdict
import requests
from dateutil.relativedelta import relativedelta
from markupsafe import Markup
from werkzeug.urls import url_encode
import odoo
from odoo import _, api, Command, fields, models
from odoo.exceptions import AccessError, UserError, ValidationError
from odoo.osv import expression
from odoo.tools import groupby, image_process, SQL, create_index
from odoo.tools.mimetypes import get_extension
from odoo.tools.misc import clean_context
from odoo.tools.pdf import PdfFileReader
from odoo.addons.mail.tools import link_preview
_logger = logging.getLogger(__name__)
def _sanitize_file_extension(extension):
""" Remove leading and trailing spacing + Remove leading "." """
return re.sub(r'^[\s.]+|\s+$', '', extension)
class Document(models.Model):
_name = 'documents.document'
_description = 'Document'
_inherit = ['mail.thread.cc', 'mail.activity.mixin', 'mail.alias.mixin']
_order = 'id desc'
_parent_name = 'folder_id'
_parent_store = True
_systray_view = 'activity'
# Attachment
attachment_id = fields.Many2one('ir.attachment', ondelete='cascade', auto_join=True, copy=False)
attachment_name = fields.Char('Attachment Name', related='attachment_id.name', readonly=False)
attachment_type = fields.Selection(string='Attachment Type', related='attachment_id.type', readonly=False)
is_editable_attachment = fields.Boolean(default=False, help='True if we can edit the link attachment.')
is_multipage = fields.Boolean(
'Is considered multipage', compute='_compute_is_multipage', store=True, readonly=False)
datas = fields.Binary(related='attachment_id.datas', related_sudo=True, readonly=False, prefetch=False)
raw = fields.Binary(related='attachment_id.raw', related_sudo=True, readonly=False, prefetch=False)
file_extension = fields.Char('File Extension', copy=True, store=True, readonly=False,
compute='_compute_file_extension', inverse='_inverse_file_extension')
file_size = fields.Integer(related='attachment_id.file_size', store=True)
checksum = fields.Char(related='attachment_id.checksum')
mimetype = fields.Char(related='attachment_id.mimetype')
res_model = fields.Char('Resource Model', compute="_compute_res_record", recursive=True,
inverse="_inverse_res_model", store=True)
res_id = fields.Many2oneReference('Resource ID', compute="_compute_res_record", recursive=True,
inverse="_inverse_res_model", store=True, model_field="res_model")
res_name = fields.Char('Resource Name', compute="_compute_res_name", compute_sudo=True)
index_content = fields.Text(related='attachment_id.index_content')
description = fields.Text('Attachment Description', related='attachment_id.description', readonly=False)
# Versioning
previous_attachment_ids = fields.Many2many('ir.attachment', string="History")
# Document
name = fields.Char('Name', copy=True, store=True, compute='_compute_name_and_preview', readonly=False)
active = fields.Boolean(default=True, string="Active")
thumbnail = fields.Binary(
readonly=False, store=True, attachment=True, compute='_compute_thumbnail', recursive=True)
thumbnail_status = fields.Selection([
('present', 'Present'), # Document has a thumbnail
('error', 'Error'), # Error when generating the thumbnail
('client_generated', 'Client Generated'), # The PDF thumbnail is generated by the user browser
('restricted', 'Inaccessible'), # Shortcut to no-permission source
], compute="_compute_thumbnail", store=True, readonly=False, recursive=True,
)
url = fields.Char('Link URL', index=True, size=1024, tracking=True)
url_preview_image = fields.Char(
'URL Preview Image', store=True, compute='_compute_name_and_preview', readonly=False)
res_model_name = fields.Char(compute='_compute_res_model_name', index=True)
type = fields.Selection([('url', 'URL'), ('binary', 'File'), ('folder', 'Folder')],
default='binary', string='Type', required=True, readonly=True)
shortcut_document_id = fields.Many2one('documents.document', 'Source Document', ondelete='cascade',
index='btree_not_null')
shortcut_ids = fields.One2many('documents.document', 'shortcut_document_id')
favorited_ids = fields.Many2many('res.users', string="Favorite of")
is_favorited = fields.Boolean(compute='_compute_is_favorited', inverse='_inverse_is_favorited')
tag_ids = fields.Many2many('documents.tag', 'document_tag_rel', string="Tags")
partner_id = fields.Many2one('res.partner', string="Contact", tracking=True)
owner_id = fields.Many2one('res.users', default=lambda self: self.env.user.id, string="Owner",
required=True, tracking=True, index=True)
lock_uid = fields.Many2one('res.users', string="Locked by")
is_locked = fields.Boolean(compute="_compute_is_locked", string="Locked")
request_activity_id = fields.Many2one('mail.activity')
requestee_partner_id = fields.Many2one('res.partner')
# Access
document_token = fields.Char(
required=True, copy=False, index='btree',
default=lambda __: base64.urlsafe_b64encode(uuid.uuid4().bytes).decode().removesuffix('=='))
access_token = fields.Char(compute='_compute_access_token')
access_url = fields.Char(string='Access url', compute='_compute_access_url')
is_access_via_link_hidden = fields.Boolean(
'Link Access Hidden', index=True,
help='If "True", only people given direct access to this document will be able to view it. '
'If "False", access with the link also given to all who can access the parent folder.'
)
access_via_link = fields.Selection(
[('view', 'Viewer'), ('edit', 'Editor'), ('none', 'None')],
string='Link Access Rights', default='none', required=True, index=True)
access_internal = fields.Selection(
[('view', 'Viewer'), ('edit', 'Editor'), ('none', 'None')],
string='Internal Users Rights', default='none', required=True, index=True)
access_ids = fields.One2many('documents.access', 'document_id', string="Allowed Access")
user_permission = fields.Selection(
[('edit', 'Editor'), ('view', 'Viewer'), ('none', 'None')], string='User permission',
compute='_compute_user_permission', search='_search_user_permission', compute_sudo=True)
# Folder = parent document
parent_path = fields.Char(index=True) # see '_parent_store' implementation in the ORM for details
folder_id = fields.Many2one('documents.document', string="Parent Folder", ondelete="set null", tracking=True,
domain="[('type', '=', 'folder'), ('shortcut_document_id', '=', False)]",
required=False, index=True)
children_ids = fields.One2many('documents.document', 'folder_id')
deletion_delay = fields.Integer("Deletion delay", compute="_compute_deletion_delay",
help="Delay after permanent deletion of the document in the trash (days)")
company_id = fields.Many2one('res.company', string='Company', store=True, readonly=False, index=True)
# TODO: remove in master
is_pinned_folder = fields.Boolean("Pinned to Company roots", compute='_compute_is_pinned_folder', store=True)
# Stat buttons
document_count = fields.Integer('Document Count', compute='_compute_document_count')
# Activity
create_activity_option = fields.Boolean(string='Create a new activity')
create_activity_type_id = fields.Many2one('mail.activity.type', string="Activity type")
create_activity_summary = fields.Char('Summary')
create_activity_date_deadline_range = fields.Integer(string='Due Date In')
create_activity_date_deadline_range_type = fields.Selection([
('days', 'Days'),
('weeks', 'Weeks'),
('months', 'Months'),
], string='Due type', default='days')
create_activity_note = fields.Html(string="Note")
create_activity_user_id = fields.Many2one('res.users', string='Responsible')
# Actions that we can do on the document
available_embedded_actions_ids = fields.Many2many(
'ir.embedded.actions', 'Available Actions', compute='_compute_available_embedded_actions_ids',
groups='base.group_user')
# Alias
alias_tag_ids = fields.Many2many('documents.tag', 'document_alias_tag_rel', string="Alias Tags")
# UI fields
last_access_date_group = fields.Selection(selection=[
('0_older', 'Older'),
('1_month', 'This Month'),
('2_week', 'This Week'),
('3_day', 'Today'),
], string="Last Accessed On", compute='_compute_last_access_date_group', search='_search_last_access_date_group')
_sql_constraints = [
('attachment_unique', 'unique (attachment_id)', "This attachment is already a document"),
('document_token_unique', 'unique (document_token)', "Access tokens already used."),
('folder_id_not_id', 'check(folder_id <> id)', "A folder cannot be included in itself"),
('shortcut_document_id_not_id', 'check(shortcut_document_id <> id)', "A shortcut cannot point to itself"),
]
def init(self):
super().init()
create_index(self.env.cr,
indexname='documents_document_res_model_res_id_idx',
tablename=self._table,
expressions=['res_model', 'res_id'])
@api.depends('document_token')
def _compute_access_token(self):
for document in self:
document.access_token = f"{document.document_token}o{document.id or 0:x}"
@api.depends('access_token')
def _compute_access_url(self):
for document in self:
document.access_url = f'{document.sudo().get_base_url()}/odoo/documents/{document.access_token}'
@api.depends("folder_id", "company_id")
@api.depends_context("uid", "allowed_company_ids")
def _compute_display_name(self):
accessible_records = self._filtered_access('read')
not_accessible_records = self - accessible_records
not_accessible_records.display_name = _("Restricted")
folders = accessible_records.filtered(lambda d: d.type == 'folder')
for record in folders:
if record.user_permission != 'none':
record.display_name = record.name
else:
record.display_name = _("Restricted Folder")
for record in accessible_records - folders:
record.display_name = record.name
@api.depends('name', 'type')
def _compute_file_extension(self):
for record in self:
if record.type != 'binary':
record.file_extension = False
elif record.name:
record.file_extension = _sanitize_file_extension(get_extension(record.name.strip())) or False
def _inverse_file_extension(self):
for record in self:
record.file_extension = _sanitize_file_extension(record.file_extension) if record.file_extension else False
@api.constrains('shortcut_document_id', 'shortcut_ids', 'type', 'folder_id', 'children_ids', 'company_id')
def _check_shortcut_fields(self):
errors = []
wrong_types, wrong_companies = self.browse(), self.browse()
# Access rights can allow for a document to be edited without having access to its parent folder
wrong_parents_sudo = self.folder_id.sudo().filtered('shortcut_document_id')
for target in self.filtered('shortcut_ids'):
for shortcut in target.shortcut_ids:
if shortcut.type != target.type:
wrong_types |= shortcut
for shortcut in self.filtered('shortcut_document_id'):
if shortcut.type != shortcut.shortcut_document_id.type:
wrong_types |= shortcut
if shortcut.children_ids:
wrong_parents_sudo |= shortcut
if (shortcut.shortcut_document_id.company_id
and shortcut.shortcut_document_id.company_id != shortcut.company_id):
wrong_companies |= shortcut
if wrong_types:
message = _("The following documents/shortcuts have a type mismatch: \n")
documents_list = "\n- ".join(wrong_types.mapped('name'))
errors.append(f'{message}\n- {documents_list}')
if wrong_parents_sudo:
message = _("The following shortcuts cannot be set as documents parents: \n")
shortcuts_list = "\n- ".join(wrong_parents_sudo.mapped('name'))
errors.append(f'{message}\n- {shortcuts_list}')
if wrong_companies:
message = _("The following documents/shortcuts have a company mismatch: \n")
shortcuts_list = "\n- ".join(wrong_companies.mapped('name'))
errors.append(f'{message}\n- {shortcuts_list}')
if errors:
raise ValidationError('\n\n'.join(errors))
@api.constrains('owner_id', 'folder_id')
def _check_portal_cant_own_root_document(self):
wrong_records = self.filtered(lambda d: d.owner_id.sudo().share and not self.folder_id)
if wrong_records:
raise ValidationError(_(
"The following documents/folders can't be owned by a Portal User: \n- %(partners)s",
partners="\n-".join(wrong_records.mapped('name'))))
@api.constrains('type', 'alias_name')
def _check_alias(self):
wrong_records = self.filtered(lambda d: (d.type != 'folder' or d.shortcut_document_id) and d.alias_name)
if wrong_records:
raise ValidationError(_(
"The following documents can't have alias: \n- %(records)s",
records="\n-".join(wrong_records.mapped('name'))))
@api.depends('folder_id', 'owner_id', 'type')
def _compute_is_pinned_folder(self):
# TODO: remove in master, for stable force the field to reflect owner / folder value
# because it's used in access rule `documents_document_write_base_rule`
for document in self:
document.is_pinned_folder = (
document.type == 'folder'
and not document.folder_id
and document.owner_id == self.env.ref('base.user_root')
)
@api.depends('attachment_id', 'url', 'shortcut_document_id')
def _compute_name_and_preview(self):
request_session = requests.Session()
shortcuts = self.filtered('shortcut_document_id')
for record in self - shortcuts:
if record.attachment_id:
record.name = record.attachment_id.name
record.url_preview_image = False
elif record.url:
preview = link_preview.get_link_preview_from_url(record.url, request_session)
if not preview:
continue
if preview.get('og_title'):
record.name = preview['og_title']
if preview.get('og_image'):
record.url_preview_image = preview['og_image']
for shortcut in shortcuts:
shortcut.name = shortcut.name or shortcut.shortcut_document_id.name
shortcut.url_preview_image = shortcut.url_preview_image or shortcut.shortcut_document_id.url_preview_image
@api.depends_context('uid', 'allowed_company_ids')
@api.depends('access_ids', 'access_internal', 'access_via_link', 'owner_id', 'is_access_via_link_hidden',
'company_id', 'folder_id.access_ids', 'folder_id.access_internal', 'folder_id.access_via_link',
'folder_id.owner_id', 'folder_id.company_id')
def _compute_user_permission(self):
for document in self:
if self.env.user.has_group('documents.group_documents_system'):
document.user_permission = (
'edit' if not (company := document.company_id)
or company in self.env.companies or company not in self.env.user.company_ids
else 'none')
continue
document.user_permission = document._get_permission_without_token()
if document.user_permission == 'view' and document.access_via_link == 'edit':
document.user_permission = 'edit'
elif document.user_permission == 'none' and document.folder_id and document.access_via_link != 'none' \
and not document.is_access_via_link_hidden:
# If the user can access the parent, they have the link.
# This only works one level up, as it mimics accessing through the interface.
with contextlib.suppress(AccessError):
if document.folder_id._get_permission_without_token() != 'none':
document.user_permission = document.access_via_link
def _get_permission_without_token(self):
self.ensure_one()
is_user_company = self.company_id and self.company_id in self.env.user.company_ids
is_disabled_company = is_user_company and self.company_id not in self.env.companies
if is_disabled_company:
return 'none'
# own documents
if self.owner_id == self.env.user:
return 'edit'
user_permission = 'none'
# access with <documents.access>
if access := self.access_ids.filtered(
lambda a: a.partner_id == self.env.user.partner_id
and (not a.expiration_date or a.expiration_date > fields.Datetime.now())
):
user_permission = access.role or self.access_via_link
# access as internal
if not self.env.user.share and user_permission != "edit" and self.access_internal != 'none':
if not self.company_id or self.company_id in self.env.companies:
user_permission = (
'edit' if self.env.user.has_group('documents.group_documents_manager')
else self.access_internal
)
return user_permission
def _search_user_permission(self, operator, value):
if operator not in ('=', '!='):
raise NotImplementedError("Unsupported search operator")
if self.env.user._is_public() or value not in {'view', 'edit', 'none'}:
return expression.FALSE_DOMAIN
if (operator, value) in (("=", "edit"), ("!=", "view")): # access but no view => edit
searched_roles = ['edit']
elif (operator, value) in (('=', 'view'), ("!=", "edit")): # access without edit => view
searched_roles = ['view']
elif (operator, value) == ("!=", "none"): # any access
searched_roles = ['edit', 'view']
else:
return expression.FALSE_DOMAIN # ("=", "none") = not allowed, so no records
other_company = [('company_id', '!=', False), ('company_id', 'not in', self.env.user.company_ids.ids)]
allowed_or_no_company = [('company_id', 'in', [False] + self.env.companies.ids)]
any_except_disabled_company = expression.OR([
[('company_id', 'in', self.env.companies.ids)], [('company_id', 'not in', self.env.user.company_ids.ids)]
])
if self.env.user.has_group('documents.group_documents_system'):
if searched_roles == ['view']:
return expression.FALSE_DOMAIN # System Administrator has "edit" on all documents, so finds none with "view" only.
return any_except_disabled_company
# Access from membership
if searched_roles == ['view']:
access_level_domain = expression.OR([
[('role', '=', 'view'), ('document_id.access_via_link', 'in', ('none', 'view'))],
[('role', '=', False), ('document_id.access_via_link', '=', 'view')],
])
elif searched_roles == ['edit']:
access_level_domain = expression.OR([
[('role', '=', 'edit')], [('document_id.access_via_link', '=', 'edit')]
])
else:
access_level_domain = expression.OR([
[('role', 'in', ('view', 'edit'))], [('document_id.access_via_link', '!=', 'none')]
])
access_domain = [('access_ids', 'any', expression.AND([
access_level_domain,
expression.AND([
[('partner_id', '=', self.env.user.partner_id.id)],
['|', ('expiration_date', '=', False), ('expiration_date', '>', fields.Datetime.now())],
]),
]))]
# Access from ownership
owner_domain = [('owner_id', '=', self.env.user.id)]
direct_domain = expression.AND([
any_except_disabled_company,
access_domain if 'edit' not in searched_roles else expression.OR([access_domain, owner_domain]),
])
# Access form access_internal
if self.env.user.has_group('documents.group_documents_manager'):
if searched_roles == ['view']:
direct_domain = expression.AND([
direct_domain,
expression.OR([[('access_internal', '=', 'none')], other_company])
])
else:
direct_domain = expression.OR([
direct_domain,
expression.AND([[('access_internal', 'in', ('view', 'edit'))], allowed_or_no_company]),
])
elif not self.env.user.share:
if searched_roles == ['view']:
internal_domain = [('access_internal', '=', 'view'), ('access_via_link', 'in', ('none', 'view'))]
elif searched_roles == ['edit']:
internal_domain = expression.OR([
[('access_internal', '=', 'edit')],
expression.AND([[('access_internal', '=', 'view')], [('access_via_link', '=', 'edit')]]),
])
else:
internal_domain = [('access_internal', 'in', ('view', 'edit'))]
direct_domain = expression.OR([direct_domain, expression.AND([internal_domain, allowed_or_no_company])])
# Look one level up for links unless hidden
link_via_parent_domain = expression.AND([
[('access_via_link', 'in', searched_roles)],
[('is_access_via_link_hidden', '=', False)],
[('folder_id', 'any', direct_domain)],
])
return expression.OR([direct_domain, link_via_parent_domain])
@api.depends('datas', 'mimetype')
def _compute_is_multipage(self):
for document in self:
# external computation to be extended
document.is_multipage = bool(document._get_is_multipage()) # None => False
@api.depends('attachment_id', 'attachment_id.res_model', 'attachment_id.res_id',
'shortcut_document_id.res_model', 'shortcut_document_id.res_id')
def _compute_res_record(self):
for record in self:
attachment = record.attachment_id
if attachment:
record.res_model = attachment.res_model
record.res_id = attachment.res_id
if record.shortcut_document_id:
record.res_model = record.shortcut_document_id.res_model
record.res_id = record.shortcut_document_id.res_id
@api.depends('attachment_id', 'res_model', 'res_id')
def _compute_res_name(self):
for record in self:
if record.attachment_id:
record.res_name = record.attachment_id.res_name
elif record.res_id and record.res_model:
record.res_name = self.env[record.res_model].browse(record.res_id).display_name
else:
record.res_name = False
def _inverse_res_model(self):
for record in self:
attachment = record.attachment_id.with_context(no_document=True)
if attachment:
# Avoid inconsistency in the data, write both at the same time.
# In case a check_access is done between res_id and res_model modification,
# an access error can be received. (Mail causes this check_access)
attachment.sudo().write({'res_model': record.res_model, 'res_id': record.res_id})
@api.depends('checksum', 'shortcut_document_id.thumbnail', 'shortcut_document_id.thumbnail_status',
'shortcut_document_id.user_permission')
def _compute_thumbnail(self):
for document in self:
if document.shortcut_document_id:
if document.shortcut_document_id.user_permission != 'none':
document.thumbnail = document.shortcut_document_id.thumbnail
document.thumbnail_status = document.shortcut_document_id.thumbnail_status
else:
document.thumbnail = False
document.thumbnail_status = 'restricted'
elif document.mimetype and document.mimetype.startswith('application/pdf'):
# Thumbnails of pdfs are generated by the client. To force the generation, we invalidate the thumbnail.
document.thumbnail = False
document.thumbnail_status = 'client_generated'
elif document.mimetype and document.mimetype.startswith('image/'):
try:
document.thumbnail = base64.b64encode(image_process(document.raw, size=(200, 140), crop='center'))
document.thumbnail_status = 'present'
except (UserError, TypeError):
document.thumbnail = False
document.thumbnail_status = 'error'
else:
document.thumbnail = False
document.thumbnail_status = False
@api.depends('type')
def _compute_deletion_delay(self):
folders = self.filtered(lambda d: d.type == 'folder')
folders.deletion_delay = self.get_deletion_delay()
(self - folders).deletion_delay = False
@api.depends_context('uid')
@api.depends('type', 'children_ids', 'shortcut_document_id')
def _compute_document_count(self):
folders = (self | self.shortcut_document_id).filtered(
lambda d: d.type == 'folder' and not d.shortcut_document_id)
children_counts = Counter(dict(self._read_group(
[('folder_id', 'in', folders.ids)],
groupby=['folder_id'],
aggregates=['__count'])))
for doc in self:
doc.document_count = children_counts[doc.shortcut_document_id or doc]
def _get_folder_embedded_actions(self, folder_ids):
"""Return the enabled actions for the given folder."""
embedded_actions = self.env['ir.embedded.actions'].sudo().search(
domain=[
('parent_action_id', '=', self.env.ref("documents.document_action").id),
('action_id', '!=', False),
('action_id.type', '=', 'ir.actions.server'),
('parent_res_model', '=', 'documents.document'),
('parent_res_id', 'in', folder_ids),
('groups_ids', 'in', [False] + self.env.user.groups_id.ids),
],
order='sequence',
)
# group after ordering by `ir.embedded.actions` sequence
return embedded_actions.grouped('parent_res_id')
@api.depends_context('uid')
@api.depends('folder_id')
def _compute_available_embedded_actions_ids(self):
embedded_actions = self._get_folder_embedded_actions(self.folder_id.ids)
embedded_actions_per_folder = {
folder_id: actions.ids
for folder_id, actions in embedded_actions.items()
}
self.available_embedded_actions_ids = False
for document in self.filtered(lambda d: d.type != 'folder' and not d.shortcut_document_id):
document.available_embedded_actions_ids = embedded_actions_per_folder.get(document.folder_id.id, False)
def _get_last_access_date_group_cte(self):
return SQL("""
WITH last_access_date AS (
SELECT (CASE
WHEN last_access_date > NOW() - INTERVAL '1 days' THEN '3_day'
WHEN last_access_date > NOW() - INTERVAL '7 days' THEN '2_week'
WHEN last_access_date > NOW() - INTERVAL '1 months' THEN '1_month'
ELSE '0_older'
END) AS date,
document_id
FROM documents_access
WHERE partner_id = %s
)
""", self.env.user.partner_id.id)
@api.depends('access_ids')
def _compute_last_access_date_group(self):
self.env.cr.execute(SQL(
"""(%s SELECT document_id, date FROM last_access_date WHERE document_id = ANY(%s))""",
self._get_last_access_date_group_cte(),
self.ids,
))
values = {line['document_id']: line['date'] for line in self.env.cr.dictfetchall()}
for document in self:
document.last_access_date_group = values.get(document.id)
def _search_last_access_date_group(self, operator, operand):
if operator != '=':
raise NotImplementedError("Unsupported search operator or value")
query = SQL(
"""(%s SELECT document_id FROM last_access_date WHERE date = %s)""",
self._get_last_access_date_group_cte(), operand)
return [('id', 'in', query)]
def _field_to_sql(self, alias, fname, query=None, flush: bool = True) -> SQL:
if fname == 'last_access_date_group':
# Allow to group on the field
return SQL("""(%s SELECT date FROM last_access_date WHERE document_id = %s)""",
self._get_last_access_date_group_cte(), SQL.identifier(alias, 'id'))
return super()._field_to_sql(alias, fname, query, flush)
def _order_field_to_sql(self, alias, field_name, direction, nulls, query):
if field_name == 'last_access_date_group':
sql_field = SQL(
"SELECT last_access_date FROM documents_access WHERE partner_id = %s AND document_id = %s",
self.env.user.partner_id.id,
SQL.identifier(alias, 'id')
)
return SQL("(%s) %s %s", sql_field, direction, nulls)
if field_name == 'is_folder':
sql_field = SQL("%s != 'folder'", SQL.identifier(alias, 'type'))
return SQL("(%s) %s %s", sql_field, direction, nulls)
return super()._order_field_to_sql(alias, field_name, direction, nulls, query)
@api.model
def get_previewable_file_extensions(self):
return {'bmp', 'mp4', 'mp3', 'png', 'jpg', 'jpeg', 'pdf', 'gif', 'txt', 'wav'}
def action_move_documents(self, folder_id):
"""Move document to new parent folder or none (my drive or company)
:param int|bool folder_id: new parent folder id
"""
target_folder = self.browse(folder_id)
if target_folder.shortcut_document_id:
return self.action_move_documents(target_folder.shortcut_document_id.id)
documents_to_move = self.filtered(lambda d: d.folder_id != target_folder and d != target_folder)
if not documents_to_move:
return
try:
(documents_to_move | target_folder).check_access('write')
except UserError:
raise AccessError(_("You are not allowed to perform this operation."))
if target_folder and (cyclic_move := documents_to_move.filtered(
lambda d: target_folder.parent_path.startswith(d.parent_path))):
raise UserError(_(
"Impossible to move the following items as this would create a recursive hierarchy:\n"
"- %(documents)s", documents='\n- '.join(cyclic_move.mapped('name'))
))
documents_to_move.folder_id = target_folder
if target_folder and (documents_to_sync := documents_to_move.filtered(lambda d: not d.shortcut_document_id)):
documents_to_sync.sudo().action_update_access_rights(
access_internal=target_folder.access_internal,
access_via_link=target_folder.access_via_link,
is_access_via_link_hidden=target_folder.is_access_via_link_hidden,
# Simply add partners of destination
partners={access.partner_id: (access.role, access.expiration_date)
for access in target_folder.access_ids},
)
def action_change_owner(self, new_user_id):
if not self.env.user._is_admin() and not self.env.user.has_group('documents.group_documents_manager'):
if any(document.owner_id != self.env.user for document in self):
raise AccessError(_("You are not allowed to change ownerships of documents you do not own."))
self.owner_id = new_user_id
def action_create_shortcut(self, location_folder_id=None):
"""Create a shortcut to self in a specific folder or as sibling
:param int | None location_folder_id: Optional: where to create the shortcut.
"""
if not self.ids:
return
if len(self.folder_id.ids) > 1 and location_folder_id is None:
raise UserError(_("A destination is required when creating multiple shortcuts at once."))
if self.shortcut_document_id:
targets = self.filtered(lambda d: not d.shortcut_document_id) | self.shortcut_document_id
return targets.action_create_shortcut(location_folder_id or self.folder_id.id)
location = self.browse(location_folder_id) if location_folder_id is not None else self.folder_id
if location_folder_id and location.shortcut_document_id:
return self.action_create_shortcut(location.shortcut_document_id.id)
if location:
try:
location.check_access('write')
except UserError as exc:
if self.env.user.share:
raise AccessError(_("You are not allowed to write in this folder.")) from exc
location = self.browse() # My Drive
self.env.user._bus_send('simple_notification', {
'type': 'info',
'message': _("Shortcut created in My Drive"),
})
if location.shortcut_document_id:
raise AccessError(_("Shortcuts cannot contain documents."))
self.check_access('read')
return self.sudo().create([{
"folder_id": location.id,
"shortcut_document_id": document.id,
"name": document.name,
"type": document.type,
"access_internal": document.access_internal or 'view',
"access_via_link": document.access_via_link or 'none',
"company_id": document.company_id.id,
"access_ids": [
Command.create({
"partner_id": access.partner_id.id,
"role": access.role,
})
for access in document.access_ids if access.role
],
"is_multipage": document.is_multipage,
"is_access_via_link_hidden": document.is_access_via_link_hidden,
} for document in self]).sudo(False)
def action_update_access_rights(self, access_internal=None, access_via_link=None, is_access_via_link_hidden=None,
partners=None, notify=False, message=""):
"""Update access to a document and propagate if applicable.
This method can be called to update the access of internal users, with
the link, as well as a set of partners and roles to the records in self
and their children (except shortcuts), and shortcuts pointing to them
as they are kept synchronized.
Modifications to internal users and link access are propagated down to
children until the new value is already present.
For partners, all changes are applied to all children regardless of the
existing rights structure.
:param str | None access_internal: optional new permission level for internal users
:param str | None access_via_link: optional new permission level for partners with the link
:param bool|None is_access_via_link_hidden: optional new value for discoverability
:param dict[str | int | res.partner(), tuple[str | bool | None, str | datetime | bool | None] partners:
Mapping of partner(_id) to the tuple:
role: 'edit', 'view', False (=>delete),
expiration: datetime string, False (removed/None)
:param bool notify: whether to send an email
:param str message: message to add to the email
"""
if len(self.ids) == 0:
return
try:
self.check_access('write')
except UserError:
raise AccessError("You are not allowed to update these access rights.")
if len(self.ids) > 1 and notify:
raise UserError(_("Impossible to invite partners on multiple documents at once."))
if self.shortcut_document_id:
raise UserError(_("You can not update the access of a shortcut, update its target instead."))
# Check inputs as we are going to bypass the ORM in the private method(s)
access_options = {'view', 'edit', 'none', None}
hidden_options = {None, True, False}
role_options = {'edit', 'view', False, None}
incorrect_fields_to_options = {
**({'is_access_via_link_hidden': hidden_options} if is_access_via_link_hidden not in hidden_options else {}),
**({'access_via_link': access_options} if access_via_link not in access_options else {}),
**({'access_internal': access_options} if access_internal not in access_options else {}),
**({'partners.role': role_options}
if any(role not in role_options for (__, (role, __)) in (partners or {}).items()) else {})
}
if incorrect_fields_to_options:
hints = "\n- " + "\n- ".join(f'{name}: {options}' for name, options in incorrect_fields_to_options.items())
raise UserError(_(
"Incorrect values. Use one of the following for the following fields: %(hints)s.)", hints=hints
))
self._action_update_access(access_internal, access_via_link, is_access_via_link_hidden)
if partners:
partners = {
self.env['res.partner'].browse(int(partner)) if isinstance(partner, str | int) else partner:
(role, fields.Datetime.to_datetime(exp) if exp and isinstance(exp, str) else exp)
for partner, (role, exp) in (partners or {}).items()
}
root_access_partners = self.access_ids.partner_id
self._action_update_members(partners)
if notify:
self._send_access_by_mail(
{p: role for p, (role, __) in partners.items() if role and p not in root_access_partners},
message=message
)
return self.mapped('user_permission')
def _action_update_access(self, access_internal, access_via_link, is_access_via_link_hidden):
"""Update the access on self and children.
Stop the propagation when the value is already the right one.
:param str | None access_internal: change the `access_internal` if not None
:param str | None access_via_link: change the `access_via_link` if not None
:param bool | None is_access_via_link_hidden: change the `is_access_via_link_hidden` if not None
"""
self.flush_model()
shortcuts_to_check_owner_target_access = self.browse()
for field, value in (
('access_internal', access_internal),
('access_via_link', access_via_link),
('is_access_via_link_hidden', is_access_via_link_hidden),
):
if value is None:
continue
# records that we might need to update
candidates_domain = [
(field, '!=', value),
*([] if self.env.su else [('user_permission', '=', 'edit')]),
# the update is done only "target -> shortcut",
# but not "shortcut -> target"
('shortcut_document_id', '=', False),
('id', 'child_of', self.ids),
]
candidates = self.env['documents.document']._search(
candidates_domain).select('id', 'folder_id', 'shortcut_document_id', field)
shortcuts_to_check_owner_target_access |= self.search([('shortcut_document_id', 'any', candidates_domain)])
self.env.cr.execute(SQL("""
WITH RECURSIVE candidates AS (%(candidates)s),
-- explore the folders
documents_to_update AS (
SELECT id
FROM candidates
WHERE id = ANY(%(root_ids)s)
UNION
SELECT child.id
FROM candidates AS child
JOIN documents_to_update AS parent
ON child.folder_id = parent.id
),
documents_and_shortcuts AS (
SELECT id FROM documents_to_update
UNION
-- document.shortcut_ids
-- update in "SUDO" to keep them synchronized
SELECT shortcut.id
FROM documents_document AS shortcut
JOIN documents_to_update
ON documents_to_update.id = shortcut.shortcut_document_id
)
UPDATE documents_document
SET %(field)s = %(value)s
FROM documents_and_shortcuts AS doc
-- document | document.children_ids | document.shortcut_ids
WHERE documents_document.id = doc.id
""", field=SQL(field), value=value, root_ids=self.ids, candidates=candidates))
self.invalidate_model([
'access_internal',
'access_via_link',
'is_access_via_link_hidden',
'user_permission',
])
shortcuts_to_check_owner_target_access._unlink_shortcut_if_target_inaccessible()
def _action_update_members(self, partners):
"""Update the members access on all files bellow the current folder.
:param partners: Partners to add as members / change access
"""
self.env['documents.access'].flush_model()
partners_to_remove = self.env['res.partner']
# {(role, expiration_date): partners}
values_to_update = defaultdict(lambda: self.env['res.partner'])
for partner, (role, expiration_date) in partners.items():
if role is False:
# remove the members
partners_to_remove |= partner
elif role is not None or expiration_date is not None:
values_to_update[role, expiration_date] |= partner
# use `_search` to respect access rules and to use `_search_user_permission`
to_update_domain = [
*([] if self.env.su else [('user_permission', '=', 'edit')]),
('shortcut_document_id', '=', False), # update "target -> shortcuts" but not "shortcut -> target"
('id', 'child_of', self.ids),
]
documents = self.env['documents.document']._search(to_update_domain).select('id')
for (role, expiration_date), partners in values_to_update.items():
update_fields = []
if role not in ('edit', 'view'):
raise UserError(_("Invalid role.")) # The public method would have returned a more insightful message
else:
update_fields.append(SQL('role = %(role)s', role=role))
if expiration_date is not None:
update_fields.append(SQL(
'expiration_date = %(expiration_date)s',
expiration_date=expiration_date or None,
))
update_fields = SQL(',').join(update_fields)
self.env.cr.execute(SQL(
"""
WITH documents AS (%(documents)s),
documents_and_shortcuts AS (
SELECT * FROM documents
UNION
-- document.shortcut_ids
SELECT shortcut.id
FROM documents_document AS shortcut
JOIN documents AS document
ON document.id = shortcut.shortcut_document_id
)
INSERT INTO documents_access (
document_id,
partner_id,
role,
expiration_date
) (
SELECT DISTINCT ON (doc.id, partner_id) doc.id,
partner_id,
%(role)s,
%(expiration_date)s
FROM documents_and_shortcuts AS doc
JOIN LATERAL UNNEST(%(partner_ids)s) AS partner_id
ON 1=1
)
ON CONFLICT (document_id, partner_id) DO UPDATE SET
%(update_fields)s
""",
documents=documents,
partner_ids=partners.ids,
expiration_date=expiration_date or None,
role=role,
update_fields=update_fields,
))
shortcuts_to_check_owner_target_access = self.browse()
if partners_to_remove:
shortcuts_to_check_owner_target_access = self.search(expression.AND([
[('shortcut_document_id', 'any', to_update_domain)],
[('owner_id.partner_id', 'in', partners_to_remove.ids)],
]))
self.env.cr.execute(SQL("""
WITH documents AS (%(documents)s),
docs_and_shortcuts AS (
SELECT id
FROM documents
UNION
SELECT shortcut.id
FROM documents AS doc
JOIN documents_document AS shortcut
ON doc.id = shortcut.shortcut_document_id
)
DELETE FROM documents_access AS access
USING docs_and_shortcuts AS doc
WHERE access.document_id = doc.id
AND access.partner_id = ANY(%(partner_ids)s)
""", documents=documents, partner_ids=partners_to_remove.ids))
self.env['documents.document'].invalidate_model([
'access_ids',
'user_permission',
])
shortcuts_to_check_owner_target_access._unlink_shortcut_if_target_inaccessible()
def action_see_documents(self):
if self.type != "folder":
raise UserError(_("Not a folder."))
domain = [('folder_id', '=', self.id)]
return {
'name': _('Documents'),
'domain': domain,
'res_model': 'documents.document',
'type': 'ir.actions.act_window',
'views': [(False, 'list'), (False, 'form')],
'view_mode': 'list,form',
'context': {'searchpanel_default_folder_id': self.id}
}
def toggle_is_pinned_folder(self):
# TODO: remove in master
self.ensure_one()
@api.model
def get_documents_actions(self, folder_id):
"""Return the available actions and a key to know if the action is embedded on the folder."""
folder = self.env['documents.document'].browse(folder_id).exists()
if not folder:
raise UserError(_('This folder does not exist.'))
embedded_actions = self._get_folder_embedded_actions(folder.ids)
embedded_actions = embedded_actions[folder.id].action_id.ids if embedded_actions else []
actions = self.env['ir.actions.server'].sudo().search([
('model_id', '=', self.env['ir.model']._get_id('documents.document')),
('usage', '=', 'ir_actions_server'),
])
# Do not show an action if it's a child of a different action
actions -= actions.child_ids
return [{
"id": action.id,
"name": action.display_name,
"is_embedded": action.id in embedded_actions
} for action in actions]
@api.model
def action_folder_embed_action(self, folder_id, action_id, groups_ids=None):
"""Enable / disable the action for the given folder
:param int folder_id: The folder on which we pin the actions
:param int action_id: The id of the action to enable
:param list[int] groups_ids: ids of the groups the action is available to
"""
if not self.env.user.has_group('documents.group_documents_user'):
raise AccessError(_("You are not allowed to pin/unpin embedded Actions."))
if not groups_ids:
groups_ids = self.env.ref('base.group_user').ids
action = self.env['ir.actions.server'].browse(action_id).sudo().exists()
if not action:
raise UserError(_('This action does not exist.'))
if action.type != 'ir.actions.server':
raise UserError(_('You can not ping that type of action.'))
folder = self.env['documents.document'].browse(folder_id).sudo().exists()
if not folder or folder.type != 'folder':
raise UserError(_('You can not ping an action on that document.'))
embedded = self.env['ir.embedded.actions'].sudo().search([
('parent_action_id', '=', self.env.ref("documents.document_action").id),
('action_id', '=', action_id),
('action_id.type', '=', 'ir.actions.server'),
('parent_res_model', '=', 'documents.document'),
('parent_res_id', '=', folder_id),
('groups_ids', 'in', groups_ids),
]).sudo(False)
if embedded:
embedded.unlink()
else:
# first pinned action should be displayed first
last_action = self.env['ir.embedded.actions'].search(
[], order='sequence DESC', limit=1)
self.env['ir.embedded.actions'].create({
'name': action.name,
'parent_action_id': self.env.ref("documents.document_action").id,
'action_id': action.id,
'parent_res_model': 'documents.document',
'parent_res_id': folder_id,
'groups_ids': groups_ids,
'sequence': last_action.sequence + 1 if last_action else 1,
})
return self.get_documents_actions(folder_id)
@api.model
def action_execute_embedded_action(self, action_id):
"""Execute an embedded action on context records.
:param int action_id: id of embedded action to be run on context provided records.
"""
if self.env.user.share:
raise AccessError(_("You are not allowed to execute embedded actions."))
if self.env.context.get('active_model') != 'documents.document':
raise UserError(_("Unavailable action."))
ids = self.env.context.get('active_ids', self.env.context.get('active_id'))
if not ids:
raise UserError(_("Missing documents reference."))
embedded_action = self.env['ir.embedded.actions'].browse([action_id])
if all(action_id in document.available_embedded_actions_ids.ids for document in self.browse(ids)):
return self.env['ir.actions.server'].browse(embedded_action.action_id.id).run()
raise UserError(_("Unavailable action."))
def action_link_to_record(self, model=False):
"""Open the `link_to_record_wizard` to choose a record to link to the current documents.
This method can be used inside server actions.
"""
context = {
'default_document_ids': self.ids,
'default_resource_ref': False,
'default_is_readonly_model': False,
'default_model_ref': False,
}
if documents_link_record := self.filtered(lambda d: d.res_model != 'documents.document'):
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'type': 'warning',
'message': _(
"Already linked Documents: %s",
", ".join(documents_link_record.mapped('name'))
),
}
}
if model:
self.env[model].check_access('write')
context['default_is_readonly_model'] = True
context['default_model_id'] = self.env['ir.model']._get_id(model)
first_valid_id = self.env[model].search([], limit=1).id
context['default_resource_ref'] = f'{model},{first_valid_id}'
return {
'name': _('Choose a record to link'),
'type': 'ir.actions.act_window',
'res_model': 'documents.link_to_record_wizard',
'view_mode': 'form',
'target': 'new',
'views': [(False, "form")],
'context': context,
}
def _send_access_by_mail(self, partners, message=""):
"""Send a notification email to contacts granted with a new document/folder access.
:param dict[res.partner(), str] partners: Mapping of partner to new_role: 'edit', 'view'.
:param message: message to add to the email
"""
self.ensure_one()
subject = _('%s shared with you', self.display_name) if self.display_name else _('Access to a folder or a document')
formatted_msg = Markup(message) if message else ""
roles_info = {
(role, lang): {
"body": self.env['ir.qweb'].with_context(lang=lang)._render(
'documents.mail_template_document_share',
{'record': self, 'user': self.env.user, 'message': formatted_msg}
),
"role_label": _('Editor') if role == 'edit' else _('Viewer'),
} for role, lang in {(role, partner.lang) for partner, role in partners.items()}
}
for partner, role in partners.items():
self.with_context(lang=partner.lang).message_notify(
body=roles_info[role, partner.lang]['body'],
email_layout_xmlid='mail.mail_notification_layout',
partner_ids=partner.ids,
subject=subject,
subtitles=[self.display_name, _('Your Role: %s', roles_info[role, partner.lang]['role_label'])],
)
def _notify_get_recipients_groups(self, message, model_description, msg_vals=None):
groups = super()._notify_get_recipients_groups(
message, model_description, msg_vals=msg_vals
)
if len(self.ids) != 1:
return groups
group_values = {
'active': True,
'button_access': {'url': self.access_url},
'has_button_access': True,
}
return [
('group_documents_document_people_with_access',
lambda pdata:
(pdata['uid'] and self.with_user(pdata['uid']).user_permission != 'none') or
(pdata['id'] and self.access_via_link != 'none'
and self.access_ids.filtered(lambda a: a.partner_id.id == pdata['id'] and a.role)),
group_values)
] + groups
def get_deletion_delay(self):
return int(self.env['ir.config_parameter'].sudo().get_param('documents.deletion_delay', '30'))
def _get_is_multipage(self):
"""Whether the document can be considered multipage, if able to determine.
:return: `None` if mimetype not handled, `False` if single page or error occurred, `True` otherwise.
:rtype: bool | None
"""
if self.mimetype not in ('application/pdf', 'application/pdf;base64'):
return None
stream = io.BytesIO(base64.b64decode(self.datas))
try:
return PdfFileReader(stream, strict=False).numPages > 1
except AttributeError:
raise # If PyPDF's API changes and the `numPages` property isn't there anymore, not if its computation fails.
except Exception: # noqa: BLE001
_logger.warning('Impossible to count pages in %r. It could be due to a malformed document or a '
'(possibly known) issue within PyPDF2.', self.name, exc_info=True)
return False
def _get_models(self, domain):
"""
Return the names of the models to which the attachments are attached.
:param domain: the domain of the _read_group on documents.
:return: a list of model data, the latter being a dict with the keys
'id' (technical name),
'name' (display name) and
'__count' (how many attachments with that domain).
"""
not_a_file = []
not_attached = []
models = []
groups = self._read_group(domain, ['res_model'], ['__count'])
for res_model, count in groups:
if not res_model:
not_a_file.append({
'id': res_model,
'display_name': _('Not a file'),
'__count': count,
})
elif res_model == 'documents.document':
not_attached.append({
'id': res_model,
'display_name': _('Not attached'),
'__count': count,
})
else:
models.append({
'id': res_model,
'display_name': self.env['ir.model']._get(res_model).display_name,
'__count': count,
})
return sorted(models, key=lambda m: m['display_name']) + not_attached + not_a_file
@api.depends('favorited_ids')
@api.depends_context('uid')
def _compute_is_favorited(self):
favorited = self._filtered_access('read').filtered(lambda d: self.env.user in d.favorited_ids)
favorited.is_favorited = True
(self - favorited).is_favorited = False
def _inverse_is_favorited(self):
unfavorited_documents = favorited_documents = self.env['documents.document'].sudo()
for document in self:
if self.env.user in document.favorited_ids:
unfavorited_documents |= document
else:
favorited_documents |= document
favorited_documents.write({'favorited_ids': [(4, self.env.uid)]})
unfavorited_documents.write({'favorited_ids': [(3, self.env.uid)]})
@api.depends('res_model')
def _compute_res_model_name(self):
for record in self:
if record.res_model:
record.res_model_name = self.env['ir.model']._get(record.res_model).display_name
else:
record.res_model_name = False
@api.constrains('url')
def _check_url(self):
for document in self.filtered("url"):
if not document.url.startswith(('https://', 'http://', 'ftp://')):
raise ValidationError(_('URL %s does not seem complete, as it does not begin with http(s):// or ftp://', document.url))
@api.model
def message_new(self, msg_dict, custom_values=None):
"""When an email comes, create a document with the default values,
then let `_message_post_after_hook` create one document per attachment."""
custom_values = custom_values or {}
folder = self.env['documents.document'].browse(custom_values.get('folder_id'))
custom_values['name'] = _('Mail: %s', msg_dict.get('subject'))
if 'tag_ids' not in custom_values:
custom_values['tag_ids'] = folder.alias_tag_ids.ids
else:
tags = custom_values['tag_ids']
if tags and isinstance(tags[0], list | tuple):
# we have a list of m2m commands
if all(len(t) >= 2 and t[0] == Command.LINK for t in tags):
tags = [t[1] for t in tags]
elif len(tags) == 1 and len(tags[0]) == 3 and tags[0][0] == Command.SET:
tags = tags[0][2]
else: # do not support other commands
tags = []
custom_values['tag_ids'] = self.env['documents.tag'].browse(tags).exists().ids
custom_values['active'] = False
return super().message_new(msg_dict, custom_values)
def _alias_get_creation_values(self):
values = super()._alias_get_creation_values()
values['alias_model_id'] = self.env['ir.model']._get('documents.document').id
if self.id:
values['alias_defaults'] = literal_eval(self.alias_defaults or "{}")
values['alias_defaults'] |= {'folder_id': self.id}
return values
def _message_post_after_hook(self, message, msg_vals):
""" If the res model was an attachment and a mail, adds all the custom values of the linked
document settings to the attachments of the mail.
"""
m2m_commands = msg_vals['attachment_ids']
attachments = self.env['ir.attachment'].browse([x[1] for x in m2m_commands])
if (not self.env.context.get("no_document") or message.message_type == 'email') and attachments:
self.attachment_id = False
documents = self.env['documents.document'].create([{
'name': attachment.name,
'attachment_id': attachment.id,
'folder_id': self.folder_id.id,
'owner_id': self.folder_id.owner_id.id or self.env.ref('base.user_root').id,
'partner_id': self.partner_id.id,
'tag_ids': self.tag_ids.ids,
} for attachment in attachments])
for attachment, document in zip(attachments, documents):
attachment.write({
'res_model': 'documents.document',
'res_id': document.id,
})
document.message_post(
message_type='email',
body=msg_vals.get('body', ''),
email_from=msg_vals.get('email_from'),
subject=msg_vals.get('subject') or self.name
)
# Activity settings set through alias_defaults values has precedence over the activity folder settings
if self.create_activity_option:
document.documents_set_activity(settings_record=self)
elif self.folder_id.create_activity_option:
document.documents_set_activity(settings_record=self.folder_id)
return super()._message_post_after_hook(message, msg_vals)
def documents_set_activity(self, settings_record=None):
"""
Generate an activity based on the fields of settings_record.
:param settings_record: the record that contains the activity fields.
settings_record.create_activity_type_id (required)
settings_record.create_activity_summary
settings_record.create_activity_note
settings_record.create_activity_date_deadline_range
settings_record.create_activity_date_deadline_range_type
settings_record.create_activity_user_id
"""
if settings_record and settings_record.create_activity_type_id:
for record in self:
activity_vals = {
'activity_type_id': settings_record.create_activity_type_id.id,
'summary': settings_record.create_activity_summary or '',
'note': settings_record.create_activity_note or '',
}
if settings_record.create_activity_date_deadline_range > 0:
activity_vals['date_deadline'] = fields.Date.context_today(settings_record) + relativedelta(
**{
settings_record.create_activity_date_deadline_range_type: settings_record.create_activity_date_deadline_range})
if settings_record._fields.get(
'create_has_owner_activity') and settings_record.create_has_owner_activity and record.owner_id:
user = record.owner_id
elif settings_record._fields.get('create_activity_user_id') and settings_record.create_activity_user_id:
user = settings_record.create_activity_user_id
elif settings_record._fields.get('user_id') and settings_record.user_id:
user = settings_record.user_id
else:
user = self.env.user
if user:
activity_vals['user_id'] = user.id
record.activity_schedule(**activity_vals)
def copy_data(self, default=None):
default = dict(default or {})
vals_list = super().copy_data(default=default)
if 'name' not in default:
for document, vals in zip(self, vals_list):
vals['name'] = document.name if document.type == 'folder' else _("%s (copy)", document.name)
for document, vals in zip(self, vals_list):
# Avoid to propagate folder access as we want to copy the document accesses alone
vals['access_ids'] = default.get('access_ids', False)
if 'owner_id' not in vals:
vals['owner_id'] = self.env.user.id
return vals_list
def copy(self, default=None):
if not self:
return self
if not all(self.mapped('active')):
raise UserError(_('You cannot duplicate document(s) in the Trash.'))
# As we avoid to propagate the folder permission by setting access_ids to False (see copy_data), user has no
# right to create the document. So after checking permission, we execute the copy in sudo.
self.check_access("create")
self.check_access('read')
if not self.env.su and self.folder_id and self.folder_id.user_permission != 'edit':
# do not check access to allow copying in root company folders
raise AccessError(_('You cannot copy in that folder'))
documents_order = {doc.id: idx for idx, doc in enumerate(self)}
new_documents = [self.browse()] * len(self)
skip_documents = self.env.context.get('documents_copy_folders_only')
shortcuts = self.filtered('shortcut_document_id')
if not skip_documents:
for destination, targets in shortcuts.grouped('folder_id').items():
new_shortcuts = targets.action_create_shortcut(destination.id)
for new_shortcut, target in zip(new_shortcuts, targets):
new_documents[documents_order[target.id]] = new_shortcut
folders = (self - shortcuts).filtered(lambda d: d.type == 'folder')
if folders:
embedded_actions = self._get_folder_embedded_actions(folders.ids)
new_folders = folders.sudo()._copy_with_access(default=default).sudo(False)
for old_folder, new_folder in zip(folders, new_folders):
if folder_embedded_actions := embedded_actions.get(old_folder.id):
embedded_actions_copies = folder_embedded_actions.copy()
embedded_actions_copies.parent_res_id = new_folder.id
# no need to check for permission as user is owner of copies (see copy_data).
old_folder.children_ids.copy({'folder_id': new_folder.id})
new_documents[documents_order[old_folder.id]] = new_folder
if not skip_documents and (documents_sudo := (self - shortcuts - folders).sudo()):
new_binaries_sudo = documents_sudo._copy_with_access(default=default)
for old_document_sudo, new_binary_sudo in zip(documents_sudo, new_binaries_sudo):
new_documents[documents_order[old_document_sudo.id]] = new_binary_sudo.sudo(False)
if to_copy_attachment_sudo := documents_sudo._copy_attachment_filter(default):
new_attachments_iterator = iter(to_copy_attachment_sudo.attachment_id.with_context(no_document=True).copy())
for old_document_sudo, new_binary_sudo in zip(documents_sudo, new_binaries_sudo):
if old_document_sudo._copy_attachment_filter(default):
new_attachment = next(new_attachments_iterator)
new_binary_sudo.write({
'attachment_id': new_attachment.id,
# Avoid recompute based on attachment_id
'name': new_binary_sudo.name,
'url_preview_image': False,
})
return self.browse([new_document.id for new_document in new_documents])
def _copy_attachment_filter(self, default):
if default and 'attachment_id' in default:
return self.env['documents.document']
return self.filtered('attachment_id')
def _copy_with_access(self, default):
"""Copy documents with their access. !Assumes that access rights were checked before! """
if not self:
return self
res = super().copy(default=default)
if default and 'access_ids' in default:
return res
access_vals_list = []
for doc, doc_copied in zip(self, res):
access_vals_list += doc.access_ids.filtered('role').copy_data(default={'document_id': doc_copied.id})
self.env['documents.access'].sudo().create(access_vals_list)
return res
def toggle_favorited(self):
self.ensure_one()
self.sudo().write({'favorited_ids': [(3 if self.env.user in self[0].favorited_ids else 4, self.env.user.id)]})
def access_content(self):
self.ensure_one()
action = {
'type': "ir.actions.act_url",
'target': "new",
}
if self.url:
action['url'] = self.url
elif self.type == 'binary':
action['url'] = f'/documents/content/{self.access_token}'
return action
def open_resource(self):
self.ensure_one()
if self.res_model and self.res_id:
view_id = self.env[self.res_model].get_formview_id(self.res_id)
return {
'res_id': self.res_id,
'res_model': self.res_model,
'type': 'ir.actions.act_window',
'views': [[view_id, 'form']],
}
def toggle_lock(self):
"""
sets a lock user, the lock user is the user who locks a file for themselves, preventing data replacement
and archive (therefore deletion) for any user but himself.
Members of the group documents.group_documents_manager and the superuser can unlock the file regardless.
"""
self.ensure_one()
if self.lock_uid:
if self.env.user == self.lock_uid or self.env.is_admin() or self.env.user.has_group('documents.group_documents_manager'):
self.lock_uid = False
else:
self.lock_uid = self.env.uid
@api.depends_context('uid')
@api.depends('lock_uid')
def _compute_is_locked(self):
for record in self:
record.is_locked = record.lock_uid and not (
self.env.user == record.lock_uid or
self.env.is_admin() or
self.env.user.has_group('documents.group_documents_manager'))
def action_archive(self):
if not self:
return
to_archive_sudo = self.sudo().with_context(active_test=False).search([('id', 'child_of', self.ids)])
active_documents = to_archive_sudo.filtered(self._active_name).sudo(False)
if not active_documents:
return
# As document archiving leads to deletion
message = _("You do not have sufficient access rights to delete these documents.")
try:
active_documents.check_access('unlink')
except UserError as e:
raise AccessError(message) from e
# As edit on parent is required to restore (and as removing a file is also somehow modifying the folder)
if folder_ids := self.folder_id:
if any(permission != 'edit' for permission in folder_ids.mapped('user_permission')):
raise UserError(message)
active_documents._raise_if_used_folder()
deletion_date = fields.Date.to_string(fields.Date.today() + relativedelta(days=self.get_deletion_delay()))
log_message = _("This file has been sent to the trash and will be deleted forever on the %s", deletion_date)
active_documents._message_log_batch(bodies={doc.id: log_message for doc in active_documents})
return super(Document, active_documents).action_archive()
def action_unarchive(self):
self_archived = self.filtered(lambda d: not d.active)
if not self_archived:
return
archived_top_parent_documents = self.env["documents.document"].sudo().search(
expression.AND([
[('id', 'parent_of', self_archived.ids)],
[('id', 'not in', self_archived.ids)],
[('active', '=', False)],
expression.OR([
[('folder_id', '=', False)],
[('folder_id.active', '=', True)],
])
])
).sudo(False)
if archived_top_parent_documents:
raise UserError(_(
"Item(s) you wish to restore are included in archived folders. "
"To restore these items, you must restore the following including folders instead:\n"
"- %(folders_list)s",
folders_list="\n-".join(archived_top_parent_documents.mapped('name')))) # "Restricted" if not allowed
# Leave archived children (and descendants) the current user doesn't have access to.
to_unarchive_candidate_documents = self.env['documents.document'].with_context(active_test=False).search(
[('id', 'child_of', self_archived.ids)])
seen_documents, to_unarchive_ids = set(), set()
def add_if_can_be_restored(doc):
if doc in seen_documents or seen_documents.add(doc):
return doc.id in to_unarchive_ids
if not doc.folder_id or doc.folder_id.sudo().active or add_if_can_be_restored(doc.folder_id):
to_unarchive_ids.add(doc.id)
return True
return False
for document in to_unarchive_candidate_documents:
add_if_can_be_restored(document)
to_unarchive_documents = to_unarchive_candidate_documents.filtered(lambda d: d.id in to_unarchive_ids)
log_message = _("This document has been restored.")
to_unarchive_documents._message_log_batch(bodies={doc.id: log_message for doc in to_unarchive_documents})
return super(Document, to_unarchive_documents).action_unarchive()
@api.model_create_multi
def create(self, vals_list):
"""Access rights fields (access_ids), access_internal, and access_via_link are inherited from containing folder
unless specified in vals or context defaults.
"""
attachments = []
for vals in vals_list:
keys = [key for key in vals if
self._fields[key].related and self._fields[key].related.split('.')[0] == 'attachment_id']
attachment_dict = {key: vals.pop(key) for key in keys if key in vals}
attachment = self.env['ir.attachment'].browse(vals.get('attachment_id'))
if attachment and attachment_dict:
attachment.write(attachment_dict)
elif attachment_dict:
attachment_dict.setdefault('name', vals.get('name', 'unnamed'))
# default_res_model and default_res_id will cause unique constraints to trigger.
attachment = self.env['ir.attachment'].with_context(clean_context(self.env.context)).create(attachment_dict)
vals['attachment_id'] = attachment.id
vals['name'] = vals.get('name', attachment.name)
attachments.append(attachment)
# don't allow using default_access_ids
documents = super(Document, self.with_context(default_access_ids=None)).create(vals_list)
is_manager = self.env.is_admin() or self.env.user.has_group('documents.group_documents_manager')
if not is_manager:
if any(d.alias_name for d in documents):
raise AccessError(_('Only Documents Managers can set aliases.'))
if any(d.is_pinned_folder for d in documents):
raise AccessError(_('Only Documents Managers can create in company folder.'))
for document, attachment in zip(documents, attachments):
if attachment and not attachment.res_id and (
not attachment.res_model or attachment.res_model == 'documents.document'):
attachment.with_context(no_document=True).write({
'res_model': 'documents.document',
'res_id': document.id})
return documents
def _prepare_create_values(self, vals_list):
old_vals_list = [vals.copy() for vals in vals_list]
vals_list = super()._prepare_create_values(vals_list)
folders = self.env['documents.document'].browse(v['folder_id'] for v in vals_list if v.get('folder_id'))
users = self.env['res.users'].browse(v['owner_id'] for v in vals_list if v.get('owner_id'))
folders.fetch(('access_internal', 'access_via_link', 'access_ids', 'active', 'owner_id'))
(users | folders.owner_id).fetch(['partner_id'])
odoobot = self.env.ref('base.user_root')
vals_list_to_update_linked_record = []
for vals, old_vals in zip(vals_list, old_vals_list):
owner = self.env['res.users'].browse(vals.get('owner_id', self.env.user.id))
owner_values = {'partner_id': owner.partner_id.id, 'role': False, 'last_access_date': fields.Datetime.now()}
vals_values = {
'owner_id': owner.id,
'access_ids': [Command.create(owner_values)] if owner != odoobot else []
}
if vals.get('folder_id'):
folder = self.env['documents.document'].browse(vals['folder_id'])
if not folder.active:
raise UserError('It is not possible to create documents in an archived folder.')
vals_values.update({
'access_via_link': folder.access_via_link,
'access_internal': folder.access_internal,
'access_ids':
vals_values['access_ids']
+ [
Command.create({'partner_id': access.partner_id.id, 'role': access.role})
for access in folder.access_ids
if access.role and access.partner_id != owner.partner_id
] + ([Command.create({'partner_id': folder.owner_id.partner_id.id, 'role': 'edit'})]
if folder.owner_id != odoobot and folder.owner_id != owner else []
),
})
vals.update((k, v) for k, v in vals_values.items() if k not in old_vals)
# If res_model and res_id are not set, we must get it from the related attachment if set (prepare list)
if 'res_model' not in vals and 'res_id' not in vals and isinstance(vals.get('attachment_id'), int):
vals_list_to_update_linked_record.append(vals)
# For the next step, we need to ensure the related ref is present by getting it from attachment if needed
if vals_list_to_update_linked_record:
attachment_by_id = self.env['ir.attachment'].browse(
[vals['attachment_id'] for vals in vals_list_to_update_linked_record]).grouped('id')
for vals in vals_list_to_update_linked_record:
attachment = attachment_by_id[vals['attachment_id']]
vals['res_model'] = attachment.res_model
vals['res_id'] = attachment.res_id
# Delegate vals_list update to _prepare_create_values_for_model to add values depending on related record
updated_vals_list = []
for res_model, model_vals_tuple_list in groupby(zip(vals_list, old_vals_list), lambda v: v[0].get('res_model')):
updated_vals_list += self._prepare_create_values_for_model(
res_model,
[vals_tuple[0] for vals_tuple in model_vals_tuple_list],
[vals_tuple[1] for vals_tuple in model_vals_tuple_list],
)
return updated_vals_list
def _prepare_create_values_for_model(self, res_model, vals_list, pre_vals_list):
"""Override to add values depending on related model/record"""
if (
res_model
and issubclass(self.pool[res_model], self.pool['documents.mixin'])
and not self._context.get('no_document')
):
return self.env[res_model]._prepare_document_create_values_for_linked_records(
res_model, vals_list, pre_vals_list)
return vals_list
def write(self, vals):
if 'shortcut_document_id' in vals:
raise UserError(_("Shortcuts cannot change target document."))
is_manager = self.env.is_admin() or self.env.user.has_group('documents.group_documents_manager')
pinned_folders_start = self.filtered('is_pinned_folder')
shortcuts_to_check_owner_target_access = self.browse()
if (owner_id := vals.get('owner_id')) is not None:
if not isinstance(owner_id, int): # recordset
owner_id = owner_id.id
if not is_manager and any(d.owner_id != self.env.user for d in self):
raise AccessError(_("You cannot change the owner of documents you do not own."))
targets_changing_owner = self.filtered(lambda d: d.owner_id.id != owner_id)
shortcuts_to_check_owner_target_access |= targets_changing_owner.shortcut_ids.filtered(
lambda d: d.owner_id == d.shortcut_document_id.owner_id)
if folder_id := vals.get('folder_id'):
folder = self.env['documents.document'].browse(folder_id)
if not self.env.su and folder.user_permission != 'edit':
raise AccessError(_("You can't access that folder_id."))
if folder.type != 'folder' or folder.shortcut_document_id:
raise UserError(_("Invalid folder id"))
if any(not d.active or not folder.active or d.folder_id and not d.folder_id.active for d in self):
raise UserError(_("It is not possible to move archived documents or documents to archived folders."))
if vals.get('active') is False:
if self.env.user.share:
raise UserError(_("You are not allowed to (un)archive documents."))
self.check_access('unlink') # As archived gc leads to unlink after `deletion_delay` days.
attachment_id = vals.get('attachment_id')
if attachment_id:
self.ensure_one()
attachments_was_present = []
for record in self:
attachments_was_present.append(bool(record.attachment_id))
if record.type == 'binary' and not record.datas and ('datas' in vals or 'url' in vals):
body = _("Document Request: %(name)s Uploaded by: %(user)s", name=record.name, user=self.env.user.name)
record.with_context(no_document=True).message_post(body=body)
if record.attachment_id:
# versioning
if attachment_id and attachment_id != record.attachment_id.id:
# Link the new attachment to the related record and link the previous one
# to the document.
self.env["ir.attachment"].browse(attachment_id).with_context(
no_document=True
).write(
{
"res_model": record.res_model or "documents.document",
"res_id": record.res_id if record.res_model else record.id,
}
)
related_record = self.env[record.res_model].browse(record.res_id)
if (
not hasattr(related_record, "message_main_attachment_id")
or related_record.message_main_attachment_id
!= record.attachment_id
):
record.attachment_id.with_context(no_document=True).write(
{"res_model": "documents.document", "res_id": record.id}
)
if attachment_id in record.previous_attachment_ids.ids:
record.previous_attachment_ids = [(3, attachment_id, False)]
record.previous_attachment_ids = [(4, record.attachment_id.id, False)]
if 'datas' in vals:
old_attachment = record.attachment_id.with_context(no_document=True).copy()
# removes the link between the old attachment and the record.
old_attachment.write({
'res_model': 'documents.document',
'res_id': record.id,
})
record.previous_attachment_ids = [(4, old_attachment.id, False)]
elif vals.get('datas') and not vals.get('attachment_id'):
res_model = vals.get('res_model', record.res_model or 'documents.document')
res_id = vals.get('res_id') if vals.get('res_model') else record.res_id if record.res_model else record.id
if res_model and res_model != 'documents.document' and not self.env[res_model].browse(res_id).exists():
record.res_model = res_model = 'documents.document'
record.res_id = res_id = record.id
attachment = self.env['ir.attachment'].with_context(no_document=True).create({
'name': vals.get('name', record.name),
'res_model': res_model,
'res_id': res_id
})
record.attachment_id = attachment.id
# pops the datas and/or the mimetype key(s) to explicitly write them in batch on the ir.attachment
# so the mimetype is properly set. The reason was because the related keys are not written in batch
# and because mimetype is readonly on `ir.attachment` (which prevents writing through the related).
attachment_dict = {key: vals.pop(key) for key in ['datas', 'mimetype'] if key in vals}
if not is_manager and set(vals) & set(self.env['mail.alias.mixin']._fields):
raise AccessError(_('Only Documents managers can set an alias.'))
write_result = super().write(vals)
if attachment_dict:
self.mapped('attachment_id').write(attachment_dict)
if 'attachment_id' in vals:
self.attachment_id.check('read')
if (new_active := vals.get('active')) is not None:
if not new_active and self.sudo().search([('id', 'child_of', self.ids), ('active', '=', True)]):
raise UserError(_('Operation not supported. Please use "Move to Trash" / `action_archive` instead.'))
if new_active and self.sudo().search([('id', 'parent_of', self.ids), ('active', '=', False)]):
raise UserError(_('Operation not supported. Please use "Restore" / `action_unarchive` instead.'))
if not is_manager and self.filtered('is_pinned_folder') != pinned_folders_start:
raise AccessError(_("Only Documents Managers can create in company folder."))
for document, attachment_was_present in zip(self, attachments_was_present):
if document.request_activity_id and document.attachment_id and not attachment_was_present:
feedback = _("Document Request: %(name)s Uploaded by: %(user)s",
name=self.name, user=self.env.user.name)
document.with_context(no_document=True).request_activity_id.action_feedback(
feedback=feedback, attachment_ids=[document.attachment_id.id])
if (company_id := vals.get('company_id')) and self.shortcut_ids: # no need if resetting company_id to False
self.shortcut_ids.sudo().write({'company_id': company_id})
if shortcuts_to_check_owner_target_access:
shortcuts_to_check_owner_target_access._unlink_shortcut_if_target_inaccessible()
return write_result
@api.model
def _pdf_split(self, new_files=None, open_files=None, vals=None):
vals = vals or {}
new_attachments = self.env['ir.attachment']._pdf_split(new_files=new_files, open_files=open_files)
new_documents = self.create([
dict(vals, attachment_id=attachment.id) for attachment in new_attachments
])
# Prevent concurrent update error on accessing these documents for the first time on exiting the split tool
env_partner = self.env.user.partner_id
documents_not_member = new_documents.filtered(lambda d: env_partner not in d.access_ids.partner_id)
self.env['documents.access'].sudo().create([
{'document_id': doc.id, 'partner_id': env_partner.id, 'last_access_date': fields.Datetime.now()}
for doc in documents_not_member
])
return new_documents
@api.model
def search_panel_select_range(self, field_name, **kwargs):
if field_name == 'folder_id':
enable_counters = kwargs.get('enable_counters', False)
search_panel_fields = ['access_token', 'company_id', 'description', 'display_name', 'folder_id',
'is_favorited', 'is_pinned_folder', 'owner_id', 'shortcut_document_id',
'user_permission']
domain = [('type', '=', 'folder')]
if unique_folder_id := self.env.context.get('documents_unique_folder_id'):
values = self.env['documents.document'].search_read(
expression.AND([domain, [('folder_id', 'child_of', unique_folder_id)]]),
search_panel_fields,
load=False,
)
accessible_folder_ids = {rec['id'] for rec in values}
for record in values:
if record['folder_id'] not in accessible_folder_ids:
record['folder_id'] = False # consider them as roots
return {
'parent_field': 'folder_id',
'values': values,
}
records = self.env['documents.document'].search_read(domain, search_panel_fields)
accessible_folder_ids = {rec['id'] for rec in records}
domain_image = {}
if enable_counters:
model_domain = expression.AND([
kwargs.get('search_domain', []),
kwargs.get('category_domain', []),
kwargs.get('filter_domain', []),
[(field_name, '!=', False)]
])
domain_image = self._search_panel_domain_image(field_name, model_domain, enable_counters)
values_range = OrderedDict()
shared_root_id = "SHARED" if not self.env.user.share else False
for record in records:
record_id = record['id']
if enable_counters:
image_element = domain_image.get(record_id)
record['__count'] = image_element['__count'] if image_element else 0
folder_id = record['folder_id']
if folder_id:
folder_id = folder_id[0]
if folder_id not in accessible_folder_ids:
if record['shortcut_document_id']:
continue
folder_id = shared_root_id
elif record['owner_id'][0] == self.env.user.id:
folder_id = "MY"
elif record['owner_id'][0] != self.env.ref('base.user_root').id or self.env.user.share:
if record['shortcut_document_id']:
continue
folder_id = shared_root_id
else:
folder_id = "COMPANY"
record['folder_id'] = folder_id
values_range[record_id] = record
if enable_counters:
self._search_panel_global_counters(values_range, 'folder_id')
special_roots = []
if not self.env.user.share:
special_roots = [
{'bold': True, 'childrenIds': [], 'parentId': False, 'user_permission': 'edit'} | values
for values in [
{
'display_name': _("Company"),
'id': 'COMPANY',
'description': _("Common roots for all company users."),
'user_permission': 'view',
}, {
'display_name': _("My Drive"),
'id': 'MY',
'user_permission': 'edit',
'description': _("Your individual space."),
}, {
'display_name': _("Shared with me"),
'id': 'SHARED',
'description': _("Additional documents you have access to."),
}, {
'display_name': _("Recent"),
'id': 'RECENT',
'description': _("Recently accessed documents."),
}, {
'display_name': _("Trash"),
'id': 'TRASH',
'description': _("Items in trash will be deleted forever after %s days.",
self.get_deletion_delay()),
}]
]
return {
'parent_field': 'folder_id',
'values': list(values_range.values()) + special_roots,
}
return super().search_panel_select_range(field_name)
@api.model
def get_document_max_upload_limit(self):
ICP = self.env['ir.config_parameter'].sudo()
for key in ('document.max_fileupload_size', 'web.max_file_upload_size'):
value = ICP.get_param(key, default=None)
if value is None:
continue
try:
return int(value) or None
except ValueError:
_logger.error("invalid %s: %r", key, value)
return odoo.http.DEFAULT_MAX_CONTENT_LENGTH
@api.model
def can_upload_traceback(self):
return self.env.user._is_internal and \
bool(self.env.ref('documents.document_support_folder', raise_if_not_found=False))
def unlink(self):
"""Clean unused linked records too.
This applies to:
* Children documents when deleting the parent folder
* Parent folder if it is archived and has no other children and user is allowed to
* Attachment if document-related record is deleted
"""
to_delete = self.sudo().with_context(active_test=False).search([('id', 'child_of', self.ids)]).sudo(False)
removable_parent_folders = self.with_context(active_test=False).folder_id.filtered(
lambda folder: len(folder.children_ids - self) == 0 and not folder.active)
removable_attachments = self.filtered(lambda d: d.res_model != d._name).attachment_id
res = super(Document, to_delete).unlink()
if removable_attachments:
removable_attachments.unlink()
if removable_parent_folders:
with contextlib.suppress(AccessError):
removable_parent_folders.unlink()
return res
@api.ondelete(at_uninstall=False)
def _unlink_except_unauthorized(self):
try:
self.check_access('unlink')
except UserError as e: # Hide potentially unknown inaccessible content's name.
raise UserError(_("You are not allowed to delete all these items.")) from e
@api.ondelete(at_uninstall=False)
def _unlink_except_company_folders(self):
self._raise_if_used_folder()
def _raise_if_used_folder(self):
if folder_ids := self.filtered(lambda d: d.type == 'folder').ids:
company_field_names = [
company_field_name
for company_field_name, field in self.env['res.company']._fields.items()
if field.comodel_name == "documents.document"
]
if self.env['res.company'].search_count(expression.OR([
[(field_name, 'in', folder_ids)] for field_name in company_field_names
]), limit=1):
raise ValidationError(_("Impossible to delete folders used by other applications."))
def _unlink_shortcut_if_target_inaccessible(self):
"""As a fix in stable version, delete shortcuts when target is no longer accessible to the owner."""
for owner, shortcuts in self.filtered('shortcut_document_id').grouped("owner_id").items():
shortcuts_as_owner_sudo = shortcuts.with_user(owner).sudo()
shortcuts_as_owner_sudo.filtered(lambda d: d.shortcut_document_id.user_permission == 'none').unlink()
@api.autovacuum
def _gc_clear_bin(self):
"""Files are deleted automatically from the trash bin after the configured remaining days."""
deletion_delay = self.get_deletion_delay()
self.search([
('active', '=', False),
('write_date', '<=', fields.Datetime.now() - relativedelta(days=deletion_delay)),
], limit=1000).unlink()
def _get_access_action(self, access_uid=None, force_website=False):
self.ensure_one()
if access_uid and not force_website and self.active and self.env.user.has_group("documents.group_documents_user"):
url_params = url_encode({
'preview_id': self.id,
'view_id': self.env.ref("documents.document_view_kanban").id,
'menu_id': self.env.ref("documents.menu_root").id,
'folder_id': self.folder_id.id,
})
return {
"type": "ir.actions.act_url",
"url": f"/odoo/action-documents.document_action?{url_params}"
}
return super()._get_access_action(access_uid=access_uid, force_website=force_website)
@api.readonly
def permission_panel_data(self):
"""Provide access related data for a given document/folder"""
specification = self._permission_specification()
self.check_access('read')
result = self.sudo().with_context(active_test=False).web_search_read([('id', '=', self.id)], specification)
record = result['records'][0]
selections = {'access_via_link': self._fields.get('access_via_link').selection}
if self.env.user.has_group('base.group_user'):
record['access_ids'] = [a for a in record['access_ids'] if a['role']]
if record['owner_id']['id'] == self.env.ref('base.user_root').id:
record['owner_id'] = False # Only a real user should be shown in the panel
selections.update({
'access_internal': self._fields.get('access_internal').selection,
'doc_access_roles': self.env['documents.access']._fields.get('role').selection,
})
return {'record': record, 'selections': selections}
def _permission_specification(self):
specification = {
'access_internal': {},
'access_via_link': {},
'access_url': {},
'active': {},
'display_name': {},
'folder_id': {},
'is_access_via_link_hidden': {},
'type': {},
'user_permission': {},
}
if self.env.user.has_group('base.group_user'):
partner_id_spec = {'fields': {'email': {}, 'name': {}, 'user_id': {}}}
specification.update({
'access_ids': {
'fields': {
'document_id': {},
'partner_id': partner_id_spec,
'role': {},
'expiration_date': {},
},
},
'owner_id': {
'fields': {
'partner_id': partner_id_spec,
},
}
})
return specification