diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/evennia/contrib/django/__init__.py b/evennia/contrib/django/__init__.py new file mode 100644 index 0000000000..02fe3ff90b --- /dev/null +++ b/evennia/contrib/django/__init__.py @@ -0,0 +1,3 @@ +""" +Intended to be a collecting folder for Django-specific contribs that do not have observable effects on players. +""" \ No newline at end of file diff --git a/evennia/contrib/aws_s3_cdn.py b/evennia/contrib/django/aws_s3_cdn.py similarity index 99% rename from evennia/contrib/aws_s3_cdn.py rename to evennia/contrib/django/aws_s3_cdn.py index d09943b3b5..048c1c4c8c 100644 --- a/evennia/contrib/aws_s3_cdn.py +++ b/evennia/contrib/django/aws_s3_cdn.py @@ -12,7 +12,7 @@ server may be sufficient for serving multimedia to a minimal number of users, the perfect use case for this plugin would be: 1) Servers supporting heavy web-based traffic (webclient, etc) -2) With a sizeable number of users +2) With a sizable number of users 3) Where the users are globally distributed 4) Where multimedia files are served to users as a part of gameplay @@ -131,7 +131,7 @@ AWS_S3_OBJECT_PARAMETERS = { 'Expires': 'Thu, 31 Dec 2099 20:00:00 GMT', AWS_DEFAULT_ACL = 'public-read' AWS_S3_CUSTOM_DOMAIN = '%s.s3.amazonaws.com' % settings.AWS_BUCKET_NAME AWS_AUTO_CREATE_BUCKET = True -STATICFILES_STORAGE = 'evennia.contrib.aws-s3-cdn.S3Boto3Storage' +STATICFILES_STORAGE = 'evennia.contrib.django.aws_s3_cdn.S3Boto3Storage' <<< END OF SECRET_SETTINGS.PY COPY/PASTE @@ -580,15 +580,14 @@ class S3Boto3Storage(Storage): default_content_type = "application/octet-stream" # If config provided in init, signature_version and addressing_style settings/args are ignored. 
config = None - # used for looking up the access and secret key from env vars access_key_names = ["AWS_S3_ACCESS_KEY_ID", "AWS_ACCESS_KEY_ID"] secret_key_names = ["AWS_S3_SECRET_ACCESS_KEY", "AWS_SECRET_ACCESS_KEY"] security_token_names = ["AWS_SESSION_TOKEN", "AWS_SECURITY_TOKEN"] security_token = None - access_key = setting("AWS_S3_ACCESS_KEY_ID", setting("AWS_ACCESS_KEY_ID")) - secret_key = setting("AWS_S3_SECRET_ACCESS_KEY", setting("AWS_SECRET_ACCESS_KEY")) + access_key = setting("AWS_S3_ACCESS_KEY_ID", setting("AWS_ACCESS_KEY_ID", "")) + secret_key = setting("AWS_S3_SECRET_ACCESS_KEY", setting("AWS_SECRET_ACCESS_KEY", "")) file_overwrite = setting("AWS_S3_FILE_OVERWRITE", True) object_parameters = setting("AWS_S3_OBJECT_PARAMETERS", {}) bucket_name = setting("AWS_STORAGE_BUCKET_NAME") diff --git a/evennia/contrib/django/tests.py b/evennia/contrib/django/tests.py new file mode 100644 index 0000000000..584005cf68 --- /dev/null +++ b/evennia/contrib/django/tests.py @@ -0,0 +1,635 @@ +from unittest import skipIf +from django.test import override_settings +from django.conf import settings +from django.core.exceptions import ImproperlyConfigured +from django.core.files.base import ContentFile +from django.test import TestCase +from django.utils.timezone import is_aware, utc + +import datetime, gzip, pickle, threading + +from botocore.exceptions import ClientError +from evennia.contrib.django import aws_s3_cdn as s3boto3 + +try: + from django.utils.six.moves.urllib import parse as urlparse +except ImportError: + from urllib import parse as urlparse + + +try: + from unittest import mock +except ImportError: # Python 3.2 and below + import mock + + +class S3Boto3TestCase(TestCase): + def setUp(self): + self.storage = s3boto3.S3Boto3Storage(access_key="foo", secret_key="bar") + self.storage._connections.connection = mock.MagicMock() + + +class S3Boto3StorageTests(S3Boto3TestCase): + + def test_clean_name(self): + """ + Test the base case of _clean_name + """ + path = 
self.storage._clean_name("path/to/somewhere") + self.assertEqual(path, "path/to/somewhere") + + def test_clean_name_normalize(self): + """ + Test the normalization of _clean_name + """ + path = self.storage._clean_name("path/to/../somewhere") + self.assertEqual(path, "path/somewhere") + + def test_clean_name_trailing_slash(self): + """ + Test the _clean_name when the path has a trailing slash + """ + path = self.storage._clean_name("path/to/somewhere/") + self.assertEqual(path, "path/to/somewhere/") + + def test_clean_name_windows(self): + """ + Test the _clean_name when the path has a trailing slash + """ + path = self.storage._clean_name("path\\to\\somewhere") + self.assertEqual(path, "path/to/somewhere") + + def test_pickle_with_bucket(self): + """ + Test that the storage can be pickled with a bucket attached + """ + # Ensure the bucket has been used + self.storage.bucket + self.assertIsNotNone(self.storage._bucket) + + # Can't pickle MagicMock, but you can't pickle a real Bucket object either + p = pickle.dumps(self.storage) + new_storage = pickle.loads(p) + + self.assertIsInstance(new_storage._connections, threading.local) + # Put the mock connection back in + new_storage._connections.connection = mock.MagicMock() + + self.assertIsNone(new_storage._bucket) + new_storage.bucket + self.assertIsNotNone(new_storage._bucket) + + def test_pickle_without_bucket(self): + """ + Test that the storage can be pickled, without a bucket instance + """ + + # Can't pickle a threadlocal + p = pickle.dumps(self.storage) + new_storage = pickle.loads(p) + + self.assertIsInstance(new_storage._connections, threading.local) + + def test_storage_url_slashes(self): + """ + Test URL generation. + """ + self.storage.custom_domain = 'example.com' + + # We expect no leading slashes in the path, + # and trailing slashes should be preserved. 
+ self.assertEqual(self.storage.url(''), 'https://example.com/') + self.assertEqual(self.storage.url('path'), 'https://example.com/path') + self.assertEqual(self.storage.url('path/'), 'https://example.com/path/') + self.assertEqual(self.storage.url('path/1'), 'https://example.com/path/1') + self.assertEqual(self.storage.url('path/1/'), 'https://example.com/path/1/') + + def test_storage_save(self): + """ + Test saving a file + """ + name = 'test_storage_save.txt' + content = ContentFile('new content') + self.storage.save(name, content) + self.storage.bucket.Object.assert_called_once_with(name) + + obj = self.storage.bucket.Object.return_value + obj.upload_fileobj.assert_called_with( + content, + ExtraArgs={ + 'ContentType': 'text/plain', + 'ACL': self.storage.default_acl, + } + ) + + def test_storage_save_with_acl(self): + """ + Test saving a file with user defined ACL. + """ + name = 'test_storage_save.txt' + content = ContentFile('new content') + self.storage.default_acl = 'private' + self.storage.save(name, content) + self.storage.bucket.Object.assert_called_once_with(name) + + obj = self.storage.bucket.Object.return_value + obj.upload_fileobj.assert_called_with( + content, + ExtraArgs={ + 'ContentType': 'text/plain', + 'ACL': 'private', + } + ) + + def test_content_type(self): + """ + Test saving a file with a None content type. 
+ """ + name = 'test_image.jpg' + content = ContentFile('data') + content.content_type = None + self.storage.save(name, content) + self.storage.bucket.Object.assert_called_once_with(name) + + obj = self.storage.bucket.Object.return_value + obj.upload_fileobj.assert_called_with( + content, + ExtraArgs={ + 'ContentType': 'image/jpeg', + 'ACL': self.storage.default_acl, + } + ) + + def test_storage_save_gzipped(self): + """ + Test saving a gzipped file + """ + name = 'test_storage_save.gz' + content = ContentFile("I am gzip'd") + self.storage.save(name, content) + obj = self.storage.bucket.Object.return_value + obj.upload_fileobj.assert_called_with( + content, + ExtraArgs={ + 'ContentType': 'application/octet-stream', + 'ContentEncoding': 'gzip', + 'ACL': self.storage.default_acl, + } + ) + + def test_storage_save_gzip(self): + """ + Test saving a file with gzip enabled. + """ + self.storage.gzip = True + name = 'test_storage_save.css' + content = ContentFile("I should be gzip'd") + self.storage.save(name, content) + obj = self.storage.bucket.Object.return_value + obj.upload_fileobj.assert_called_with( + mock.ANY, + ExtraArgs={ + 'ContentType': 'text/css', + 'ContentEncoding': 'gzip', + 'ACL': self.storage.default_acl, + } + ) + args, kwargs = obj.upload_fileobj.call_args + content = args[0] + zfile = gzip.GzipFile(mode='rb', fileobj=content) + self.assertEqual(zfile.read(), b"I should be gzip'd") + + def test_storage_save_gzip_twice(self): + """ + Test saving the same file content twice with gzip enabled. 
+ """ + # Given + self.storage.gzip = True + name = 'test_storage_save.css' + content = ContentFile("I should be gzip'd") + + # When + self.storage.save(name, content) + self.storage.save('test_storage_save_2.css', content) + + # Then + obj = self.storage.bucket.Object.return_value + obj.upload_fileobj.assert_called_with( + mock.ANY, + ExtraArgs={ + 'ContentType': 'text/css', + 'ContentEncoding': 'gzip', + 'ACL': self.storage.default_acl, + } + ) + args, kwargs = obj.upload_fileobj.call_args + content = args[0] + zfile = gzip.GzipFile(mode='rb', fileobj=content) + self.assertEqual(zfile.read(), b"I should be gzip'd") + + def test_compress_content_len(self): + """ + Test that file returned by _compress_content() is readable. + """ + self.storage.gzip = True + content = ContentFile("I should be gzip'd") + content = self.storage._compress_content(content) + self.assertTrue(len(content.read()) > 0) + + def test_storage_open_write(self): + """ + Test opening a file in write mode + """ + name = 'test_open_for_writïng.txt' + content = 'new content' + + # Set the encryption flag used for multipart uploads + self.storage.encryption = True + self.storage.reduced_redundancy = True + self.storage.default_acl = 'public-read' + + file = self.storage.open(name, 'w') + self.storage.bucket.Object.assert_called_with(name) + obj = self.storage.bucket.Object.return_value + # Set the name of the mock object + obj.key = name + + file.write(content) + obj.initiate_multipart_upload.assert_called_with( + ACL='public-read', + ContentType='text/plain', + ServerSideEncryption='AES256', + StorageClass='REDUCED_REDUNDANCY' + ) + + # Save the internal file before closing + multipart = obj.initiate_multipart_upload.return_value + multipart.parts.all.return_value = [mock.MagicMock(e_tag='123', part_number=1)] + file.close() + multipart.Part.assert_called_with(1) + part = multipart.Part.return_value + part.upload.assert_called_with(Body=content.encode('utf-8')) + 
multipart.complete.assert_called_once_with( + MultipartUpload={'Parts': [{'ETag': '123', 'PartNumber': 1}]}) + + def test_storage_open_no_write(self): + """ + Test opening file in write mode and closing without writing. + + A file should be created as by obj.put(...). + """ + name = 'test_open_no_write.txt' + + # Set the encryption flag used for puts + self.storage.encryption = True + self.storage.reduced_redundancy = True + self.storage.default_acl = 'public-read' + + file = self.storage.open(name, 'w') + self.storage.bucket.Object.assert_called_with(name) + obj = self.storage.bucket.Object.return_value + obj.load.side_effect = ClientError({'Error': {}, + 'ResponseMetadata': {'HTTPStatusCode': 404}}, + 'head_bucket') + + # Set the name of the mock object + obj.key = name + + # Save the internal file before closing + file.close() + + obj.load.assert_called_once_with() + obj.put.assert_called_once_with( + ACL='public-read', + Body=b"", + ContentType='text/plain', + ServerSideEncryption='AES256', + StorageClass='REDUCED_REDUNDANCY' + ) + + def test_storage_open_no_overwrite_existing(self): + """ + Test opening an existing file in write mode and closing without writing. 
+ """ + name = 'test_open_no_overwrite_existing.txt' + + # Set the encryption flag used for puts + self.storage.encryption = True + self.storage.reduced_redundancy = True + self.storage.default_acl = 'public-read' + + file = self.storage.open(name, 'w') + self.storage.bucket.Object.assert_called_with(name) + obj = self.storage.bucket.Object.return_value + + # Set the name of the mock object + obj.key = name + + # Save the internal file before closing + file.close() + + obj.load.assert_called_once_with() + obj.put.assert_not_called() + + def test_storage_write_beyond_buffer_size(self): + """ + Test writing content that exceeds the buffer size + """ + name = 'test_open_for_writïng_beyond_buffer_size.txt' + + # Set the encryption flag used for multipart uploads + self.storage.encryption = True + self.storage.reduced_redundancy = True + self.storage.default_acl = 'public-read' + + file = self.storage.open(name, 'w') + self.storage.bucket.Object.assert_called_with(name) + obj = self.storage.bucket.Object.return_value + # Set the name of the mock object + obj.key = name + + # Initiate the multipart upload + file.write('') + obj.initiate_multipart_upload.assert_called_with( + ACL='public-read', + ContentType='text/plain', + ServerSideEncryption='AES256', + StorageClass='REDUCED_REDUNDANCY' + ) + multipart = obj.initiate_multipart_upload.return_value + + # Write content at least twice as long as the buffer size + written_content = '' + counter = 1 + while len(written_content) < 2 * file.buffer_size: + content = 'hello, aws {counter}\n'.format(counter=counter) + # Write more than just a few bytes in each iteration to keep the + # test reasonably fast + content += '*' * int(file.buffer_size / 10) + file.write(content) + written_content += content + counter += 1 + + # Save the internal file before closing + multipart.parts.all.return_value = [ + mock.MagicMock(e_tag='123', part_number=1), + mock.MagicMock(e_tag='456', part_number=2) + ] + file.close() + self.assertListEqual( 
+ multipart.Part.call_args_list, + [mock.call(1), mock.call(2)] + ) + part = multipart.Part.return_value + uploaded_content = ''.join( + args_list[1]['Body'].decode('utf-8') + for args_list in part.upload.call_args_list + ) + self.assertEqual(uploaded_content, written_content) + multipart.complete.assert_called_once_with( + MultipartUpload={'Parts': [ + {'ETag': '123', 'PartNumber': 1}, + {'ETag': '456', 'PartNumber': 2}, + ]} + ) + + def test_auto_creating_bucket(self): + self.storage.auto_create_bucket = True + Bucket = mock.MagicMock() + self.storage._connections.connection.Bucket.return_value = Bucket + self.storage._connections.connection.meta.client.meta.region_name = 'sa-east-1' + + Bucket.meta.client.head_bucket.side_effect = ClientError({'Error': {}, + 'ResponseMetadata': {'HTTPStatusCode': 404}}, + 'head_bucket') + self.storage._get_or_create_bucket('testbucketname') + Bucket.create.assert_called_once_with( + ACL='public-read', + CreateBucketConfiguration={ + 'LocationConstraint': 'sa-east-1', + } + ) + + def test_auto_creating_bucket_with_acl(self): + self.storage.auto_create_bucket = True + self.storage.bucket_acl = 'public-read' + Bucket = mock.MagicMock() + self.storage._connections.connection.Bucket.return_value = Bucket + self.storage._connections.connection.meta.client.meta.region_name = 'sa-east-1' + + Bucket.meta.client.head_bucket.side_effect = ClientError({'Error': {}, + 'ResponseMetadata': {'HTTPStatusCode': 404}}, + 'head_bucket') + self.storage._get_or_create_bucket('testbucketname') + Bucket.create.assert_called_once_with( + ACL='public-read', + CreateBucketConfiguration={ + 'LocationConstraint': 'sa-east-1', + } + ) + + def test_storage_exists(self): + self.assertTrue(self.storage.exists("file.txt")) + self.storage.connection.meta.client.head_object.assert_called_with( + Bucket=self.storage.bucket_name, + Key="file.txt", + ) + + def test_storage_exists_false(self): + self.storage.connection.meta.client.head_object.side_effect = 
ClientError( + {'Error': {'Code': '404', 'Message': 'Not Found'}}, + 'HeadObject', + ) + self.assertFalse(self.storage.exists("file.txt")) + self.storage.connection.meta.client.head_object.assert_called_with( + Bucket=self.storage.bucket_name, + Key='file.txt', + ) + + def test_storage_exists_doesnt_create_bucket(self): + with mock.patch.object(self.storage, '_get_or_create_bucket') as method: + self.storage.exists('file.txt') + self.assertFalse(method.called) + + def test_storage_delete(self): + self.storage.delete("path/to/file.txt") + self.storage.bucket.Object.assert_called_with('path/to/file.txt') + self.storage.bucket.Object.return_value.delete.assert_called_with() + + def test_storage_listdir_base(self): + # Files: + # some/path/1.txt + # 2.txt + # other/path/3.txt + # 4.txt + pages = [ + { + 'CommonPrefixes': [ + {'Prefix': 'some'}, + {'Prefix': 'other'}, + ], + 'Contents': [ + {'Key': '2.txt'}, + {'Key': '4.txt'}, + ], + }, + ] + + paginator = mock.MagicMock() + paginator.paginate.return_value = pages + self.storage._connections.connection.meta.client.get_paginator.return_value = paginator + + dirs, files = self.storage.listdir('') + paginator.paginate.assert_called_with(Bucket=None, Delimiter='/', Prefix='') + + self.assertEqual(dirs, ['some', 'other']) + self.assertEqual(files, ['2.txt', '4.txt']) + + def test_storage_listdir_subdir(self): + # Files: + # some/path/1.txt + # some/2.txt + pages = [ + { + 'CommonPrefixes': [ + {'Prefix': 'some/path'}, + ], + 'Contents': [ + {'Key': 'some/2.txt'}, + ], + }, + ] + + paginator = mock.MagicMock() + paginator.paginate.return_value = pages + self.storage._connections.connection.meta.client.get_paginator.return_value = paginator + + dirs, files = self.storage.listdir('some/') + paginator.paginate.assert_called_with(Bucket=None, Delimiter='/', Prefix='some/') + + self.assertEqual(dirs, ['path']) + self.assertEqual(files, ['2.txt']) + + def test_storage_size(self): + obj = self.storage.bucket.Object.return_value + 
obj.content_length = 4098 + + name = 'file.txt' + self.assertEqual(self.storage.size(name), obj.content_length) + + def test_storage_mtime(self): + # Test both USE_TZ cases + for use_tz in (True, False): + with self.settings(USE_TZ=use_tz): + self._test_storage_mtime(use_tz) + + def _test_storage_mtime(self, use_tz): + obj = self.storage.bucket.Object.return_value + obj.last_modified = datetime.datetime.now(utc) + + name = 'file.txt' + self.assertFalse( + is_aware(self.storage.modified_time(name)), + 'Naive datetime object expected from modified_time()' + ) + + self.assertIs( + settings.USE_TZ, + is_aware(self.storage.get_modified_time(name)), + '{} datetime object expected from get_modified_time() when USE_TZ={}'.format( + ('Naive', 'Aware')[settings.USE_TZ], + settings.USE_TZ + ) + ) + + def test_storage_url(self): + name = 'test_storage_size.txt' + url = 'http://aws.amazon.com/%s' % name + self.storage.bucket.meta.client.generate_presigned_url.return_value = url + self.storage.bucket.name = 'bucket' + self.assertEqual(self.storage.url(name), url) + self.storage.bucket.meta.client.generate_presigned_url.assert_called_with( + 'get_object', + Params={'Bucket': self.storage.bucket.name, 'Key': name}, + ExpiresIn=self.storage.querystring_expire, + ) + + custom_expire = 123 + + self.assertEqual(self.storage.url(name, expire=custom_expire), url) + self.storage.bucket.meta.client.generate_presigned_url.assert_called_with( + 'get_object', + Params={'Bucket': self.storage.bucket.name, 'Key': name}, + ExpiresIn=custom_expire, + ) + + + def test_generated_url_is_encoded(self): + self.storage.custom_domain = "mock.cloudfront.net" + filename = "whacky & filename.mp4" + url = self.storage.url(filename) + parsed_url = urlparse.urlparse(url) + self.assertEqual(parsed_url.path, + "/whacky%20%26%20filename.mp4") + self.assertFalse(self.storage.bucket.meta.client.generate_presigned_url.called) + + def test_special_characters(self): + self.storage.custom_domain = 
"mock.cloudfront.net" + + name = "ãlöhâ.jpg" + content = ContentFile('new content') + self.storage.save(name, content) + self.storage.bucket.Object.assert_called_once_with(name) + + url = self.storage.url(name) + parsed_url = urlparse.urlparse(url) + self.assertEqual(parsed_url.path, "/%C3%A3l%C3%B6h%C3%A2.jpg") + + def test_strip_signing_parameters(self): + expected = 'http://bucket.s3-aws-region.amazonaws.com/foo/bar' + self.assertEqual(self.storage._strip_signing_parameters( + '%s?X-Amz-Date=12345678&X-Amz-Signature=Signature' % expected), expected) + self.assertEqual(self.storage._strip_signing_parameters( + '%s?expires=12345678&signature=Signature' % expected), expected) + + @skipIf(threading is None, 'Test requires threading') + def test_connection_threading(self): + connections = [] + + def thread_storage_connection(): + connections.append(self.storage.connection) + + for x in range(2): + t = threading.Thread(target=thread_storage_connection) + t.start() + t.join() + + # Connection for each thread needs to be unique + self.assertIsNot(connections[0], connections[1]) + + def test_location_leading_slash(self): + msg = ( + "S3Boto3Storage.location cannot begin with a leading slash. " + "Found '/'. Use '' instead." 
+ ) + with self.assertRaises(ImproperlyConfigured, msg=msg): + s3boto3.S3Boto3Storage(location='/') + + def test_override_class_variable(self): + class MyStorage1(s3boto3.S3Boto3Storage): + location = 'foo1' + + storage = MyStorage1() + self.assertEqual(storage.location, 'foo1') + + class MyStorage2(s3boto3.S3Boto3Storage): + location = 'foo2' + + storage = MyStorage2() + self.assertEqual(storage.location, 'foo2') + + def test_override_init_argument(self): + storage = s3boto3.S3Boto3Storage(location='foo1') + self.assertEqual(storage.location, 'foo1') + storage = s3boto3.S3Boto3Storage(location='foo2') + self.assertEqual(storage.location, 'foo2') diff --git a/evennia/contrib/tests.py b/evennia/contrib/tests.py index e7932ac6ee..2bd3158a11 100644 --- a/evennia/contrib/tests.py +++ b/evennia/contrib/tests.py @@ -4,7 +4,6 @@ Testing suite for contrib folder """ -import sys import datetime from django.test import override_settings from evennia.commands.default.tests import CommandTest @@ -237,7 +236,6 @@ class TestRPSystem(EvenniaTest): from django.conf import settings from evennia.contrib import extended_room -from evennia import gametime from evennia.objects.objects import DefaultRoom @@ -2128,7 +2126,6 @@ class TestRandomStringGenerator(EvenniaTest): import itertools from evennia.contrib import puzzles from evennia.utils import search -from evennia.utils.utils import inherits_from class TestPuzzles(CommandTest): @@ -3093,7 +3090,7 @@ class TestPuzzles(CommandTest): # Tests for the building_menu contrib -from evennia.contrib.building_menu import BuildingMenu, CmdNoInput, CmdNoMatch +from evennia.contrib.building_menu import BuildingMenu, CmdNoMatch class Submenu(BuildingMenu): @@ -3265,719 +3262,3 @@ class TestBuildingMenu(CommandTest): self.assertEqual(self.char1.ndb._building_menu.obj, self.room1) self.call(CmdNoMatch(building_menu=self.menu), "q") self.assertEqual(self.exit.key, "in") - -import gzip -import pickle -import threading -import warnings -from datetime 
import datetime -from unittest import skipIf - -from botocore.exceptions import ClientError -from django.core.exceptions import ImproperlyConfigured -from django.core.files.base import ContentFile -from django.test import TestCase -from django.utils.timezone import is_aware, utc - -from evennia.contrib import aws_s3_cdn as s3boto3 - -try: - from django.utils.six.moves.urllib import parse as urlparse -except ImportError: - from urllib import parse as urlparse - - -try: - from unittest import mock -except ImportError: # Python 3.2 and below - import mock - - -class S3Boto3TestCase(TestCase): - def setUp(self): - self.storage = s3boto3.S3Boto3Storage() - self.storage._connections.connection = mock.MagicMock() - - -class S3Boto3StorageTests(S3Boto3TestCase): - - def test_clean_name(self): - """ - Test the base case of _clean_name - """ - path = self.storage._clean_name("path/to/somewhere") - self.assertEqual(path, "path/to/somewhere") - - def test_clean_name_normalize(self): - """ - Test the normalization of _clean_name - """ - path = self.storage._clean_name("path/to/../somewhere") - self.assertEqual(path, "path/somewhere") - - def test_clean_name_trailing_slash(self): - """ - Test the _clean_name when the path has a trailing slash - """ - path = self.storage._clean_name("path/to/somewhere/") - self.assertEqual(path, "path/to/somewhere/") - - def test_clean_name_windows(self): - """ - Test the _clean_name when the path has a trailing slash - """ - path = self.storage._clean_name("path\\to\\somewhere") - self.assertEqual(path, "path/to/somewhere") - - def test_pickle_with_bucket(self): - """ - Test that the storage can be pickled with a bucket attached - """ - # Ensure the bucket has been used - self.storage.bucket - self.assertIsNotNone(self.storage._bucket) - - # Can't pickle MagicMock, but you can't pickle a real Bucket object either - p = pickle.dumps(self.storage) - new_storage = pickle.loads(p) - - self.assertIsInstance(new_storage._connections, threading.local) 
- # Put the mock connection back in - new_storage._connections.connection = mock.MagicMock() - - self.assertIsNone(new_storage._bucket) - new_storage.bucket - self.assertIsNotNone(new_storage._bucket) - - def test_pickle_without_bucket(self): - """ - Test that the storage can be pickled, without a bucket instance - """ - - # Can't pickle a threadlocal - p = pickle.dumps(self.storage) - new_storage = pickle.loads(p) - - self.assertIsInstance(new_storage._connections, threading.local) - - def test_storage_url_slashes(self): - """ - Test URL generation. - """ - self.storage.custom_domain = 'example.com' - - # We expect no leading slashes in the path, - # and trailing slashes should be preserved. - self.assertEqual(self.storage.url(''), 'https://example.com/') - self.assertEqual(self.storage.url('path'), 'https://example.com/path') - self.assertEqual(self.storage.url('path/'), 'https://example.com/path/') - self.assertEqual(self.storage.url('path/1'), 'https://example.com/path/1') - self.assertEqual(self.storage.url('path/1/'), 'https://example.com/path/1/') - - def test_storage_save(self): - """ - Test saving a file - """ - name = 'test_storage_save.txt' - content = ContentFile('new content') - self.storage.save(name, content) - self.storage.bucket.Object.assert_called_once_with(name) - - obj = self.storage.bucket.Object.return_value - obj.upload_fileobj.assert_called_with( - content, - ExtraArgs={ - 'ContentType': 'text/plain', - 'ACL': self.storage.default_acl, - } - ) - - def test_storage_save_with_acl(self): - """ - Test saving a file with user defined ACL. 
- """ - name = 'test_storage_save.txt' - content = ContentFile('new content') - self.storage.default_acl = 'private' - self.storage.save(name, content) - self.storage.bucket.Object.assert_called_once_with(name) - - obj = self.storage.bucket.Object.return_value - obj.upload_fileobj.assert_called_with( - content, - ExtraArgs={ - 'ContentType': 'text/plain', - 'ACL': 'private', - } - ) - - def test_content_type(self): - """ - Test saving a file with a None content type. - """ - name = 'test_image.jpg' - content = ContentFile('data') - content.content_type = None - self.storage.save(name, content) - self.storage.bucket.Object.assert_called_once_with(name) - - obj = self.storage.bucket.Object.return_value - obj.upload_fileobj.assert_called_with( - content, - ExtraArgs={ - 'ContentType': 'image/jpeg', - 'ACL': self.storage.default_acl, - } - ) - - def test_storage_save_gzipped(self): - """ - Test saving a gzipped file - """ - name = 'test_storage_save.gz' - content = ContentFile("I am gzip'd") - self.storage.save(name, content) - obj = self.storage.bucket.Object.return_value - obj.upload_fileobj.assert_called_with( - content, - ExtraArgs={ - 'ContentType': 'application/octet-stream', - 'ContentEncoding': 'gzip', - 'ACL': self.storage.default_acl, - } - ) - - def test_storage_save_gzip(self): - """ - Test saving a file with gzip enabled. 
- """ - self.storage.gzip = True - name = 'test_storage_save.css' - content = ContentFile("I should be gzip'd") - self.storage.save(name, content) - obj = self.storage.bucket.Object.return_value - obj.upload_fileobj.assert_called_with( - mock.ANY, - ExtraArgs={ - 'ContentType': 'text/css', - 'ContentEncoding': 'gzip', - 'ACL': self.storage.default_acl, - } - ) - args, kwargs = obj.upload_fileobj.call_args - content = args[0] - zfile = gzip.GzipFile(mode='rb', fileobj=content) - self.assertEqual(zfile.read(), b"I should be gzip'd") - - def test_storage_save_gzip_twice(self): - """ - Test saving the same file content twice with gzip enabled. - """ - # Given - self.storage.gzip = True - name = 'test_storage_save.css' - content = ContentFile("I should be gzip'd") - - # When - self.storage.save(name, content) - self.storage.save('test_storage_save_2.css', content) - - # Then - obj = self.storage.bucket.Object.return_value - obj.upload_fileobj.assert_called_with( - mock.ANY, - ExtraArgs={ - 'ContentType': 'text/css', - 'ContentEncoding': 'gzip', - 'ACL': self.storage.default_acl, - } - ) - args, kwargs = obj.upload_fileobj.call_args - content = args[0] - zfile = gzip.GzipFile(mode='rb', fileobj=content) - self.assertEqual(zfile.read(), b"I should be gzip'd") - - def test_compress_content_len(self): - """ - Test that file returned by _compress_content() is readable. 
- """ - self.storage.gzip = True - content = ContentFile("I should be gzip'd") - content = self.storage._compress_content(content) - self.assertTrue(len(content.read()) > 0) - - def test_storage_open_write(self): - """ - Test opening a file in write mode - """ - name = 'test_open_for_writïng.txt' - content = 'new content' - - # Set the encryption flag used for multipart uploads - self.storage.encryption = True - self.storage.reduced_redundancy = True - self.storage.default_acl = 'public-read' - - file = self.storage.open(name, 'w') - self.storage.bucket.Object.assert_called_with(name) - obj = self.storage.bucket.Object.return_value - # Set the name of the mock object - obj.key = name - - file.write(content) - obj.initiate_multipart_upload.assert_called_with( - ACL='public-read', - ContentType='text/plain', - ServerSideEncryption='AES256', - StorageClass='REDUCED_REDUNDANCY' - ) - - # Save the internal file before closing - multipart = obj.initiate_multipart_upload.return_value - multipart.parts.all.return_value = [mock.MagicMock(e_tag='123', part_number=1)] - file.close() - multipart.Part.assert_called_with(1) - part = multipart.Part.return_value - part.upload.assert_called_with(Body=content.encode('utf-8')) - multipart.complete.assert_called_once_with( - MultipartUpload={'Parts': [{'ETag': '123', 'PartNumber': 1}]}) - - def test_storage_open_no_write(self): - """ - Test opening file in write mode and closing without writing. - - A file should be created as by obj.put(...). 
- """ - name = 'test_open_no_write.txt' - - # Set the encryption flag used for puts - self.storage.encryption = True - self.storage.reduced_redundancy = True - self.storage.default_acl = 'public-read' - - file = self.storage.open(name, 'w') - self.storage.bucket.Object.assert_called_with(name) - obj = self.storage.bucket.Object.return_value - obj.load.side_effect = ClientError({'Error': {}, - 'ResponseMetadata': {'HTTPStatusCode': 404}}, - 'head_bucket') - - # Set the name of the mock object - obj.key = name - - # Save the internal file before closing - file.close() - - obj.load.assert_called_once_with() - obj.put.assert_called_once_with( - ACL='public-read', - Body=b"", - ContentType='text/plain', - ServerSideEncryption='AES256', - StorageClass='REDUCED_REDUNDANCY' - ) - - def test_storage_open_no_overwrite_existing(self): - """ - Test opening an existing file in write mode and closing without writing. - """ - name = 'test_open_no_overwrite_existing.txt' - - # Set the encryption flag used for puts - self.storage.encryption = True - self.storage.reduced_redundancy = True - self.storage.default_acl = 'public-read' - - file = self.storage.open(name, 'w') - self.storage.bucket.Object.assert_called_with(name) - obj = self.storage.bucket.Object.return_value - - # Set the name of the mock object - obj.key = name - - # Save the internal file before closing - file.close() - - obj.load.assert_called_once_with() - obj.put.assert_not_called() - - def test_storage_write_beyond_buffer_size(self): - """ - Test writing content that exceeds the buffer size - """ - name = 'test_open_for_writïng_beyond_buffer_size.txt' - - # Set the encryption flag used for multipart uploads - self.storage.encryption = True - self.storage.reduced_redundancy = True - self.storage.default_acl = 'public-read' - - file = self.storage.open(name, 'w') - self.storage.bucket.Object.assert_called_with(name) - obj = self.storage.bucket.Object.return_value - # Set the name of the mock object - obj.key = name - 
- # Initiate the multipart upload - file.write('') - obj.initiate_multipart_upload.assert_called_with( - ACL='public-read', - ContentType='text/plain', - ServerSideEncryption='AES256', - StorageClass='REDUCED_REDUNDANCY' - ) - multipart = obj.initiate_multipart_upload.return_value - - # Write content at least twice as long as the buffer size - written_content = '' - counter = 1 - while len(written_content) < 2 * file.buffer_size: - content = 'hello, aws {counter}\n'.format(counter=counter) - # Write more than just a few bytes in each iteration to keep the - # test reasonably fast - content += '*' * int(file.buffer_size / 10) - file.write(content) - written_content += content - counter += 1 - - # Save the internal file before closing - multipart.parts.all.return_value = [ - mock.MagicMock(e_tag='123', part_number=1), - mock.MagicMock(e_tag='456', part_number=2) - ] - file.close() - self.assertListEqual( - multipart.Part.call_args_list, - [mock.call(1), mock.call(2)] - ) - part = multipart.Part.return_value - uploaded_content = ''.join( - args_list[1]['Body'].decode('utf-8') - for args_list in part.upload.call_args_list - ) - self.assertEqual(uploaded_content, written_content) - multipart.complete.assert_called_once_with( - MultipartUpload={'Parts': [ - {'ETag': '123', 'PartNumber': 1}, - {'ETag': '456', 'PartNumber': 2}, - ]} - ) - - def test_auto_creating_bucket(self): - self.storage.auto_create_bucket = True - Bucket = mock.MagicMock() - self.storage._connections.connection.Bucket.return_value = Bucket - self.storage._connections.connection.meta.client.meta.region_name = 'sa-east-1' - - Bucket.meta.client.head_bucket.side_effect = ClientError({'Error': {}, - 'ResponseMetadata': {'HTTPStatusCode': 404}}, - 'head_bucket') - self.storage._get_or_create_bucket('testbucketname') - Bucket.create.assert_called_once_with( - ACL='public-read', - CreateBucketConfiguration={ - 'LocationConstraint': 'sa-east-1', - } - ) - - def test_auto_creating_bucket_with_acl(self): - 
self.storage.auto_create_bucket = True - self.storage.bucket_acl = 'public-read' - Bucket = mock.MagicMock() - self.storage._connections.connection.Bucket.return_value = Bucket - self.storage._connections.connection.meta.client.meta.region_name = 'sa-east-1' - - Bucket.meta.client.head_bucket.side_effect = ClientError({'Error': {}, - 'ResponseMetadata': {'HTTPStatusCode': 404}}, - 'head_bucket') - self.storage._get_or_create_bucket('testbucketname') - Bucket.create.assert_called_once_with( - ACL='public-read', - CreateBucketConfiguration={ - 'LocationConstraint': 'sa-east-1', - } - ) - - def test_storage_exists(self): - self.assertTrue(self.storage.exists("file.txt")) - self.storage.connection.meta.client.head_object.assert_called_with( - Bucket=self.storage.bucket_name, - Key="file.txt", - ) - - def test_storage_exists_false(self): - self.storage.connection.meta.client.head_object.side_effect = ClientError( - {'Error': {'Code': '404', 'Message': 'Not Found'}}, - 'HeadObject', - ) - self.assertFalse(self.storage.exists("file.txt")) - self.storage.connection.meta.client.head_object.assert_called_with( - Bucket=self.storage.bucket_name, - Key='file.txt', - ) - - def test_storage_exists_doesnt_create_bucket(self): - with mock.patch.object(self.storage, '_get_or_create_bucket') as method: - self.storage.exists('file.txt') - self.assertFalse(method.called) - - def test_storage_delete(self): - self.storage.delete("path/to/file.txt") - self.storage.bucket.Object.assert_called_with('path/to/file.txt') - self.storage.bucket.Object.return_value.delete.assert_called_with() - - def test_storage_listdir_base(self): - # Files: - # some/path/1.txt - # 2.txt - # other/path/3.txt - # 4.txt - pages = [ - { - 'CommonPrefixes': [ - {'Prefix': 'some'}, - {'Prefix': 'other'}, - ], - 'Contents': [ - {'Key': '2.txt'}, - {'Key': '4.txt'}, - ], - }, - ] - - paginator = mock.MagicMock() - paginator.paginate.return_value = pages - 
self.storage._connections.connection.meta.client.get_paginator.return_value = paginator - - dirs, files = self.storage.listdir('') - paginator.paginate.assert_called_with(Bucket=None, Delimiter='/', Prefix='') - - self.assertEqual(dirs, ['some', 'other']) - self.assertEqual(files, ['2.txt', '4.txt']) - - def test_storage_listdir_subdir(self): - # Files: - # some/path/1.txt - # some/2.txt - pages = [ - { - 'CommonPrefixes': [ - {'Prefix': 'some/path'}, - ], - 'Contents': [ - {'Key': 'some/2.txt'}, - ], - }, - ] - - paginator = mock.MagicMock() - paginator.paginate.return_value = pages - self.storage._connections.connection.meta.client.get_paginator.return_value = paginator - - dirs, files = self.storage.listdir('some/') - paginator.paginate.assert_called_with(Bucket=None, Delimiter='/', Prefix='some/') - - self.assertEqual(dirs, ['path']) - self.assertEqual(files, ['2.txt']) - - def test_storage_size(self): - obj = self.storage.bucket.Object.return_value - obj.content_length = 4098 - - name = 'file.txt' - self.assertEqual(self.storage.size(name), obj.content_length) - - def test_storage_mtime(self): - # Test both USE_TZ cases - for use_tz in (True, False): - with self.settings(USE_TZ=use_tz): - self._test_storage_mtime(use_tz) - - def _test_storage_mtime(self, use_tz): - obj = self.storage.bucket.Object.return_value - obj.last_modified = datetime.now(utc) - - name = 'file.txt' - self.assertFalse( - is_aware(self.storage.modified_time(name)), - 'Naive datetime object expected from modified_time()' - ) - - self.assertIs( - settings.USE_TZ, - is_aware(self.storage.get_modified_time(name)), - '{} datetime object expected from get_modified_time() when USE_TZ={}'.format( - ('Naive', 'Aware')[settings.USE_TZ], - settings.USE_TZ - ) - ) - - def test_storage_url(self): - name = 'test_storage_size.txt' - url = 'http://aws.amazon.com/%s' % name - self.storage.bucket.meta.client.generate_presigned_url.return_value = url - self.storage.bucket.name = 'bucket' - 
self.assertEqual(self.storage.url(name), url) - self.storage.bucket.meta.client.generate_presigned_url.assert_called_with( - 'get_object', - Params={'Bucket': self.storage.bucket.name, 'Key': name}, - ExpiresIn=self.storage.querystring_expire, - HttpMethod=None, - ) - - custom_expire = 123 - - self.assertEqual(self.storage.url(name, expire=custom_expire), url) - self.storage.bucket.meta.client.generate_presigned_url.assert_called_with( - 'get_object', - Params={'Bucket': self.storage.bucket.name, 'Key': name}, - ExpiresIn=custom_expire, - HttpMethod=None, - ) - - custom_method = 'HEAD' - - self.assertEqual(self.storage.url(name, http_method=custom_method), url) - self.storage.bucket.meta.client.generate_presigned_url.assert_called_with( - 'get_object', - Params={'Bucket': self.storage.bucket.name, 'Key': name}, - ExpiresIn=self.storage.querystring_expire, - HttpMethod=custom_method, - ) - - def test_generated_url_is_encoded(self): - self.storage.custom_domain = "mock.cloudfront.net" - filename = "whacky & filename.mp4" - url = self.storage.url(filename) - parsed_url = urlparse.urlparse(url) - self.assertEqual(parsed_url.path, - "/whacky%20%26%20filename.mp4") - self.assertFalse(self.storage.bucket.meta.client.generate_presigned_url.called) - - def test_special_characters(self): - self.storage.custom_domain = "mock.cloudfront.net" - - name = "ãlöhâ.jpg" - content = ContentFile('new content') - self.storage.save(name, content) - self.storage.bucket.Object.assert_called_once_with(name) - - url = self.storage.url(name) - parsed_url = urlparse.urlparse(url) - self.assertEqual(parsed_url.path, "/%C3%A3l%C3%B6h%C3%A2.jpg") - - def test_strip_signing_parameters(self): - expected = 'http://bucket.s3-aws-region.amazonaws.com/foo/bar' - self.assertEqual(self.storage._strip_signing_parameters( - '%s?X-Amz-Date=12345678&X-Amz-Signature=Signature' % expected), expected) - self.assertEqual(self.storage._strip_signing_parameters( - '%s?expires=12345678&signature=Signature' % 
expected), expected) - - @skipIf(threading is None, 'Test requires threading') - def test_connection_threading(self): - connections = [] - - def thread_storage_connection(): - connections.append(self.storage.connection) - - for x in range(2): - t = threading.Thread(target=thread_storage_connection) - t.start() - t.join() - - # Connection for each thread needs to be unique - self.assertIsNot(connections[0], connections[1]) - - def test_location_leading_slash(self): - msg = ( - "S3Boto3Storage.location cannot begin with a leading slash. " - "Found '/'. Use '' instead." - ) - with self.assertRaises(ImproperlyConfigured, msg=msg): - s3boto3.S3Boto3Storage(location='/') - - def test_deprecated_acl(self): - with override_settings(AWS_DEFAULT_ACL=None), warnings.catch_warnings(record=True) as w: - s3boto3.S3Boto3Storage(acl='private') - assert len(w) == 1 - assert issubclass(w[-1].category, DeprecationWarning) - message = ( - "The acl argument of S3Boto3Storage is deprecated. Use argument " - "default_acl or setting AWS_DEFAULT_ACL instead. The acl argument " - "will be removed in version 1.10." - ) - assert str(w[-1].message) == message - - def test_deprecated_bucket(self): - with override_settings(AWS_DEFAULT_ACL=None), warnings.catch_warnings(record=True) as w: - s3boto3.S3Boto3Storage(bucket='django') - assert len(w) == 1 - assert issubclass(w[-1].category, DeprecationWarning) - message = ( - "The bucket argument of S3Boto3Storage is deprecated. Use argument " - "bucket_name or setting AWS_STORAGE_BUCKET_NAME instead. The bucket " - "argument will be removed in version 1.10." - ) - assert str(w[-1].message) == message - - def test_deprecated_default_acl(self): - with warnings.catch_warnings(record=True) as w: - s3boto3.S3Boto3Storage() - assert len(w) == 1 - message = ( - "The default behavior of S3Boto3Storage is insecure and will change " - "in django-storages 1.10. 
By default files and new buckets are saved " - "with an ACL of 'public-read' (globally publicly readable). Version 1.10 will " - "default to using the bucket's ACL. To opt into the new behavior set " - "AWS_DEFAULT_ACL = None, otherwise to silence this warning explicitly " - "set AWS_DEFAULT_ACL." - ) - assert str(w[-1].message) == message - - def test_deprecated_autocreate_bucket(self): - with override_settings(AWS_DEFAULT_ACL=None), warnings.catch_warnings(record=True) as w: - s3boto3.S3Boto3Storage(auto_create_bucket=True) - assert len(w) == 1 - assert issubclass(w[-1].category, DeprecationWarning) - message = ( - "Automatic bucket creation will be removed in version 1.10. It encourages " - "using overly broad credentials with this library. Either create it before " - "manually or use one of a myriad of automatic configuration management tools. " - "Unset AWS_AUTO_CREATE_BUCKET (it defaults to False) to silence this warning." - ) - assert str(w[-1].message) == message - - def test_deprecated_default_acl_override_class_variable(self): - class MyStorage(s3boto3.S3Boto3Storage): - default_acl = "private" - - with warnings.catch_warnings(record=True) as w: - MyStorage() - assert len(w) == 0 - - def test_override_settings(self): - with override_settings(AWS_LOCATION='foo1'): - storage = s3boto3.S3Boto3Storage() - self.assertEqual(storage.location, 'foo1') - with override_settings(AWS_LOCATION='foo2'): - storage = s3boto3.S3Boto3Storage() - self.assertEqual(storage.location, 'foo2') - - def test_override_class_variable(self): - class MyStorage1(s3boto3.S3Boto3Storage): - location = 'foo1' - - storage = MyStorage1() - self.assertEqual(storage.location, 'foo1') - - class MyStorage2(s3boto3.S3Boto3Storage): - location = 'foo2' - - storage = MyStorage2() - self.assertEqual(storage.location, 'foo2') - - def test_override_init_argument(self): - storage = s3boto3.S3Boto3Storage(location='foo1') - self.assertEqual(storage.location, 'foo1') - storage = 
s3boto3.S3Boto3Storage(location='foo2') - self.assertEqual(storage.location, 'foo2')