696 lines
30 KiB
Python
696 lines
30 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import base64
|
|
import io
|
|
import json
|
|
import logging
|
|
import pathlib
|
|
import zipfile
|
|
from collections import defaultdict
|
|
from contextlib import ExitStack
|
|
from http import HTTPStatus
|
|
from typing import NamedTuple
|
|
|
|
from werkzeug.exceptions import BadRequest, Forbidden
|
|
|
|
from odoo import conf, fields, http, _
|
|
from odoo.exceptions import MissingError
|
|
from odoo.http import request, content_disposition
|
|
from odoo.osv import expression
|
|
from odoo.tools import replace_exceptions, str2bool, consteq
|
|
|
|
from odoo.addons.mail.controllers.attachment import AttachmentController
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ShareRoute(http.Controller):
|
|
|
|
# util methods #################################################################################
|
|
def _max_content_length(self):
|
|
return request.env['documents.document'].get_document_max_upload_limit()
|
|
|
|
@classmethod
|
|
def _get_folder_children(cls, folder_sudo):
|
|
if request.env.user._is_public():
|
|
permission_domain = expression.AND([
|
|
[('is_access_via_link_hidden', '=', False)],
|
|
[('access_via_link', 'in', ('edit', 'view'))],
|
|
# public user cannot access a request, unless access_via_link='edit'
|
|
expression.OR([
|
|
[('access_via_link', '=', 'edit')],
|
|
[('type', '!=', 'binary')],
|
|
expression.OR([
|
|
[('attachment_id', '!=', False)],
|
|
[('shortcut_document_id.attachment_id', '!=', False)],
|
|
]),
|
|
])
|
|
])
|
|
else:
|
|
permission_domain = [('user_permission', '!=', 'none')] # needed for search in sudo
|
|
|
|
children_sudo = request.env['documents.document'].sudo().search(expression.AND([
|
|
[('folder_id', '=', folder_sudo.id)],
|
|
permission_domain,
|
|
]), order='name')
|
|
|
|
return children_sudo
|
|
|
|
@classmethod
|
|
def _from_access_token(cls, access_token, *, skip_log=False, follow_shortcut=True):
|
|
"""Get existing document with matching ``access_token``.
|
|
|
|
It returns an empty recordset when either:
|
|
* the document is not found ;
|
|
* the user cannot already access the document and the link
|
|
doesn't grant access ;
|
|
* the matching document is a shortcut but it is not allowed to
|
|
follow the shortcut.
|
|
Otherwise it returns the matching document in sudo.
|
|
|
|
A ``documents.access`` record is created/updated with the
|
|
current date unless the ``skip_log`` flag is set.
|
|
|
|
:param str access_token: the access token to the document record
|
|
:param bool skip_log: flag to prevent updating the record last
|
|
access date of internal users, useful to prevent silly
|
|
serialization errors, best used with read-only controllers.
|
|
:param bool follow_shortcut: flag to prevent returning the target
|
|
from a shortcut and instead return the shortcut itself.
|
|
"""
|
|
Doc = request.env['documents.document']
|
|
|
|
# Document record
|
|
try:
|
|
document_token, __, encoded_id = access_token.rpartition('o')
|
|
document_id = int(encoded_id, 16)
|
|
except ValueError:
|
|
return Doc
|
|
if not document_token or document_id < 1:
|
|
return Doc
|
|
document_sudo = Doc.browse(document_id).sudo()
|
|
if not document_sudo.document_token: # like exists() but prefetch
|
|
return Doc
|
|
if not request.env.user._is_internal() and not document_sudo.active:
|
|
return Doc
|
|
|
|
# Permissions
|
|
if not (
|
|
consteq(document_token, document_sudo.document_token)
|
|
and (document_sudo.user_permission != 'none'
|
|
or document_sudo.access_via_link != 'none')
|
|
):
|
|
return Doc
|
|
|
|
# Document access
|
|
skip_log = skip_log or request.env.user._is_public()
|
|
if not skip_log:
|
|
for doc_sudo in filter(bool, (document_sudo, document_sudo.shortcut_document_id)):
|
|
if access := request.env['documents.access'].sudo().search([
|
|
('partner_id', '=', request.env.user.partner_id.id),
|
|
('document_id', '=', doc_sudo.id),
|
|
]):
|
|
access.last_access_date = fields.Datetime.now()
|
|
else:
|
|
request.env['documents.access'].sudo().create([{
|
|
'document_id': doc_sudo.id,
|
|
'partner_id': request.env.user.partner_id.id,
|
|
'last_access_date': fields.Datetime.now(),
|
|
}])
|
|
|
|
# Shortcut
|
|
if follow_shortcut:
|
|
if target_sudo := document_sudo.shortcut_document_id:
|
|
if (target_sudo.user_permission != 'none'
|
|
or (target_sudo.access_via_link != 'none'
|
|
and not target_sudo.is_access_via_link_hidden)):
|
|
document_sudo = target_sudo
|
|
else:
|
|
document_sudo = Doc
|
|
|
|
# Extra validation step, to run with the target
|
|
if (
|
|
request.env.user._is_public()
|
|
and document_sudo.type == 'binary'
|
|
and not document_sudo.attachment_id
|
|
and document_sudo.access_via_link != 'edit'
|
|
):
|
|
# public cannot access a document request, unless access_via_link='edit'
|
|
return Doc
|
|
|
|
return document_sudo
|
|
|
|
def _make_zip(self, name, documents):
|
|
"""
|
|
Create a zip file in memory out of the given ``documents``,
|
|
recursively exploring the folders, get an HTTP response to
|
|
download that zip file.
|
|
|
|
:param str name: the name to give to the zip file
|
|
:param odoo.models.Model documents: documents to load in the ZIP
|
|
:return: a http response to download the zip file
|
|
"""
|
|
class Item(NamedTuple):
|
|
path: str
|
|
content: str
|
|
|
|
seen_folders = set() # because of shortcuts, we can have loops
|
|
# many documents can have the same name
|
|
seen_names = defaultdict(int)
|
|
|
|
def unique(pathname):
|
|
# files inside a zip can not have the same name
|
|
# (files in the documents application can)
|
|
seen_names[pathname] += 1
|
|
if seen_names[pathname] <= 1:
|
|
return pathname
|
|
|
|
ext = ''.join(pathlib.Path(pathname).suffixes)
|
|
return f'{pathname.removesuffix(ext)}-{seen_names[pathname]}{ext}'
|
|
|
|
def make_zip_item(document, folder):
|
|
if document.type == 'url':
|
|
raise ValueError("cannot create a zip item out of an url")
|
|
if document.type == 'folder':
|
|
# it is the ending slash that makes it appears as a
|
|
# folder inside the zip file.
|
|
return Item(unique(f'{folder.path}{document.name}') + '/', '')
|
|
try:
|
|
stream = self._documents_content_stream(document.shortcut_document_id or document)
|
|
except (ValueError, MissingError):
|
|
return None # skip
|
|
return Item(unique(f'{folder.path}{stream.download_name}'), stream.read())
|
|
|
|
def generate_zip_items(documents_sudo, folder):
|
|
documents_sudo = documents_sudo.sorted(lambda d: d.id)
|
|
|
|
yield from (
|
|
item
|
|
for doc in documents_sudo
|
|
if doc.type == 'binary' and (doc.shortcut_document_id or doc).attachment_id
|
|
if (item := make_zip_item(doc, folder)) is not None
|
|
)
|
|
for folder_sudo in documents_sudo:
|
|
if folder_sudo.type != 'folder' or folder_sudo in seen_folders:
|
|
continue
|
|
seen_folders.add(folder_sudo)
|
|
|
|
yield (sub_folder := make_zip_item(folder_sudo, folder))
|
|
for sub_document_sudo in self._get_folder_children(folder_sudo):
|
|
yield from generate_zip_items(sub_document_sudo, sub_folder)
|
|
|
|
# TODO: zip on-the-fly while streaming instead of loading the
|
|
# entire zip in memory and sending it all at once.
|
|
|
|
stream = io.BytesIO()
|
|
root_folder = Item('', '')
|
|
try:
|
|
with zipfile.ZipFile(stream, 'w') as doc_zip:
|
|
for (path, content) in generate_zip_items(documents, root_folder):
|
|
doc_zip.writestr(path, content, compress_type=zipfile.ZIP_DEFLATED)
|
|
except zipfile.BadZipfile:
|
|
logger.exception("BadZipfile exception")
|
|
|
|
content = stream.getvalue()
|
|
headers = [
|
|
('Content-Type', 'zip'),
|
|
('X-Content-Type-Options', 'nosniff'),
|
|
('Content-Length', len(content)),
|
|
('Content-Disposition', content_disposition(name))
|
|
]
|
|
return request.make_response(content, headers)
|
|
|
|
# Download & upload routes #####################################################################
|
|
@http.route('/documents/pdf_split', type='http', methods=['POST'], auth="user")
|
|
def pdf_split(self, new_files=None, ufile=None, archive=False, vals=None):
|
|
"""Used to split and/or merge pdf documents.
|
|
|
|
The data can come from different sources: multiple existing documents
|
|
(at least one must be provided) and any number of extra uploaded files.
|
|
|
|
:param new_files: the array that represents the new pdf structure:
|
|
[{
|
|
'name': 'New File Name',
|
|
'new_pages': [{
|
|
'old_file_type': 'document' or 'file',
|
|
'old_file_index': document_id or index in ufile,
|
|
'old_page_number': 5,
|
|
}],
|
|
}]
|
|
:param ufile: extra uploaded files that are not existing documents
|
|
:param archive: whether to archive the original documents
|
|
:param vals: values for the create of the new documents.
|
|
"""
|
|
vals = json.loads(vals)
|
|
new_files = json.loads(new_files)
|
|
# find original documents
|
|
document_ids = set()
|
|
for new_file in new_files:
|
|
for page in new_file['new_pages']:
|
|
if page['old_file_type'] == 'document':
|
|
document_ids.add(page['old_file_index'])
|
|
documents = request.env['documents.document'].browse(document_ids)
|
|
|
|
with ExitStack() as stack:
|
|
files = request.httprequest.files.getlist('ufile')
|
|
open_files = [stack.enter_context(io.BytesIO(file.read())) for file in files]
|
|
|
|
# merge together data from existing documents and from extra uploads
|
|
document_id_index_map = {}
|
|
current_index = len(open_files)
|
|
for document in documents:
|
|
open_files.append(stack.enter_context(io.BytesIO(base64.b64decode(document.datas))))
|
|
document_id_index_map[document.id] = current_index
|
|
current_index += 1
|
|
|
|
# update new_files structure with the new indices from documents
|
|
for new_file in new_files:
|
|
for page in new_file['new_pages']:
|
|
if page.pop('old_file_type') == 'document':
|
|
page['old_file_index'] = document_id_index_map[page['old_file_index']]
|
|
|
|
# apply the split/merge
|
|
new_documents = documents._pdf_split(new_files=new_files, open_files=open_files, vals=vals)
|
|
|
|
# archive original documents if needed
|
|
if archive == 'true':
|
|
documents.write({'active': False})
|
|
|
|
response = request.make_response(json.dumps(new_documents.ids), [('Content-Type', 'application/json')])
|
|
return response
|
|
|
|
@http.route('/documents/<access_token>', type='http', auth='public')
|
|
def documents_home(self, access_token):
|
|
document_sudo = self._from_access_token(access_token)
|
|
|
|
if not document_sudo:
|
|
Redirect = request.env['documents.redirect'].sudo()
|
|
if document_sudo := Redirect._get_redirection(access_token):
|
|
return request.redirect(
|
|
f'/documents/{document_sudo.access_token}',
|
|
HTTPStatus.MOVED_PERMANENTLY,
|
|
)
|
|
|
|
if request.env.user._is_public():
|
|
return self._documents_render_public_view(document_sudo)
|
|
elif request.env.user._is_portal():
|
|
return self._documents_render_portal_view(document_sudo)
|
|
else: # assume internal user
|
|
# Internal users use the /odoo/documents/<access_token> route
|
|
return request.redirect(
|
|
f'/odoo/documents/{access_token}',
|
|
HTTPStatus.TEMPORARY_REDIRECT,
|
|
)
|
|
|
|
def _documents_render_public_view(self, document_sudo):
|
|
target_sudo = document_sudo.shortcut_document_id
|
|
if (
|
|
target_sudo
|
|
and target_sudo.access_via_link != 'none'
|
|
and not target_sudo.is_access_via_link_hidden
|
|
):
|
|
return request.redirect(f'/odoo/documents/{target_sudo.access_token}')
|
|
if target_sudo or not document_sudo:
|
|
return request.render(
|
|
'documents.not_available', {'document': document_sudo}, status=404)
|
|
if document_sudo.type == 'url':
|
|
return request.redirect(
|
|
document_sudo.url, code=HTTPStatus.TEMPORARY_REDIRECT, local=False)
|
|
if document_sudo.type == 'binary' and document_sudo.attachment_id:
|
|
return request.render('documents.share_file', {'document': document_sudo})
|
|
if document_sudo.type == 'binary':
|
|
return request.render('documents.document_request_page', {'document': document_sudo})
|
|
if document_sudo.type == 'folder':
|
|
sub_documents_sudo = ShareRoute._get_folder_children(document_sudo)
|
|
return request.render('documents.public_folder_page', {
|
|
'folder': document_sudo,
|
|
'documents': sub_documents_sudo,
|
|
'subfolders': {
|
|
sub_folder_sudo.id: ShareRoute._get_folder_children(sub_folder_sudo)
|
|
for sub_folder_sudo in sub_documents_sudo
|
|
if sub_folder_sudo.type == 'folder'
|
|
}
|
|
})
|
|
else:
|
|
e = f"unknown document type {document_sudo.type}"
|
|
raise NotImplementedError(e)
|
|
|
|
def _documents_render_portal_view(self, document):
|
|
""" Render the portal version (stripped version of the backend Documents app). """
|
|
# We build the session information necessary for the web client to load
|
|
session_info = request.env['ir.http'].session_info()
|
|
mods = conf.server_wide_modules or []
|
|
lang = request.env.context.get('lang')
|
|
cache_hashes = {
|
|
"translations": request.env['ir.http'].get_web_translations_hash(mods, lang),
|
|
}
|
|
|
|
session_info.update(
|
|
cache_hashes=cache_hashes,
|
|
user_companies={
|
|
'current_company': request.env.company.id,
|
|
'allowed_companies': {
|
|
request.env.company.id: {
|
|
'id': request.env.company.id,
|
|
'name': request.env.company.name,
|
|
},
|
|
},
|
|
},
|
|
documents_init=self._documents_get_init_data(document, request.env.user),
|
|
)
|
|
|
|
return request.render(
|
|
'documents.document_portal_view',
|
|
{'session_info': session_info},
|
|
)
|
|
|
|
@classmethod
|
|
def _documents_get_init_data(cls, document, user):
|
|
""" Get initialization data to restore the interface on the selected document. """
|
|
if not document or not user:
|
|
return {}
|
|
|
|
document.ensure_one()
|
|
documents_init = {}
|
|
|
|
# If the user does not have access to the parent folder, we open it in the "SHARED" folder.
|
|
if document.type != 'folder':
|
|
parent = document.folder_id
|
|
shared_root = False if user.share else "SHARED" # Portal don't have 'Shared with me'
|
|
if parent:
|
|
documents_init['folder_id'] = parent.id if parent.user_permission in {'view', 'edit'} else shared_root
|
|
else:
|
|
documents_init['folder_id'] = (
|
|
"MY" if document.owner_id == user
|
|
else "COMPANY" if not user.share and (
|
|
document.owner_id == document.env.ref('base.user_root') or document.access_internal != 'none')
|
|
else shared_root
|
|
)
|
|
documents_init['document_id'] = document.id
|
|
target = document.shortcut_document_id or document
|
|
if document.type == 'binary' and target.attachment_id:
|
|
documents_init['open_preview'] = True
|
|
else:
|
|
documents_init['folder_id'] = document.id
|
|
|
|
return documents_init
|
|
|
|
@http.route('/documents/avatar/<access_token>',
|
|
type='http', auth='public', readonly=True)
|
|
def documents_avatar(self, access_token):
|
|
"""Show the avatar of the document's owner, or the avatar placeholder.
|
|
|
|
:param access_token: the access token to the document record
|
|
"""
|
|
partner_sudo = self._from_access_token(access_token, skip_log=True).owner_id.partner_id
|
|
return request.env['ir.binary']._get_image_stream_from(
|
|
partner_sudo, 'avatar_128', placeholder=partner_sudo._avatar_get_placeholder_path()
|
|
).get_response(as_attachment=False)
|
|
|
|
@http.route('/documents/content/<access_token>',
|
|
type='http', auth='public', readonly=True)
|
|
def documents_content(self, access_token, download=True):
|
|
"""Serve the file of the document.
|
|
|
|
:param access_token: the access token to the document record
|
|
:param download: whether to download the document on the user's
|
|
file system or to preview the document within the browser
|
|
"""
|
|
document_sudo = self._from_access_token(access_token, skip_log=True)
|
|
if not document_sudo:
|
|
raise request.not_found()
|
|
if document_sudo.type == 'url':
|
|
return request.redirect(
|
|
document_sudo.url, code=HTTPStatus.TEMPORARY_REDIRECT, local=False)
|
|
if document_sudo.type == 'folder':
|
|
return self._make_zip(
|
|
f'{document_sudo.name}.zip',
|
|
self._get_folder_children(document_sudo),
|
|
)
|
|
if document_sudo.type == 'binary':
|
|
if not document_sudo.attachment_id:
|
|
raise request.not_found()
|
|
with replace_exceptions(ValueError, by=BadRequest):
|
|
download = str2bool(download)
|
|
with replace_exceptions(ValueError, MissingError, by=request.not_found()):
|
|
stream = self._documents_content_stream(document_sudo)
|
|
return stream.get_response(as_attachment=download)
|
|
e = f"unknown document type {document_sudo.type!r}"
|
|
raise NotImplementedError(e)
|
|
|
|
def _documents_content_stream(self, document_sudo):
|
|
return request.env['ir.binary']._get_stream_from(document_sudo)
|
|
|
|
@http.route('/documents/redirect/<access_token>', type='http', auth='public', readonly=True)
|
|
def documents_redirect(self, access_token):
|
|
return request.redirect(f'/odoo/documents/{access_token}', HTTPStatus.MOVED_PERMANENTLY)
|
|
|
|
@http.route('/documents/touch/<access_token>', type='json', auth='user')
|
|
def documents_touch(self, access_token):
|
|
self._from_access_token(access_token)
|
|
return {}
|
|
|
|
@http.route(['/documents/thumbnail/<access_token>',
|
|
'/documents/thumbnail/<access_token>/<int:width>x<int:height>'],
|
|
type='http', auth='public', readonly=True)
|
|
def documents_thumbnail(self, access_token, width='0', height='0', unique=''):
|
|
"""Show the thumbnail of the document, or a placeholder.
|
|
|
|
:param access_token: the access token to the document record
|
|
:param width: resize the thumbnail to this maximum width
|
|
:param height: resize the thumbnail to this maximum height
|
|
:param unique: force storing the file in the browser cache, best
|
|
used with the checksum of the attachment
|
|
"""
|
|
with replace_exceptions(ValueError, by=BadRequest):
|
|
width = int(width)
|
|
height = int(height)
|
|
send_file_kwargs = {}
|
|
if unique:
|
|
send_file_kwargs['immutable'] = True
|
|
send_file_kwargs['max_age'] = http.STATIC_CACHE_LONG
|
|
document_sudo = self._from_access_token(access_token, skip_log=True)
|
|
return request.env['ir.binary']._get_image_stream_from(
|
|
document_sudo, 'thumbnail', width=width, height=height
|
|
).get_response(as_attachment=False, **send_file_kwargs)
|
|
|
|
@http.route(['/documents/document/<int:document_id>/update_thumbnail'], type='json', auth='user')
|
|
def documents_update_thumbnail(self, document_id, thumbnail):
|
|
"""Update the thumbnail of the document (after it has been generated by the browser).
|
|
|
|
We update the thumbnail in SUDO, after checking the read access, so it will work
|
|
if the user that generates the thumbnail is not the user who uploaded the document.
|
|
"""
|
|
document = request.env['documents.document'].browse(document_id)
|
|
document.check_access('read')
|
|
if document.thumbnail_status != 'client_generated':
|
|
return
|
|
document.sudo().write({
|
|
'thumbnail': thumbnail,
|
|
'thumbnail_status': 'present' if thumbnail else 'error',
|
|
})
|
|
|
|
@http.route(['/documents/zip'], type='http', auth='user')
|
|
def documents_zip(self, file_ids, zip_name, **kw):
|
|
"""Select many files / folders in the interface and click on download.
|
|
|
|
:param file_ids: if of the files to zip.
|
|
:param zip_name: name of the zip file.
|
|
"""
|
|
ids_list = [int(x) for x in file_ids.split(',')]
|
|
documents = request.env['documents.document'].browse(ids_list)
|
|
documents.check_access('read')
|
|
return self._make_zip(zip_name, documents)
|
|
|
|
@http.route([
|
|
'/document/download/all/<int:share_id>/<access_token>',
|
|
'/document/download/all/<access_token>'], type='http', auth='public')
|
|
def documents_download_all_legacy(self, access_token=None, share_id=None):
|
|
logger.warning("Deprecated since Odoo 18. Please access /documents/content/<access_token> instead.")
|
|
return request.redirect(f'/documents/content/{access_token}', HTTPStatus.MOVED_PERMANENTLY)
|
|
|
|
@http.route([
|
|
'/document/share/<int:share_id>/<token>',
|
|
'/document/share/<token>'], type='http', auth='public')
|
|
def share_portal(self, share_id=None, token=None):
|
|
logger.warning("Deprecated since Odoo 18. Please access /odoo/documents/<access_token> instead.")
|
|
return request.redirect(f'/odoo/documents/{token}', code=HTTPStatus.MOVED_PERMANENTLY)
|
|
|
|
@http.route(['/documents/upload/', '/documents/upload/<access_token>'],
|
|
type='http', auth='public', methods=['POST'],
|
|
max_content_length=_max_content_length)
|
|
def documents_upload(
|
|
self,
|
|
ufile,
|
|
access_token='',
|
|
owner_id='',
|
|
partner_id='',
|
|
res_id='',
|
|
res_model='',
|
|
):
|
|
"""
|
|
Replace an existing document or create new ones.
|
|
|
|
:param ufile: a list of multipart/form-data files.
|
|
:param access_token: the access token to a folder in which to
|
|
create new documents, or the access token to an existing
|
|
document where to upload/replace its attachment.
|
|
A falsy value means no folder_id and is allowed for
|
|
internal users to upload at the root of "My Drive".
|
|
:param owner_id, partner_id, res_id, res_model: field values
|
|
when creating new documents, for internal users only
|
|
"""
|
|
is_internal_user = request.env.user._is_internal()
|
|
if is_internal_user and not access_token:
|
|
document_sudo = request.env['documents.document'].sudo()
|
|
else:
|
|
document_sudo = self._from_access_token(access_token)
|
|
if (
|
|
not document_sudo
|
|
or (document_sudo.user_permission != 'edit'
|
|
and document_sudo.access_via_link != 'edit')
|
|
or document_sudo.type not in ('binary', 'folder')
|
|
):
|
|
raise request.not_found()
|
|
|
|
files = request.httprequest.files.getlist('ufile')
|
|
if not files:
|
|
raise BadRequest("missing files")
|
|
if len(files) > 1 and document_sudo.type not in (False, 'folder'):
|
|
raise BadRequest("cannot save multiple files inside a single document")
|
|
|
|
if is_internal_user:
|
|
with replace_exceptions(ValueError, by=BadRequest):
|
|
owner_id = int(owner_id) if owner_id else request.env.user.id
|
|
partner_id = int(partner_id) if partner_id else False
|
|
res_model = res_model or 'documents.document'
|
|
res_id = int(res_id) if res_id else False
|
|
elif owner_id or partner_id or res_id or res_model:
|
|
raise Forbidden("only internal users can provide field values")
|
|
else:
|
|
owner_id = document_sudo.owner_id.id if request.env.user.is_public else request.env.user.id
|
|
partner_id = False
|
|
res_model = 'documents.document'
|
|
res_id = False # replaced by the document's id
|
|
|
|
document_ids = self._documents_upload(
|
|
document_sudo, files, owner_id, partner_id, res_id, res_model)
|
|
if len(document_ids) == 1:
|
|
document_sudo = document_sudo.browse(document_ids)
|
|
|
|
if request.env.user._is_public():
|
|
return request.redirect(document_sudo.access_url)
|
|
else:
|
|
return request.make_json_response(document_ids)
|
|
|
|
def _documents_upload(self,
|
|
document_sudo, files, owner_id, partner_id, res_id, res_model):
|
|
""" Replace an existing document or upload a new one. """
|
|
is_internal_user = request.env.user._is_internal()
|
|
|
|
document_ids = []
|
|
AttachmentSudo = request.env['ir.attachment'].sudo(not is_internal_user)
|
|
|
|
if document_sudo.type == 'binary':
|
|
attachment_sudo = AttachmentSudo._from_request_file(
|
|
files[0], mimetype='TRUST' if is_internal_user else 'GUESS'
|
|
)
|
|
attachment_sudo.res_model = document_sudo.res_model
|
|
attachment_sudo.res_id = document_sudo.res_id
|
|
values = {'attachment_id': attachment_sudo.id}
|
|
if not document_sudo.attachment_id: # is a request
|
|
if document_sudo.access_via_link == 'edit':
|
|
values['access_via_link'] = 'view'
|
|
self._documents_upload_create_write(document_sudo, values)
|
|
document_ids.append(document_sudo.id)
|
|
else:
|
|
folder_sudo = document_sudo
|
|
for file in files:
|
|
document_sudo = self._documents_upload_create_write(folder_sudo, {
|
|
'attachment_id': AttachmentSudo._from_request_file(
|
|
file, mimetype='TRUST' if is_internal_user else 'GUESS'
|
|
).id,
|
|
'type': 'binary',
|
|
'access_via_link': 'none' if folder_sudo.access_via_link in (False, 'none') else 'view',
|
|
'folder_id': folder_sudo.id,
|
|
'owner_id': owner_id,
|
|
'partner_id': partner_id,
|
|
'res_model': res_model,
|
|
'res_id': res_id,
|
|
})
|
|
document_ids.append(document_sudo.id)
|
|
if folder_sudo.create_activity_option:
|
|
folder_sudo.browse(document_ids).documents_set_activity(
|
|
settings_record=folder_sudo)
|
|
|
|
return document_ids
|
|
|
|
def _documents_upload_create_write(self, document_sudo, vals):
|
|
"""
|
|
The actual function that either write vals on a binary document
|
|
or create a new document with vals inside a folder document.
|
|
"""
|
|
if document_sudo.type == 'binary':
|
|
document_sudo.write(vals)
|
|
else:
|
|
vals.setdefault('folder_id', document_sudo.id)
|
|
document_sudo = document_sudo.create(vals)
|
|
if not document_sudo.res_model:
|
|
document_sudo.res_model = 'documents.document'
|
|
if (
|
|
document_sudo.res_model == 'documents.document'
|
|
and not document_sudo.res_id
|
|
):
|
|
document_sudo.res_id = document_sudo.id
|
|
if (any(field_name in vals for field_name in [
|
|
'raw', 'datas', 'attachment_id'])):
|
|
document_sudo.message_post(body=_(
|
|
"Document uploaded by %(user)s",
|
|
user=request.env.user.name
|
|
))
|
|
|
|
return document_sudo
|
|
|
|
@http.route('/documents/upload_traceback', type='http', methods=['POST'], auth='user')
|
|
def documents_upload_traceback(self, ufile, max_content_length=1 << 20): # 1MiB
|
|
if not request.env.user._is_internal():
|
|
raise Forbidden()
|
|
|
|
folder_sudo = request.env.ref(
|
|
'documents.document_support_folder',
|
|
raise_if_not_found=False
|
|
).sudo()
|
|
if not folder_sudo or not folder_sudo.active:
|
|
raise request.not_found()
|
|
|
|
files = request.httprequest.files.getlist('ufile')
|
|
if not files:
|
|
raise BadRequest("missing files")
|
|
if len(files) > 1:
|
|
raise BadRequest("This route only accepts one file at a time.")
|
|
|
|
traceback_sudo = self._documents_upload_create_write(folder_sudo, {
|
|
'attachment_id': request.env['ir.attachment']._from_request_file(
|
|
files[0], mimetype='text/plain').id,
|
|
'type': 'binary',
|
|
'access_internal': 'none',
|
|
'access_via_link': 'view',
|
|
'folder_id': folder_sudo.id,
|
|
'owner_id': request.env.ref('base.user_root').id,
|
|
})
|
|
|
|
return request.make_json_response([traceback_sudo.access_url])
|
|
|
|
|
|
class DocumentsAttachmentController(AttachmentController):
|
|
|
|
@http.route()
|
|
def mail_attachment_upload(self, *args, **kw):
|
|
""" Override to prevent the creation of a document when uploading
|
|
an attachment from an activity already linked to a document."""
|
|
if kw.get('activity_id'):
|
|
document = request.env['documents.document'].search([('request_activity_id', '=', int(kw['activity_id']))])
|
|
if document:
|
|
request.update_context(no_document=True)
|
|
return super().mail_attachment_upload(*args, **kw)
|