import zipfile
from base64 import b64decode, b64encode
from datetime import timedelta
from http import HTTPStatus
from io import BytesIO
from urllib.parse import urlencode

from PIL import Image
from freezegun import freeze_time
from urllib3.util import parse_url

from odoo import Command, fields, http
from odoo.tests.common import RecordCapturer
from odoo.tools import file_open
from odoo.tools.image import image_process

from odoo.addons.base.tests.common import HttpCaseWithUserDemo
from odoo.addons.mail.tests.common import mail_new_test_user
from odoo.addons.documents.controllers.documents import ShareRoute


class TestDocumentsControllers(HttpCaseWithUserDemo):
    """HTTP-level tests of the documents controllers.

    The tests request the ``/documents/avatar``, ``/documents/content``,
    ``/documents/thumbnail``, ``/documents/upload`` and ``/documents/zip``
    routes (plus ``/web/content`` on documents) as the public user, the
    ``portal_test`` portal user and the ``demo`` internal user, against the
    folder/file/url/shortcut fixtures created in :meth:`setUpClass`.

    NOTE(review): a few expected strings (chatter message bodies and the
    activity note) contain raw line breaks in the text this file was
    restored from. They are kept verbatim as ``\\n`` escapes; this looks
    like markup lost in extraction -- confirm against the values actually
    produced by the server.
    """

    def _assertPathEqual(self, first, second):
        """Assert that two URLs have the same path component."""
        self.assertEqual(parse_url(first).path, parse_url(second).path)

    def _assertPathIn(self, member, container):
        """Assert that the path of ``member`` is the path of one URL of ``container``."""
        self.assertIn(
            parse_url(member).path,
            {parse_url(expected_url).path for expected_url in container},
        )

    @classmethod
    def setUpClass(cls):
        """Create the users, binary fixtures and documents shared by all tests."""
        super().setUpClass()
        cls.user_portal = mail_new_test_user(
            cls.env,
            login='portal_test',
            groups='base.group_portal',
            company_id=cls.env.ref('base.main_company').id,
            name='portal',
            notification_type='email',
        )
        cls.user_manager = mail_new_test_user(
            cls.env,
            login='manager_test',
            groups='documents.group_documents_manager',
            company_id=cls.env.ref('base.main_company').id,
            name='manager',
            notification_type='email',
        )

        # make sure the admin user has an avatar
        with file_open('base/static/img/partner_root-image.png', 'rb') as file:
            cls.admin_avatar = file.read()
        cls.admin_avatar_b64 = b64encode(cls.admin_avatar)
        cls.user_admin.image_1920 = cls.admin_avatar_b64

        # use the Document app icon as test file
        with file_open('documents/static/description/icon.png', 'rb') as file:
            cls.doc_icon = file.read()
        cls.doc_icon_b64 = b64encode(cls.doc_icon)

        Doc = cls.env['documents.document']
        cls.test_activity_type = cls.env['mail.activity.type'].create({
            'name': 'Test Activity Type'
        })
        cls.internal_folder = Doc.create({
            'type': 'folder',
            'name': "internal folder",
            'access_internal': 'edit',
            'access_via_link': 'none',
            'owner_id': cls.user_admin.id,
            'create_activity_option': False,
            'create_activity_type_id': cls.test_activity_type.id,
            'create_activity_summary': 'test summary',
            'create_activity_note': 'test note',
            'create_activity_user_id': cls.user_admin.id,
            'create_activity_date_deadline_range_type': 'days',
            'create_activity_date_deadline_range': 5,
        })
        cls.internal_file = Doc.create({
            'type': 'binary',
            'name': "internal-file.png",
            'access_internal': 'edit',
            'access_via_link': 'none',
            'is_access_via_link_hidden': True,
            'owner_id': cls.user_admin.id,
            'folder_id': cls.internal_folder.id,
            'raw': cls.doc_icon,
        })
        cls.internal_hidden = Doc.create({
            'type': 'binary',
            'name': "internal-hidden.png",
            'access_internal': 'none',
            'access_via_link': 'none',
            'owner_id': cls.user_admin.id,
            'folder_id': cls.internal_folder.id,
            'raw': cls.doc_icon,
        })
        cls.internal_request = Doc.create({
            'type': 'binary',
            'name': "internal-request.png",
            'access_internal': 'edit',
            'access_via_link': 'none',
            'owner_id': cls.user_admin.id,
            'folder_id': cls.internal_folder.id,
        })
        cls.internal_url = Doc.create({
            'type': 'url',
            'name': "internal url",
            'access_internal': 'edit',
            'access_via_link': 'none',
            'owner_id': cls.user_admin.id,
            'folder_id': cls.internal_folder.id,
            'url': f'{cls.base_url()}/web/health',
        })
        cls.public_folder = Doc.create({
            'type': 'folder',
            'name': "public folder",
            'access_internal': 'edit',
            'access_via_link': 'edit',
            'folder_id': cls.internal_folder.id,
            'owner_id': cls.user_admin.id,
        })
        cls.public_file = Doc.create({
            'type': 'binary',
            'name': "public-file.png",
            'access_internal': 'edit',
            'access_via_link': 'view',
            'owner_id': cls.user_admin.id,
            'folder_id': cls.public_folder.id,
            'raw': cls.doc_icon,
        })
        cls.public_request = Doc.create({
            'type': 'binary',
            'name': "public-request.png",
            'access_internal': 'edit',
            'access_via_link': 'edit',
            'owner_id': cls.user_admin.id,
            'folder_id': cls.public_folder.id,
        })
        cls.public_url = Doc.create({
            'type': 'url',
            'name': "public url",
            'access_internal': 'edit',
            'access_via_link': 'view',
            'owner_id': cls.user_admin.id,
            'folder_id': cls.public_folder.id,
            'url': f'{cls.base_url()}/web/health',
        })
        cls.public_shortcut = cls.internal_file.action_create_shortcut(cls.public_folder.id)
        cls.missing_file = Doc.new()

        # Make so the demo and portal users already visited all
        # documents, so that it doesn't attempt to create the
        # documents.access record inside a read-only controller.
        # It also makes so the portal user can list those files.
        now = fields.Datetime.now()
        cls.env['documents.access'].create([
            {
                'document_id': doc.id,
                'partner_id': partner.id,
                'last_access_date': now
            }
            for doc in [
                cls.internal_folder,
                cls.internal_file,
                cls.internal_hidden,
                cls.internal_request,
                cls.internal_url,
                cls.public_folder,
                cls.public_file,
                cls.public_request,
                cls.public_url,
                cls.public_shortcut,
            ]
            for partner in [
                cls.user_demo.partner_id,
                cls.user_portal.partner_id,
            ]
        ])

    def test_doc_ctrl_avatar(self):
        """Check the avatar route content per document/user, then the 304 revalidation."""
        avatar_128 = b64decode(self.user_admin.avatar_128)
        placeholder = image_process(
            self.user_admin.partner_id._avatar_get_placeholder(),
            size=(128, 128),
        )
        for document, user, status, content, filename in [
            (self.missing_file, None, 200, placeholder, 'avatar_grey.png'),
            (self.public_file, None, 200, avatar_128, '"Mitchell Admin.png"'),
            (self.internal_file, None, 200, placeholder, 'avatar_grey.png'),
            (self.public_shortcut, None, 200, placeholder, 'avatar_grey.png'),
            (self.public_shortcut, 'demo', 200, avatar_128, '"Mitchell Admin.png"'),
            (self.internal_file, 'demo', 200, avatar_128, '"Mitchell Admin.png"'),
            # keep it last the response is reused outside the loop
        ]:
            url = f'/documents/avatar/{document.access_token}'
            session = self.authenticate(user, user)
            with self.subTest(document=document.name, user=user):
                res = self.url_open(url)
                self.assertEqual(res.status_code, status)
                if status == 200:
                    self.assertEqual(
                        res.headers.get('Content-Disposition'),
                        f'inline; filename={filename}')
                    self.assertEqual(res.content, content)
                    self.assertIn('Last-Modified', res.headers)
                    self.assertIn('ETag', res.headers)

        # reuse the last response's ETag/Last-Modified
        assert session.uid == self.user_demo.id
        assert document is self.internal_file
        res = self.url_open(url, headers={
            'If-Modified-Since': res.headers['Last-Modified'],
            'If-None-Match': res.headers['ETag'],
        })
        res.raise_for_status()
        self.assertEqual(res.status_code, HTTPStatus.NOT_MODIFIED)

    def test_doc_ctrl_avatar_portal(self):
        """Check the portal user gets the avatar only while their access has not expired."""
        placeholder = image_process(
            self.user_admin.partner_id._avatar_get_placeholder(),
            size=(128, 128),
        )
        self.authenticate('portal_test', 'portal_test')
        access_portal = self.internal_file.access_ids.filtered(
            lambda access: access.partner_id == self.user_portal.partner_id
        ).ensure_one()
        access_portal.role = 'view'

        access_portal.expiration_date = fields.Datetime.now() + timedelta(hours=1)
        res = self.url_open(f'/documents/avatar/{self.internal_file.access_token}')
        res.raise_for_status()
        self.assertEqual(res.content, b64decode(self.user_admin.avatar_128))

        access_portal.expiration_date = fields.Datetime.now() - timedelta(hours=1)
        res = self.url_open(f'/documents/avatar/{self.internal_file.access_token}')
        res.raise_for_status()
        self.assertEqual(res.content, placeholder)

    def test_doc_ctrl_avatar_shortcut(self):
        """Check the avatar is served through a shortcut once its target is link-visible."""
        self.internal_file.action_update_access_rights(
            access_via_link='view', is_access_via_link_hidden=False,
        )
        for user in [None, 'demo']:
            with self.subTest(user=user):
                # Switch session for each case; without this both iterations
                # would run with the same (unauthenticated) session.
                self.authenticate(user, user)
                res = self.url_open(f'/documents/avatar/{self.public_shortcut.access_token}')
                res.raise_for_status()
                self.assertEqual(res.content, b64decode(self.user_admin.avatar_128))

    def test_doc_ctrl_content_binary(self):
        """Check the content route status/body/headers per document/user, then the 304 revalidation."""
        for document, user, dl, status, content in [
            (self.missing_file, None, '1', 404, "not found"),  # no document
            (self.internal_file, None, '1', 404, "not found"),  # access_via_link='none'
            (self.public_file, None, '1', 200, self.doc_icon),  # access_via_link='view'
            (self.public_file, None, '0', 200, self.doc_icon),  # access_via_link='view'
            (self.public_file, None, 'bad', 400, "Use 0/1"),  # int('bad')
            (self.public_request, None, '1', 404, "not found"),  # no attachment_id
            (self.public_shortcut, None, '1', 404, "not found"),  # hidden shortcut
            (self.internal_file, 'demo', '1', 200, self.doc_icon),  # access_internal='edit'
            (self.internal_hidden, 'demo', '1', 404, "not found"),  # access_internal='none'
            (self.internal_file, 'demo', '0', 200, self.doc_icon),  # access_internal='edit'
            # keep it last, the response is reused outside the loop
        ]:
            session = self.authenticate(user, user)
            url = f'/documents/content/{document.access_token}?download={dl}'
            with self.subTest(user=user, url=url):
                res = self.url_open(url)
                self.assertEqual(res.status_code, status)
                if status == 200:
                    self.assertEqual(res.content, content)
                    self.assertIn('Last-Modified', res.headers)
                    self.assertIn('ETag', res.headers)
                    self.assertEqual(
                        res.headers.get('Content-Disposition'),
                        ("attachment" if dl == "1" else "inline") + f'; filename={document.name}'
                    )
                else:
                    self.assertIn(content, res.text)

        # reuse the last response's ETag/Last-Modified
        assert session.uid == self.user_demo.id
        assert url == f'/documents/content/{self.internal_file.access_token}?download=0'
        res = self.url_open(url, headers={
            'If-Modified-Since': res.headers['Last-Modified'],
            'If-None-Match': res.headers['ETag'],
        })
        res.raise_for_status()
        self.assertEqual(res.status_code, HTTPStatus.NOT_MODIFIED)  # 304

    def test_doc_ctrl_content_binary_portal(self):
        """Check the portal user can download only while their access has not expired."""
        self.authenticate('portal_test', 'portal_test')
        access_portal = self.internal_file.access_ids.filtered(
            lambda access: access.partner_id == self.user_portal.partner_id
        ).ensure_one()
        access_portal.role = 'view'

        access_portal.expiration_date = fields.Datetime.now() + timedelta(hours=1)
        res = self.url_open(f'/documents/content/{self.internal_file.access_token}')
        res.raise_for_status()
        self.assertEqual(res.content, self.doc_icon)

        access_portal.expiration_date = fields.Datetime.now() - timedelta(hours=1)
        res = self.url_open(f'/documents/content/{self.internal_file.access_token}')
        self.assertEqual(res.status_code, 404)

    def test_doc_ctrl_content_binary_shortcut(self):
        """Check a shortcut serves its target's content to demo, then to the public user."""
        self.authenticate('demo', 'demo')
        res = self.url_open(f'/documents/content/{self.public_shortcut.access_token}')
        res.raise_for_status()
        self.assertEqual(res.content, self.doc_icon)

        # make so the public user can follow the shortcut
        self.internal_file.action_update_access_rights(
            access_via_link='view', is_access_via_link_hidden=False,
        )
        self.authenticate(None, None)
        res = self.url_open(f'/documents/content/{self.public_shortcut.access_token}')
        res.raise_for_status()
        self.assertEqual(res.content, self.doc_icon)

    def test_doc_ctrl_content_folder(self):
        """Check the zip archive returned for a folder: members, contents and unique names."""
        self.authenticate(None, None)
        res = self.url_open(f'/documents/content/{self.internal_folder.access_token}')
        self.assertEqual(res.status_code, 404)

        res = self.url_open(f'/documents/content/{self.public_folder.access_token}')
        res.raise_for_status()
        self.assertEqual(res.status_code, 200)
        with zipfile.ZipFile(BytesIO(res.content)) as reszip:
            self.assertEqual(reszip.namelist(), ['public-file.png'])
            self.assertEqual(reszip.read('public-file.png'), self.doc_icon)

        self.internal_file.action_update_access_rights(
            access_via_link='view', is_access_via_link_hidden=False,
        )
        res = self.url_open(f'/documents/content/{self.public_folder.access_token}')
        res.raise_for_status()
        self.assertEqual(res.status_code, 200)
        with zipfile.ZipFile(BytesIO(res.content)) as reszip:
            self.assertEqual(sorted(reszip.namelist()), ['internal-file.png', 'public-file.png'])
            self.assertEqual(reszip.read('internal-file.png'), self.doc_icon)
            self.assertEqual(reszip.read('public-file.png'), self.doc_icon)

        # check that the names are all unique
        self.public_file.action_create_shortcut(self.internal_folder.id)
        self.public_folder.action_create_shortcut(self.internal_folder.id)
        self.env['documents.document'].create([{
            'name': 'test.tar.gz',
            'folder_id': self.internal_folder.id,
            'access_internal': 'view',
            'datas': 'test',
        } for _ in range(3)] + [{
            'name': self.public_folder.name,
            'datas': 'test',
            'folder_id': self.public_folder.folder_id.id,
            'access_internal': 'view',
            'type': 'folder',
        } for _ in range(3)] + [{
            'name': '.hidden',
            'datas': 'test',
            'folder_id': self.public_folder.folder_id.id,
            'access_internal': 'view',
            'type': 'folder',
        } for _ in range(2)])
        self.authenticate('demo', 'demo')
        res = self.url_open(f'/documents/content/{self.internal_folder.access_token}')
        res.raise_for_status()
        self.assertEqual(res.status_code, 200)
        expected = {
            'internal-file.png',
            'public-file.png',
            'public folder/',
            # already discovered, but it's a shortcut to a file so it's ok
            'public folder/public-file.png',
            'public folder/internal-file.png',
            'public folder-2/',
            'public folder-3/',
            'public folder-4/',
            'public folder-5/',
            'test.tar.gz',
            'test-2.tar.gz',
            'test-3.tar.gz',
            '.hidden/',
            '.hidden-2/',
        }
        with zipfile.ZipFile(BytesIO(res.content)) as reszip:
            self.assertEqual(set(reszip.namelist()), expected)
            self.assertEqual(reszip.read('internal-file.png'), self.doc_icon)

    def test_doc_ctrl_content_url(self):
        """Check url documents answer with a 307 redirect to their url, or 404 without access."""
        self.authenticate(None, None)
        res = self.url_open(f'/documents/content/{self.public_url.access_token}', allow_redirects=False)
        res.raise_for_status()
        self.assertEqual(res.status_code, HTTPStatus.TEMPORARY_REDIRECT)  # 307
        self.assertEqual(res.headers.get('Location'), self.public_url.url)

        res = self.url_open(f'/documents/content/{self.internal_url.access_token}', allow_redirects=False)
        self.assertEqual(res.status_code, 404)

        self.authenticate('demo', 'demo')
        res = self.url_open(f'/documents/content/{self.internal_url.access_token}', allow_redirects=False)
        res.raise_for_status()
        self.assertEqual(res.status_code, HTTPStatus.TEMPORARY_REDIRECT)  # 307
        self.assertEqual(res.headers.get('Location'), self.internal_url.url)

    def test_doc_ctrl_cross_redirection(self):
        """Check the redirections between ``/documents/<token>`` and ``/odoo/documents/<token>``."""
        docs_url = f'/documents/{self.public_file.access_token}'
        odoo_url = '/odoo' + docs_url
        portal = self.user_portal.login
        demo = self.user_demo.login

        # ``...`` marks rows whose Location header is not checked
        for login, url, code, location in [
            (None, odoo_url, 307, docs_url),
            (portal, odoo_url, 307, docs_url),
            (demo, docs_url, 307, odoo_url),
            (None, docs_url, 200, ...),
            (portal, docs_url, 200, ...),
            (demo, odoo_url, 303, ...),
        ]:
            with self.subTest(login=login, url=url):
                self.authenticate(login, login)
                res = self.url_open(url, allow_redirects=False)
                res.raise_for_status()
                self.assertEqual(res.status_code, code)
                if code == 307:
                    self.assertURLEqual(res.headers.get('Location'), location)

    def test_doc_render_public_templates(self):
        """Check what the public user gets when opening the access url of each kind of document."""
        self.authenticate(None, None)

        # Internal documents
        for doc in (
            self.internal_file,
            self.internal_hidden,
            self.internal_folder,
            self.internal_request,
            self.internal_url,
        ):
            with self.subTest(name=doc.name):
                res = self.url_open(doc.access_url)
                self.assertEqual(res.status_code, 404)
                self.assertIn("does not exist or is not publicly available.", res.text)

        # URL
        res = self.url_open(self.public_url.access_url, allow_redirects=False)
        res.raise_for_status()
        self.assertEqual(res.status_code, HTTPStatus.TEMPORARY_REDIRECT)

        # Folder
        res = self.url_open(self.public_folder.access_url)
        res.raise_for_status()
        self.assertRegex(res.text, r"0\s+folders,\s+1\s+files")
        self.assertIn(self.public_file.name, res.text)
        self.assertIn(self.public_request.name, res.text)
        self.assertIn(self.public_url.name, res.text)

        # Folder with visible shortcut
        self.internal_file.action_update_access_rights(
            access_via_link='view', is_access_via_link_hidden=False,
        )
        res = self.url_open(self.public_folder.access_url)
        res.raise_for_status()
        self.assertRegex(res.text, r"0\s+folders,\s+2\s+files")
        self.assertIn(self.public_file.name, res.text)
        self.assertIn(self.public_request.name, res.text)
        self.assertIn(self.public_url.name, res.text)
        self.assertIn(self.internal_file.name, res.text)

        # File
        res = self.url_open(self.public_file.access_url)
        res.raise_for_status()
        self.assertIn(self.public_file.name, res.text)
        self.assertIn("Download file", res.text)
        self.assertIn("Preview file", res.text)

        # Request
        res = self.url_open(self.public_request.access_url)
        res.raise_for_status()
        self.assertIn("This document has been requested.", res.text)

    def test_doc_ctrl_thumbnail(self):
        """Check the thumbnail route: placeholder, resized image and bad ``width`` parameter."""
        placeholder = self.env['ir.binary']._placeholder(
            self.internal_file._get_placeholder_filename('thumbnail'))
        self.authenticate(None, None)

        res = self.url_open(f'/documents/thumbnail/{self.internal_file.access_token}')
        self.assertEqual(res.status_code, 200)
        self.assertEqual(res.content, placeholder)

        res = self.url_open(f'/documents/thumbnail/{self.public_file.access_token}')
        res.raise_for_status()
        self.assertEqual(res.status_code, 200)
        with (Image.open(BytesIO(self.doc_icon)) as image,
              Image.open(BytesIO(res.content)) as thumbnail):
            self.assertEqual(image.size, (100, 100))
            self.assertEqual(thumbnail.size, (100, 70))

        res = self.url_open(f'/documents/thumbnail/{self.public_file.access_token}?width=bad')
        self.assertEqual(res.status_code, 400)
        self.assertIn('bad', res.text)

    def test_doc_ctrl_upload_folder_public(self):
        """Check public uploads into a folder: error cases, a text file, a disguised image."""
        self.authenticate(None, None)

        # Check errors
        res = self.url_open(
            f'/documents/upload/{self.internal_folder.access_token}',
            data={
                'csrf_token': http.Request.csrf_token(self),
                'ufile': '',
            })
        self.assertEqual(res.status_code, 404)

        res = self.url_open(
            f'/documents/upload/{self.public_folder.access_token}',
            data={
                'csrf_token': http.Request.csrf_token(self),
                'ufile': '',
            })
        self.assertEqual(res.status_code, 400)
        self.assertIn("missing files", res.text)

        res = self.url_open(
            f'/documents/upload/{self.public_folder.access_token}',
            data={
                'csrf_token': http.Request.csrf_token(self),
                'res_model': 'res.users',
            },
            files={'ufile': BytesIO()})
        self.assertEqual(res.status_code, HTTPStatus.FORBIDDEN)
        self.assertIn("only internal users can provide field values", res.text)

        # Upload a text file
        with RecordCapturer(self.env['documents.document'], []) as capture:
            res = self.url_open(
                f'/documents/upload/{self.public_folder.access_token}',
                data={'csrf_token': http.Request.csrf_token(self)},
                files={'ufile': ('hello.txt', BytesIO(b"Hello"), 'text/plain')},
                allow_redirects=False)
            res.raise_for_status()
        document = capture.records.ensure_one()
        self.assertEqual(document.name, 'hello.txt')
        self.assertEqual(document.mimetype, 'text/plain')
        self.assertEqual(document.access_internal, 'edit')
        self.assertEqual(document.access_via_link, 'view')
        self.assertEqual(document.folder_id, self.public_folder)
        self.assertEqual(document.owner_id, self.public_folder.owner_id)
        self.assertEqual(document.raw, b"Hello")
        self.assertRegex(document.access_token, r'[A-Za-z0-9-_]{22}')
        # NOTE(review): line breaks kept as received -- confirm expected bodies
        self.assertEqual(document.message_ids.mapped('body'), [
            "\nDocument uploaded by Public user\n",
            "Document created\n",
        ])
        self.assertEqual(res.status_code, HTTPStatus.SEE_OTHER)  # 303
        self._assertPathEqual(res.headers.get('Location'), document.access_url)
        self.url_open(res.headers['Location']).raise_for_status()

        # Upload an image but forge the filename/mimetype to pretend it is text
        with (RecordCapturer(self.env['documents.document'], []) as record_capture,
              self.assertLogs('odoo.tools.mimetypes', 'WARNING') as log_capture):
            res = self.url_open(
                f'/documents/upload/{self.public_folder.access_token}',
                data={'csrf_token': http.Request.csrf_token(self)},
                files={'ufile': ('hello.txt', BytesIO(self.doc_icon), 'text/plain')},
                allow_redirects=False)
            res.raise_for_status()
        document = record_capture.records.ensure_one()
        self.assertEqual(document.name, 'hello.txt.png',
            "the filename must have been neutralized")
        self.assertEqual(document.mimetype, 'image/png',
            "the mimetype must have been neutralized")
        self.assertEqual(document.raw, self.doc_icon)
        self.assertEqual(log_capture.output, [
            ("WARNING:odoo.tools.mimetypes:File 'hello.txt' has an "
             "invalid extension for mimetype 'image/png', adding '.png'")
        ])
        self.url_open(res.headers['Location']).raise_for_status()

    def test_doc_ctrl_upload_request_public(self):
        """Check public uploads onto a requested document: a text file, then a disguised image."""
        self.authenticate(None, None)

        # Upload a text file
        res = self.url_open(
            f'/documents/upload/{self.public_request.access_token}',
            data={'csrf_token': http.Request.csrf_token(self)},
            files={'ufile': ('hello.txt', BytesIO(b"Hello"), 'text/plain')},
            allow_redirects=False)
        res.raise_for_status()
        self.assertEqual(self.public_request.name, 'hello.txt')
        self.assertEqual(self.public_request.mimetype, 'text/plain')
        self.assertEqual(self.public_request.access_internal, 'edit')
        self.assertEqual(self.public_request.access_via_link, 'view')
        self.assertEqual(self.public_request.owner_id, self.user_admin)
        self.assertEqual(self.public_request.raw, b"Hello")
        # NOTE(review): line breaks kept as received -- confirm expected bodies
        self.assertEqual(self.public_request.message_ids.mapped('body'), [
            "Document uploaded by Public user\n",
            "Document created\n",
        ])
        self.assertEqual(res.status_code, HTTPStatus.SEE_OTHER)  # 303
        self._assertPathEqual(res.headers.get('Location'), self.public_request.access_url)
        self.url_open(res.headers['Location']).raise_for_status()

        # Reset the request
        self.public_request.action_update_access_rights(
            access_via_link='edit', is_access_via_link_hidden=False,
        )

        # Upload an image but forge the filename/mimetype to pretend it is text
        with self.assertLogs('odoo.tools.mimetypes', 'WARNING') as log_capture:
            res = self.url_open(
                f'/documents/upload/{self.public_request.access_token}',
                data={'csrf_token': http.Request.csrf_token(self)},
                files={'ufile': ('hello.txt', BytesIO(self.doc_icon), 'text/plain')},
                allow_redirects=False
            )
            res.raise_for_status()
        self.assertEqual(self.public_request.name, 'hello.txt.png',
            "the filename must have been neutralized")
        self.assertEqual(self.public_request.mimetype, 'image/png',
            "the mimetype must have been neutralized")
        self.assertEqual(self.public_request.raw, self.doc_icon)
        self.assertEqual(log_capture.output, [
            ("WARNING:odoo.tools.mimetypes:File 'hello.txt' has an "
             "invalid extension for mimetype 'image/png', adding '.png'")
        ])
        self.url_open(res.headers['Location']).raise_for_status()

    @freeze_time('2022-07-24 08:00:00')
    def test_doc_upload_folder_user(self):
        """Check uploads into a folder by the demo user, with and without the folder activity."""
        self.authenticate('demo', 'demo')

        # Errors
        res = self.url_open(
            f'/documents/upload/{self.internal_folder.access_token}',
            data={
                'csrf_token': http.Request.csrf_token(self),
                'res_id': 'bad'
            },
            files={'ufile': BytesIO()},
            allow_redirects=False,
        )
        self.assertEqual(res.status_code, 400)
        self.assertIn('bad', res.text)

        # Upload a test file
        self.internal_folder.create_activity_option = True
        with (RecordCapturer(self.env['documents.document'], []) as capture,
              RecordCapturer(self.env['mail.activity'], [
                  ('res_model', '=', 'documents.document')
              ]) as capture_activity):
            res = self.url_open(
                f'/documents/upload/{self.internal_folder.access_token}',
                data={'csrf_token': http.Request.csrf_token(self)},
                files={'ufile': ('hello.txt', b"Hello", 'text/plain')},
                allow_redirects=False,
            )
            res.raise_for_status()
        document = capture.records.ensure_one()
        self.assertEqual(document.name, 'hello.txt')
        self.assertEqual(document.mimetype, 'text/plain')
        self.assertEqual(document.owner_id, self.user_demo)
        self.assertEqual(document.res_id, document.id)
        self.assertEqual(document.res_model, 'documents.document')
        # NOTE(review): line breaks kept as received -- confirm expected bodies
        self.assertEqual(document.message_ids.mapped('body'), [
            "Document uploaded by Marc Demo\n",
            "Document created\n",
        ])
        activity = capture_activity.records.ensure_one()
        self.assertEqual(activity.activity_type_id, self.test_activity_type)
        self.assertEqual(activity.summary, 'test summary')
        # NOTE(review): line break kept as received -- confirm expected note
        self.assertEqual(activity.note, 'test note\n')
        self.assertEqual(activity.user_id, self.user_admin)
        self.assertIn(activity.date_deadline, {fields.Date.today() + timedelta(days=5)})

        # Upload a fake text file that actually is an image on the demo user
        self.internal_folder.create_activity_option = False
        with (RecordCapturer(self.env['documents.document'], []) as capture,
              RecordCapturer(self.env['mail.activity'], [
                  ('res_model', '=', 'documents.document')
              ]) as capture_activity):
            res = self.url_open(
                f'/documents/upload/{self.internal_folder.access_token}',
                data={
                    'csrf_token': http.Request.csrf_token(self),
                    'res_id': self.user_demo.partner_id.id,
                    'res_model': 'res.partner',
                },
                files={'ufile': ('hello.txt', self.doc_icon, 'text/plain')},
            )
            res.raise_for_status()
        self.assertFalse(capture_activity.records)
        document = capture.records.ensure_one()
        self.assertEqual(document.name, 'hello.txt',
            "the filename must not have been neutralized")
        self.assertEqual(document.mimetype, 'text/plain',
            "the mimetype must not have been neutralized")
        self.assertEqual(document.res_id, self.user_demo.partner_id.id)
        self.assertEqual(document.res_model, 'res.partner')

    def test_doc_upload_request_user(self):
        """Check the demo user's upload onto a requested document, and a 404 on a hidden file."""
        self.authenticate('demo', 'demo')

        # res_model/res_id should be ignored
        res = self.url_open(
            f'/documents/upload/{self.internal_request.access_token}',
            data={
                'res_id': self.user_demo.id,
                'res_model': 'res.users',
                'csrf_token': http.Request.csrf_token(self),
            },
            files={'ufile': ('hello.txt', BytesIO(b"Hello"), 'text/plain')},
            allow_redirects=False
        )
        res.raise_for_status()
        self.assertEqual(self.internal_request.name, 'hello.txt')
        self.assertEqual(self.internal_request.mimetype, 'text/plain')
        self.assertEqual(self.internal_request.res_id, self.internal_request.id)
        self.assertEqual(self.internal_request.res_model, 'documents.document')
        self.assertEqual(self.internal_request.raw, b"Hello")
        # NOTE(review): line breaks kept as received -- confirm expected bodies
        self.assertEqual(self.internal_request.message_ids.mapped('body'), [
            "Document uploaded by Marc Demo\n",
            "Document created\n",
        ])

        # attempt to upload on the admin's file
        res = self.url_open(
            f'/documents/upload/{self.internal_hidden.access_token}',
            data={'csrf_token': http.Request.csrf_token(self)},
            files={'ufile': ('hello.txt', BytesIO(b"Hello"), 'text/plain')},
        )
        self.assertEqual(res.status_code, 404)

    def test_doc_ctrl_upload_shortcut(self):
        """Check a public upload through a shortcut only works when its target is link-editable and visible."""
        self.authenticate(None, None)

        self.internal_file.action_update_access_rights(access_via_link='edit')
        for access_via_link, hidden in [
            ('none', True),
            ('view', True),
            ('edit', True),
            ('none', False),
            ('view', False),
        ]:
            with self.subTest(access_via_link=access_via_link, hidden=hidden):
                self.internal_file.action_update_access_rights(
                    access_via_link=access_via_link,
                    is_access_via_link_hidden=hidden,
                )
                res = self.url_open(
                    f'/documents/upload/{self.public_shortcut.access_token}',
                    data={'csrf_token': http.Request.csrf_token(self)},
                    files={'ufile': ('hello.txt', BytesIO(b"Hello"), 'text/plain')},
                    allow_redirects=False
                )
                self.assertEqual(res.status_code, 404)

        self.internal_file.action_update_access_rights(
            access_via_link='edit', is_access_via_link_hidden=False,
        )
        res = self.url_open(
            f'/documents/upload/{self.public_shortcut.access_token}',
            data={'csrf_token': http.Request.csrf_token(self)},
            files={'ufile': ('hello.txt', BytesIO(b"Hello"), 'text/plain')},
            allow_redirects=False
        )
        res.raise_for_status()
        self.assertEqual(res.status_code, 303)
        self._assertPathIn(res.headers.get('Location'), {
            self.internal_file.access_url,
            self.public_shortcut.access_url,
        })
        self.assertEqual(self.internal_file.access_via_link, 'edit')
        self.assertEqual(self.internal_file.name, 'hello.txt')
        self.assertEqual(self.internal_file.mimetype, 'text/plain')
        self.assertEqual(self.internal_file.res_id, self.internal_file.id)
        self.assertEqual(self.internal_file.res_model, 'documents.document')
        self.assertEqual(self.internal_file.raw, b"Hello")
        # NOTE(review): line breaks kept as received -- confirm expected bodies
        self.assertEqual(self.internal_file.message_ids.mapped('body'), [
            "Document uploaded by Public user\n",
            "Document created\n",
        ])
        self.url_open(res.headers['Location']).raise_for_status()

    def test_doc_ctrl_zip(self):
        """Check the zip route for the demo user and for the portal user."""
        # the internal user can access all access_internal=view files
        self.authenticate('demo', 'demo')
        res = self.url_open('/documents/zip?' + urlencode({
            'zip_name': 'file.zip',
            'file_ids': f'{self.public_file.id},{self.internal_file.id}',
        }))
        res.raise_for_status()
        self.assertEqual(res.status_code, 200)
        self.assertEqual(res.headers.get('Content-Disposition'),
            "attachment; filename*=UTF-8''file.zip")
        with BytesIO(res.content) as resfile, zipfile.ZipFile(resfile) as reszip:
            self.assertEqual(sorted(reszip.namelist()), ['internal-file.png', 'public-file.png'])
            self.assertEqual(reszip.read('internal-file.png'), self.doc_icon)
            self.assertEqual(reszip.read('public-file.png'), self.doc_icon)

        # the portal user is refused as soon as the selection contains a file
        # they cannot access (internal_file), and served when asking only for
        # the link-shared public_file
        self.authenticate('portal_test', 'portal_test')
        with self.assertLogs('odoo.http', 'WARNING'):
            res = self.url_open('/documents/zip?' + urlencode({
                'zip_name': 'file.zip',
                'file_ids': f'{self.public_file.id},{self.internal_file.id}',
            }))
        self.assertEqual(res.status_code, HTTPStatus.FORBIDDEN)

        res = self.url_open('/documents/zip?' + urlencode({
            'zip_name': 'file.zip',
            'file_ids': f'{self.public_file.id}',
        }))
        res.raise_for_status()
        self.assertEqual(res.status_code, 200)
        self.assertEqual(res.headers.get('Content-Disposition'),
            "attachment; filename*=UTF-8''file.zip")
        with BytesIO(res.content) as resfile, zipfile.ZipFile(resfile) as reszip:
            self.assertEqual(reszip.namelist(), ['public-file.png'])
            self.assertEqual(reszip.read('public-file.png'), self.doc_icon)

    def test_web_ctrl_documents(self):
        """Check ``/web/content/documents.document/<id>/raw`` for public, portal and demo users."""
        public_url = f'/web/content/documents.document/{self.public_file.id}/raw'
        internal_url = f'/web/content/documents.document/{self.internal_file.id}/raw'

        with self.subTest(user=None):
            self.authenticate(None, None)
            res = self.url_open(public_url)
            self.assertEqual(res.status_code, 404)

        with self.subTest(user='portal_test'):
            self.authenticate('portal_test', 'portal_test')
            res = self.url_open(public_url)
            res.raise_for_status()
            self.assertEqual(res.status_code, 200)
            self.assertEqual(res.content, self.doc_icon)

            res = self.url_open(internal_url)
            self.assertEqual(res.status_code, 404)

            self.internal_file.access_ids.filtered(
                lambda access: access.partner_id == self.user_portal.partner_id
            ).role = 'view'
            res = self.url_open(internal_url)
            res.raise_for_status()
            self.assertEqual(res.status_code, 200)
            self.assertEqual(res.content, self.doc_icon)

        with self.subTest(user='demo'):
            self.authenticate('demo', 'demo')
            res = self.url_open(internal_url)
            res.raise_for_status()
            self.assertEqual(res.status_code, 200)
            self.assertEqual(res.content, self.doc_icon)

    def test_documents_get_init_data_folder_id(self):
        """Test computed side panel root depending on access rights."""
        shared_portal_values = {'access_ids': [
            Command.create({'partner_id': self.user_portal.partner_id.id, 'role': 'view'})
        ]}
        shared_manager_values = {'access_ids': [
            Command.create({'partner_id': self.user_manager.partner_id.id, 'role': 'edit'})
        ]}
        internal_folder = self.env['documents.document'].create([
            {'type': 'folder', 'name': 'Internal', 'access_internal': 'edit'},
        ])
        doc_as_demo = self.env['documents.document'].with_user(self.user_demo)
        restricted_folder = doc_as_demo.create({
            'folder_id': internal_folder.id,
            'type': 'folder',
            'name': 'Restricted Folder',
            'access_internal': 'none',
        })
        demo_personal, demo_company, company_portal, restricted_portal, restricted_manager, shared_via_link = (
            doc_as_demo.create([
                shared_portal_values | {'name': 'demo_personal_share_portal'},
                shared_portal_values | {'name': 'demo_company_share_portal', 'access_internal': 'view'},
                shared_portal_values | {'name': 'company_share_portal', 'folder_id': internal_folder.id},
                shared_portal_values | {'name': 'restricted_share_portal', 'folder_id': restricted_folder.id},
                shared_manager_values | {'name': 'restricted_manager', 'folder_id': restricted_folder.id},
                {'name': 'restricted_shared_link', 'folder_id': restricted_folder.id, 'access_via_link': 'view'},
            ])
        )
        for document, user, expected_folder_id in [
            (demo_personal, self.user_demo, 'MY'),
            (demo_personal, self.user_portal, False),
            (demo_company, self.user_demo, 'MY'),
            (demo_company, self.user_portal, False),
            (demo_company, self.user_manager, 'COMPANY'),  # as would any other internal user
            (company_portal, self.user_demo, internal_folder.id),
            (company_portal, self.user_portal, False),
            (company_portal, self.user_manager, internal_folder.id),
            (restricted_portal, self.user_demo, restricted_folder.id),
            (restricted_portal, self.user_portal, False),
            (restricted_manager, self.user_demo, restricted_folder.id),
            (restricted_manager, self.user_manager, 'SHARED'),
            (shared_via_link, self.user_portal, False),
            (shared_via_link, self.user_manager, 'SHARED'),
        ]:
            # drop cached values before reading the document as another user
            document.invalidate_recordset()
            with self.subTest(document_name=document.name, username=user.name):
                data = ShareRoute._documents_get_init_data(document.with_user(user), user)
                self.assertEqual(data['folder_id'], expected_folder_id)