diff --git a/.gitignore b/.gitignore
index 979398f..302e5d9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@ syntax: glob
 *.swo
 .DS_Store
 ckan.egg-info/*
+ckanext_cloudstorage.egg-info/*
 sandbox/*
 dist
@@ -16,5 +17,6 @@ tmp/*
 solr_runtime/*
 fl_notes.txt
 *.ini
+!alembic.ini
 .noseids
 *~
diff --git a/README.md b/README.md
index bc13f0f..080764e 100644
--- a/README.md
+++ b/README.md
@@ -37,7 +37,7 @@ below have been tested:
 | Provider | Uploads | Downloads | Secure URLs (private resources) |
 | --- | --- | --- | --- |
 | Azure | YES | YES | YES (if `azure-storage` is installed) |
-| AWS S3 | YES | YES | YES (if `boto` is installed) |
+| AWS S3 | YES | YES | YES (if `boto` is installed and `host` key added to `driver_options`) |
 | Rackspace | YES | YES | No |
 
 # What are "Secure URLs"?
@@ -54,7 +54,12 @@ benefits of your CDN/blob storage.
 
 This option also enables multipart uploads, but you need to create database
 tables first. Run next command from extension folder:
-    `paster cloudstorage initdb -c /etc/ckan/default/production.ini `
+
+    paster cloudstorage initdb -c /etc/ckan/default/production.ini
+
+For CKAN>=2.9 use the following command instead:
+
+    ckan -c /etc/ckan/default/production.ini db upgrade -p cloudstorage
 
 With that feature you can use `cloudstorage_clean_multipart` action, which
 is available only for sysadmins. After executing, all unfinished multipart uploads, older than 7 days,
diff --git a/ckanext/cloudstorage/cli.py b/ckanext/cloudstorage/cli.py
index 8488c4e..80a2812 100644
--- a/ckanext/cloudstorage/cli.py
+++ b/ckanext/cloudstorage/cli.py
@@ -1,170 +1,32 @@
-#!/usr/bin/env python
 # -*- coding: utf-8 -*-
-import os
-import os.path
-import cgi
-import tempfile
-from docopt import docopt
-from ckan.lib.cli import CkanCommand
+import click
+import ckanext.cloudstorage.utils as utils
 
-from ckanapi import LocalCKAN
-from ckanext.cloudstorage.storage import (
-    CloudStorage,
-    ResourceCloudStorage
-)
-from ckanext.cloudstorage.model import (
-    create_tables,
-    drop_tables
-)
-from ckan.logic import NotFound
 
-USAGE = """ckanext-cloudstorage
+@click.group()
+def cloudstorage():
+    """CloudStorage management commands.
+    """
+    pass
 
-Commands:
-    - fix-cors      Update CORS rules where possible.
-    - migrate       Upload local storage to the remote.
-    - initdb        Reinitalize database tables.
 
-Usage:
-    cloudstorage fix-cors <domains>... [--c=<config>]
-    cloudstorage migrate <path_to_storage> [<resource_id>] [--c=<config>]
-    cloudstorage initdb [--c=<config>]
+@cloudstorage.command('fix-cors')
+@click.argument('domains', nargs=-1)
+def fix_cors(domains):
+    """Update CORS rules where possible.
+    """
+    msg, ok = utils.fix_cors(domains)
+    click.secho(msg, fg='green' if ok else 'red')
 
-Options:
-    -c=<config>     The CKAN configuration file.
-"""
 
+@cloudstorage.command()
+@click.argument('path')
+@click.argument('resource', required=False)
+def migrate(path, resource):
+    """Upload local storage to the remote.
+    """
+    utils.migrate(path, resource)
 
-class FakeFileStorage(cgi.FieldStorage):
-    def __init__(self, fp, filename):
-        self.file = fp
-        self.filename = filename
-
-
-class PasterCommand(CkanCommand):
-    summary = 'ckanext-cloudstorage maintence utilities.'
-    usage = USAGE
-
-    def command(self):
-        self._load_config()
-        args = docopt(USAGE, argv=self.args)
-
-        if args['fix-cors']:
-            _fix_cors(args)
-        elif args['migrate']:
-            _migrate(args)
-        elif args['initdb']:
-            _initdb()
-
-
-def _migrate(args):
-    path = args['<path_to_storage>']
-    single_id = args['<resource_id>']
-    if not os.path.isdir(path):
-        print('The storage directory cannot be found.')
-        return
-
-    lc = LocalCKAN()
-    resources = {}
-    failed = []
-
-    # The resource folder is stuctured like so on disk:
-    # - storage/
-    #   - ...
-    # - resources/
-    #   - <3 letter prefix>
-    #     - <3 letter prefix>
-    #       - <remaining resource_id as filename>
-    #       ...
-    #     ...
-    #   ...
-    for root, dirs, files in os.walk(path):
-        # Only the bottom level of the tree actually contains any files. We
-        # don't care at all about the overall structure.
-        if not files:
-            continue
-
-        split_root = root.split('/')
-        resource_id = split_root[-2] + split_root[-1]
-
-        for file_ in files:
-            ckan_res_id = resource_id + file_
-            if single_id and ckan_res_id != single_id:
-                continue
-
-            resources[ckan_res_id] = os.path.join(
-                root,
-                file_
-            )
-
-    for i, resource in enumerate(resources.iteritems(), 1):
-        resource_id, file_path = resource
-        print('[{i}/{count}] Working on {id}'.format(
-            i=i,
-            count=len(resources),
-            id=resource_id
-        ))
-
-        try:
-            resource = lc.action.resource_show(id=resource_id)
-        except NotFound:
-            print(u'\tResource not found')
-            continue
-        if resource['url_type'] != 'upload':
-            print(u'\t`url_type` is not `upload`. Skip')
-            continue
-
-        with open(file_path, 'rb') as fin:
-            resource['upload'] = FakeFileStorage(
-                fin,
-                resource['url'].split('/')[-1]
-            )
-            try:
-                uploader = ResourceCloudStorage(resource)
-                uploader.upload(resource['id'])
-            except Exception as e:
-                failed.append(resource_id)
-                print(u'\tError of type {0} during upload: {1}'.format(type(e), e))
-
-    if failed:
-        log_file = tempfile.NamedTemporaryFile(delete=False)
-        log_file.file.writelines(failed)
-        print(u'ID of all failed uploads are saved to `{0}`'.format(log_file.name))
-
-
-def _fix_cors(args):
-    cs = CloudStorage()
-
-    if cs.can_use_advanced_azure:
-        from azure.storage import blob as azure_blob
-        from azure.storage import CorsRule
-
-        blob_service = azure_blob.BlockBlobService(
-            cs.driver_options['key'],
-            cs.driver_options['secret']
-        )
-
-        blob_service.set_blob_service_properties(
-            cors=[
-                CorsRule(
-                    allowed_origins=args['<domains>'],
-                    allowed_methods=['GET']
-                )
-            ]
-        )
-        print('Done!')
-    else:
-        print(
-            'The driver {driver_name} being used does not currently'
-            ' support updating CORS rules through'
-            ' cloudstorage.'.format(
-                driver_name=cs.driver_name
-            )
-        )
-
-
-def _initdb():
-    drop_tables()
-    create_tables()
-    print("DB tables are reinitialized")
+
+
+def get_commands():
+    return [cloudstorage]
diff --git a/ckanext/cloudstorage/commands.py b/ckanext/cloudstorage/commands.py
new file mode 100644
index 0000000..2d31c84
--- /dev/null
+++ b/ckanext/cloudstorage/commands.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from __future__ import print_function
+from ckan.lib.cli import CkanCommand
+from docopt import docopt
+
+import ckanext.cloudstorage.utils as utils
+
+USAGE = """ckanext-cloudstorage
+
+Commands:
+    - fix-cors      Update CORS rules where possible.
+    - migrate       Upload local storage to the remote.
+    - initdb        Reinitialize database tables.
+
+Usage:
+    cloudstorage fix-cors <domains>... [--c=<config>]
+    cloudstorage migrate <path_to_storage> [<resource_id>] [--c=<config>]
+    cloudstorage initdb [--c=<config>]
+
+Options:
+    -c=<config>     The CKAN configuration file.
+"""
+
+
+class PasterCommand(CkanCommand):
+    summary = 'ckanext-cloudstorage maintenance utilities.'
+    usage = USAGE
+
+    def command(self):
+        self._load_config()
+        args = docopt(USAGE, argv=self.args)
+
+        if args['fix-cors']:
+            _fix_cors(args)
+        elif args['migrate']:
+            _migrate(args)
+        elif args['initdb']:
+            _initdb()
+
+
+def _migrate(args):
+    path = args['<path_to_storage>']
+    single_id = args['<resource_id>']
+    utils.migrate(path, single_id)
+
+
+def _fix_cors(args):
+    msg, _ = utils.fix_cors(args['<domains>'])
+    print(msg)
+
+
+def _initdb():
+    utils.initdb()
+    print("DB tables are reinitialized")
diff --git a/ckanext/cloudstorage/controller.py b/ckanext/cloudstorage/controller.py
index 992b0c1..46569cb 100644
--- a/ckanext/cloudstorage/controller.py
+++ b/ckanext/cloudstorage/controller.py
@@ -1,62 +1,10 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
-import os.path
-
-from pylons import c
-from pylons.i18n import _
-
-from ckan import logic, model
-from ckan.lib import base, uploader
-import ckan.lib.helpers as h
+from ckan.lib import base
+import ckanext.cloudstorage.utils as utils
 
 
 class StorageController(base.BaseController):
     def resource_download(self, id, resource_id, filename=None):
-        context = {
-            'model': model,
-            'session': model.Session,
-            'user': c.user or c.author,
-            'auth_user_obj': c.userobj
-        }
-
-        try:
-            resource = logic.get_action('resource_show')(
-                context,
-                {
-                    'id': resource_id
-                }
-            )
-        except logic.NotFound:
-            base.abort(404, _('Resource not found'))
-        except logic.NotAuthorized:
-            base.abort(401, _('Unauthorized to read resource {0}'.format(id)))
-
-        # This isn't a file upload, so either redirect to the source
-        # (if available) or error out.
-        if resource.get('url_type') != 'upload':
-            url = resource.get('url')
-            if not url:
-                base.abort(404, _('No download is available'))
-            h.redirect_to(url)
-
-        if filename is None:
-            # No filename was provided so we'll try to get one from the url.
-            filename = os.path.basename(resource['url'])
-
-        upload = uploader.get_resource_uploader(resource)
-
-        # if the client requests with a Content-Type header (e.g. Text preview)
-        # we have to add the header to the signature
-        try:
-            content_type = getattr(c.pylons.request, "content_type", None)
-        except AttributeError:
-            content_type = None
-
-        uploaded_url = upload.get_url_from_filename(resource['id'], filename,
-                                                    content_type=content_type)
-
-        # The uploaded file is missing for some reason, such as the
-        # provider being down.
-        if uploaded_url is None:
-            base.abort(404, _('No download is available'))
-
-        h.redirect_to(uploaded_url)
+        return utils.resource_download(id, resource_id, filename)
diff --git a/ckanext/cloudstorage/fanstatic/scripts/cloudstorage-multipart-upload.js b/ckanext/cloudstorage/fanstatic/scripts/cloudstorage-multipart-upload.js
index cf0d083..a1deb03 100644
--- a/ckanext/cloudstorage/fanstatic/scripts/cloudstorage-multipart-upload.js
+++ b/ckanext/cloudstorage/fanstatic/scripts/cloudstorage-multipart-upload.js
@@ -1,443 +1,480 @@
-ckan.module('cloudstorage-multipart-upload', function($, _) {
-    'use strict';
-
-    return {
-        options: {
-            cloud: 'S3',
-            i18n: {
-                resource_create: _('Resource has been created.'),
-                resource_update: _('Resource has been updated.'),
-                undefined_upload_id: _('Undefined uploadId.'),
-                upload_completed: _('Upload completed. 
You will be redirected in few seconds...'),
-                unable_to_finish: _('Unable to finish multipart upload')
-            }
-        },
-
-        _partNumber: 1,
-
-        _uploadId: null,
-        _packageId: null,
-        _resourceId: null,
-        _uploadSize: null,
-        _uploadName: null,
-        _uploadedParts: null,
-        _clickedBtn: null,
-        _redirect_url: null,
-
-        initialize: function() {
-            $.proxyAll(this, /_on/);
-            this.options.packageId = this.options.packageId.slice(1);
-            this._form = this.$('form');
-            // this._origin = $('#field-image-upload');
-            // this._file = this._origin.clone()
-            this._file = $('#field-image-upload');
-            this._url = $('#field-image-url');
-            this._save = $('[name=save]');
-            this._id = $('input[name=id]');
-            this._progress = $('<div>', {
-                class: 'hide controls progress progress-striped active'
-            });
-            this._bar = $('<div>', {class:'bar'});
-            this._progress.append(this._bar);
-            this._progress.insertAfter(this._url.parent().parent());
-            this._resumeBtn = $('<a>', {class: 'hide btn btn-info controls'}).insertAfter(
-                this._progress).text('Resume Upload');
-
-            var self = this;
-
-            this._file.fileupload({
-                url: this.sandbox.client.url('/api/action/cloudstorage_upload_multipart'),
-                maxChunkSize: 5 * 1024 * 1024,
-                replaceFileInput: false,
-                formData: this._onGenerateAdditionalData,
-                submit: this._onUploadFileSubmit,
-                chunkdone: this._onChunkUploaded,
-                add: this._onFileUploadAdd,
-                progressall: this._onFileUploadProgress,
-                done: this._onFinishUpload,
-                fail: this._onUploadFail,
-                always: this._onAnyEndedUpload
-            });
-
-            this._save.on('click', this._onSaveClick);
-
-            this._onCheckExistingMultipart('choose');
-        },
-
-        _onChunkUploaded: function () {
-            this._uploadedParts = this._partNumber++;
-        },
-
-        _onCheckExistingMultipart: function (operation) {
-            var self = this;
-            var id = this._id.val();
-            if (!id) return;
-            this.sandbox.client.call(
-                'POST',
-                'cloudstorage_check_multipart',
-                {id: id},
-                function (data) {
-                    if (!data.result) return;
-                    var upload = data.result.upload;
-
-                    var name = upload.name.slice(upload.name.lastIndexOf('/')+1);
-                    self._uploadId = upload.id;
-                    self._uploadSize = upload.size;
-                    self._uploadedParts = upload.parts;
-                    self._uploadName = upload.original_name;
-                    self._partNumber = self._uploadedParts + 1;
-
-
-                    var current_chunk_size = self._file.fileupload('option', 'maxChunkSize');
-                    var uploaded_bytes = current_chunk_size * upload.parts;
-                    self._file.fileupload('option', 'uploadedBytes', uploaded_bytes);
-
-                    self.sandbox.notify(
-                        'Incomplete upload',
-                        'File: ' + upload.original_name +
-                        '; Size: ' + self._uploadSize,
-                        'warning');
-                    self._onEnableResumeBtn(operation);
-                },
-                function (error) {
-                    console.log(error);
-                    setTimeout(function() {
-                        self._onCheckExistingMultipart(operation);
-                    }, 2000);
-                }
-
-            );
-        },
-
-        _onEnableResumeBtn: function (operation) {
-            var self = this;
-            this.$('.btn-remove-url').remove();
-            if (operation === 'choose'){
-                self._onDisableSave(true);
-
-            }
-            this._resumeBtn
-                .off('click')
-                .on('click', function (event) {
-                    switch (operation) {
-                        case 'resume':
-                            self._save.trigger('click');
-                            self._onDisableResumeBtn();
-                            break;
-                        case 'choose':
-                        default:
-                            self._file.trigger('click');
-                            break;
-                    }
-                })
-                .show();
-        },
-
-        _onDisableResumeBtn: function () {
-            this._resumeBtn.hide();
+ckan.module("cloudstorage-multipart-upload", function($, _) {
+  "use strict";
+
+  return {
+    options: {
+      cloud: "S3",
+      i18n: {
+        resource_create: _("Resource has been created."),
+        resource_update: _("Resource has been updated."),
+        undefined_upload_id: _("Undefined uploadId."),
+        upload_completed: _(
+          "Upload completed. You will be redirected in a few seconds..."
+        ),
+        unable_to_finish: _("Unable to finish multipart upload")
+      }
+    },
+
+    _partNumber: 1,
+
+    _uploadId: null,
+    _packageId: null,
+    _resourceId: null,
+    _uploadSize: null,
+    _uploadName: null,
+    _uploadedParts: null,
+    _clickedBtn: null,
+    _redirect_url: null,
+
+    initialize: function() {
+      $.proxyAll(this, /_on/);
+      // There is an underscore as a prefix added to package ID in
+      // order to prevent type-coercion, so we have to strip it
+      this.options.packageId = this.options.packageId.slice(1);
+      this._form = this.$("form");
+      this._file = $("#field-image-upload");
+      this._url = $("#field-image-url");
+      this._save = $("[name=save]");
+      this._id = $("input[name=id]");
+      this._progress = $("<div>", {
+        class: "progress"
+      });
+
+      this._upload_message = $("<div>Please wait until upload finishes</div>");
+      this._upload_message.addClass("upload-message");
+      this._upload_message.css("margin-top", "10px");
+      this._upload_message.css("line-height", "0px");
+      this._upload_message.css("text-align", "center");
+      this._progress.append(this._upload_message);
+
+      this._bar = $("<div>", {
+        class: "progress-bar progress-bar-striped progress-bar-animated active"
+      });
+      this._progress.append(this._bar);
+      this._progress.insertAfter(this._url.parent().parent());
+      this._progress.hide();
+
+      this._resumeBtn = $("<a>", { class: "btn btn-info controls" })
+        .insertAfter(this._progress)
+        .text("Resume Upload");
+      this._resumeBtn.hide();
+
+      var self = this;
+
+      this._file.fileupload({
+        url: this.sandbox.client.url(
+          "/api/action/cloudstorage_upload_multipart"
+        ),
+        maxChunkSize: 5 * 1024 * 1024,
+        replaceFileInput: false,
+        formData: this._onGenerateAdditionalData,
+        submit: this._onUploadFileSubmit,
+        chunkdone: this._onChunkUploaded,
+        add: this._onFileUploadAdd,
+        progressall: this._onFileUploadProgress,
+        done: this._onFinishUpload,
+        fail: this._onUploadFail,
+        always: this._onAnyEndedUpload
+      });
+
+      this._save.on("click", this._onSaveClick);
+
+      this._onCheckExistingMultipart("choose");
+      (function blink() {
+        $(".upload-message")
+          .fadeOut(500)
+          .fadeIn(500, blink);
+      })();
+    },
+
+    _onChunkUploaded: function() {
+      this._uploadedParts = this._partNumber++;
+    },
+
+    _onCheckExistingMultipart: function(operation) {
+      var self = this;
+      var id = this._id.val();
+      if (!id) return;
+      this.sandbox.client.call(
+        "POST",
+        "cloudstorage_check_multipart",
+        { id: id },
+        function(data) {
+          if (!data.result) return;
+          var upload = data.result.upload;
+
+          var name = upload.name.slice(upload.name.lastIndexOf("/") + 1);
+          self._uploadId = upload.id;
+          self._uploadSize = upload.size;
+          self._uploadedParts = upload.parts;
+          self._uploadName = upload.original_name;
+          self._partNumber = self._uploadedParts + 1;
+
+          var current_chunk_size = self._file.fileupload(
+            "option",
+            "maxChunkSize"
+          );
+          var uploaded_bytes = current_chunk_size * upload.parts;
+          self._file.fileupload("option", "uploadedBytes", uploaded_bytes);
+
+          self.sandbox.notify(
+            "Incomplete upload",
+            "File: " + upload.original_name + "; Size: " + self._uploadSize,
+            "warning"
+          );
+          self._onEnableResumeBtn(operation);
+        },
-
-        _onUploadFail: function (e, data) {
-            this._onHandleError('Upload fail');
-            this._onCheckExistingMultipart('resume');
-        },
-
-        _onUploadFileSubmit: function (event, data) {
-            if (!this._uploadId) {
-                this._onDisableSave(false);
-                this.sandbox.notify(
-                    'Upload error',
-                    this.i18n('undefined_upload_id'),
-                    'error'
-                );
-                return false;
-            }
-
-            this._setProgressType('info', this._progress);
-            this._progress.show('slow');
-        },
-
-        _onGenerateAdditionalData: function (form) {
-            return [
-                {
-                    name: 'partNumber',
-                    value: this._partNumber
-                },
-                {
-                    name: 'uploadId',
-                    value: this._uploadId
-                },
-                {
-                    name: 'id',
-                    value: this._resourceId
-                }
-
-            ];
-        },
-
-        _onAnyEndedUpload: function () {
-            this._partNumber = 1;
-        },
-
-        _countChunkSize: function (size, chunk) {
-            while (size / chunk > 10000) chunk *= 2;
-            return chunk;
-        },
-
-        _onFileUploadAdd: function (event, data) {
-            this._setProgress(0, this._bar);
-            var file = data.files[0];
-            var target = $(event.target);
-
-            var chunkSize = this._countChunkSize(file.size, target.fileupload('option', 'maxChunkSize'));
-
-            if (this._uploadName && this._uploadSize && this._uploadedParts !== null) {
-                if (this._uploadSize !== file.size || this._uploadName !== file.name){
-                    this._file.val('');
-                    this._onCleanUpload();
-                    this.sandbox.notify(
-                        'Mismatch file',
-                        'You are trying to upload wrong file. 
Cancel previous upload first.', - 'error' - ); - event.preventDefault(); - throw 'Wrong file'; - } - - - var loaded = chunkSize * this._uploadedParts; - - // target.fileupload('option', 'uploadedBytes', loaded); - this._onFileUploadProgress(event, { - total: file.size, - loaded: loaded - }); - - this._progress.show('slow'); - this._onDisableResumeBtn(); - this._save.trigger('click'); - - if (loaded >= file.size){ - this._onFinishUpload(); - } - - } - - - target.fileupload('option', 'maxChunkSize', chunkSize); - - this.el.off('multipartstarted.cloudstorage'); - this.el.on('multipartstarted.cloudstorage', function () { - data.submit(); - }); + function(error) { + console.error(error); + setTimeout(function() { + self._onCheckExistingMultipart(operation); + }, 2000); + } + ); + }, + + _onEnableResumeBtn: function(operation) { + var self = this; + this.$(".btn-remove-url").remove(); + if (operation === "choose") { + self._onDisableSave(true); + } + this._resumeBtn + .off("click") + .on("click", function(event) { + switch (operation) { + case "resume": + self._save.trigger("click"); + self._onDisableResumeBtn(); + break; + case "choose": + default: + self._file.trigger("click"); + break; + } + }) + .show(); + }, + + _onDisableResumeBtn: function() { + this._resumeBtn.hide(); + }, + + _onUploadFail: function(e, data) { + this._onHandleError("Upload fail"); + this._onCheckExistingMultipart("resume"); + }, + + _onUploadFileSubmit: function(event, data) { + if (!this._uploadId) { + this._onDisableSave(false); + this.sandbox.notify( + "Upload error", + this.i18n("undefined_upload_id"), + "error" + ); + return false; + } + + this._setProgressType("info", this._bar); + this._progress.show("slow"); + }, + + _onGenerateAdditionalData: function(form) { + return [ + { + name: "partNumber", + value: this._partNumber }, - - _onFileUploadProgress: function (event, data) { - var progress = 100 / (data.total / data.loaded); - this._setProgress(progress, this._bar); + { + name: "uploadId", + value: this._uploadId }, + { + name: "id", + value: this._resourceId + } + ]; + }, + + _onAnyEndedUpload: function() { + this._partNumber = 1; + }, + + _countChunkSize: function(size, chunk) { + while (size / chunk > 10000) chunk *= 2; + return chunk; + }, + + _onFileUploadAdd: function(event, data) { + this._setProgress(0, this._bar); + var file = data.files[0]; + var target = $(event.target); + + if (this.options.maxSize && !isNaN(parseInt(this.options.maxSize))) { + var max_size = parseInt(this.options.maxSize); + var file_size_gb = file.size / 1073741824; + if (file_size_gb > max_size) { + this._file.val(""); + this._onCleanUpload(); + this.sandbox.notify( + "Too large file:", + "You selected a file larger than " + + max_size + + "GB. Contact support for an alternative upload method or select a smaller one.", + "error" + ); + event.preventDefault(); + throw "Too large file"; + } + } + + var chunkSize = this._countChunkSize( + file.size, + target.fileupload("option", "maxChunkSize") + ); + + if ( + this._uploadName && + this._uploadSize && + this._uploadedParts !== null + ) { + if (this._uploadSize !== file.size || this._uploadName !== file.name) { + this._file.val(""); + this._onCleanUpload(); + this.sandbox.notify( + "Mismatch file", + "You are trying to upload wrong file. 
Select " + + this._uploadName + + " or delete this resource and create a new one.", + "error" + ); + event.preventDefault(); + throw "Wrong file"; + } - _onSaveClick: function(event, pass) { - if (pass || !window.FileList || !this._file || !this._file.val()) { - return; - } - event.preventDefault(); - var dataset_id = this.options.packageId; - this._clickedBtn = $(event.target).attr('value'); - if (this._clickedBtn == 'go-dataset') { - this._onDisableSave(false); - this._redirect_url = this.sandbox.url( - '/dataset/edit/' + - dataset_id); - window.location = this._redirect_url; - } else { - try{ - this._onDisableSave(true); - this._onSaveForm(); - } catch(error){ - console.log(error); - this._onDisableSave(false); - } - } + var loaded = chunkSize * this._uploadedParts; - // this._form.trigger('submit', true); - }, + // target.fileupload('option', 'uploadedBytes', loaded); + this._onFileUploadProgress(event, { + total: file.size, + loaded: loaded + }); - _onSaveForm: function() { - var file = this._file[0].files[0]; - var self = this; - var formData = this._form.serializeArray().reduce( - function (result, item) { - result[item.name] = item.value; - return result; - }, {}); - - formData.multipart_name = file.name; - formData.url = file.name; - formData.package_id = this.options.packageId; - formData.size = file.size; - formData.url_type = 'upload'; - var action = formData.id ? 'resource_update' : 'resource_create'; - var url = this._form.attr('action') || window.location.href; - this.sandbox.client.call( - 'POST', - action, - formData, - function (data) { - var result = data.result; - self._packageId = result.package_id; - self._resourceId = result.id; - - self._id.val(result.id); - self.sandbox.notify( - result.id, - self.i18n(action, {id: result.id}), - 'success' - ); - self._onPerformUpload(file); - }, - function (err, st, msg) { - self.sandbox.notify( - 'Error', - msg, - 'error' - ); - self._onHandleError('Unable to save resource'); - } - ); + this._progress.show("slow"); + this._onDisableResumeBtn(); + this._save.trigger("click"); + if (loaded >= file.size) { + this._onFinishUpload(); + } + } + + target.fileupload("option", "maxChunkSize", chunkSize); + + this.el.off("multipartstarted.cloudstorage"); + this.el.on("multipartstarted.cloudstorage", function() { + data.submit(); + }); + }, + + _onFileUploadProgress: function(event, data) { + var progress = 100 / (data.total / data.loaded); + this._setProgress(progress, this._bar); + }, + + _onSaveClick: function(event, pass) { + if (pass || !window.FileList || !this._file || !this._file.val()) { + return; + } + event.preventDefault(); + + var dataset_id = this.options.packageId; + this._clickedBtn = $(event.target).attr("value"); + if (this._clickedBtn == "go-dataset") { + this._onDisableSave(false); + this._redirect_url = this.sandbox.url("/dataset/edit/" + dataset_id); + window.location = this._redirect_url; + } else { + try { + $("html, body").animate({ scrollTop: 0 }, "50"); + this._onDisableSave(true); + this._onDisableRemove(true); + this._onSaveForm(); + } catch (error) { + console.error(error); + this._onDisableSave(false); + this._onDisableRemove(false); + } + } + + // this._form.trigger('submit', true); + }, + + _onSaveForm: function() { + var file = this._file[0].files[0]; + var self = this; + var formData = this._form.serializeArray().reduce(function(result, item) { + result[item.name] = item.value; + return result; + }, {}); + + formData.multipart_name = file.name; + formData.url = file.name; + formData.package_id = 
this.options.packageId; + formData.size = file.size; + formData.url_type = "upload"; + var action = formData.id ? "resource_update" : "resource_create"; + var url = this._form.attr("action") || window.location.href; + this.sandbox.client.call( + "POST", + action, + formData, + function(data) { + var result = data.result; + self._packageId = result.package_id; + self._resourceId = result.id; + + self._id.val(result.id); + self.sandbox.notify( + result.id, + self.i18n(action, { id: result.id }), + "success" + ); + self._onPerformUpload(file); }, - - - _onPerformUpload: function(file) { - var id = this._id.val(); - var self = this; - if (this._uploadId === null) - this._onPrepareUpload(file, id).then( - function (data) { - self._uploadId = data.result.id; - self.el.trigger('multipartstarted.cloudstorage'); - }, - function (err) { - console.log(err); - self._onHandleError('Unable to initiate multipart upload'); - } - ); - else - this.el.trigger('multipartstarted.cloudstorage'); - + function(err, st, msg) { + self.sandbox.notify("Error", msg, "error"); + self._onHandleError("Unable to save resource"); + } + ); + }, + + _onPerformUpload: function(file) { + var id = this._id.val(); + var self = this; + if (this._uploadId === null) + this._onPrepareUpload(file, id).then( + function(data) { + self._uploadId = data.result.id; + self.el.trigger("multipartstarted.cloudstorage"); + }, + function(err) { + console.error(err); + self._onHandleError("Unable to initiate multipart upload"); + } + ); + else this.el.trigger("multipartstarted.cloudstorage"); + }, + + _onPrepareUpload: function(file, id) { + return $.ajax({ + method: "POST", + url: this.sandbox.client.url( + "/api/action/cloudstorage_initiate_multipart" + ), + data: JSON.stringify({ + id: id, + name: encodeURIComponent(file.name), + size: file.size + }) + }); + }, + + _onAbortUpload: function(id) { + var self = this; + this.sandbox.client.call( + "POST", + "cloudstorage_abort_multipart", + { + id: id }, - - _onPrepareUpload: function(file, id) { - - return $.ajax({ - method: 'POST', - url: this.sandbox.client.url('/api/action/cloudstorage_initiate_multipart'), - data: JSON.stringify({ - id: id, - name: encodeURIComponent(file.name), - size: file.size - }) - }); - + function(data) { + console.log(data); }, - - _onAbortUpload: function(id) { - var self = this; - this.sandbox.client.call( - 'POST', - 'cloudstorage_abort_multipart', - { - id: id - }, - function (data) { - console.log(data); - }, - function (err) { - console.log(err); - self._onHandleError('Unable to abort multipart upload'); - } + function(err) { + console.error(err); + self._onHandleError("Unable to abort multipart upload"); + } + ); + }, + + _onFinishUpload: function() { + var self = this; + var data_dict = { + uploadId: this._uploadId, + id: this._resourceId, + save_action: this._clickedBtn + }; + this.sandbox.client.call( + "POST", + "cloudstorage_finish_multipart", + data_dict, + function(data) { + self._progress.hide("fast"); + self._onDisableSave(false); + + if (self._resourceId && self._packageId) { + self.sandbox.notify( + "Success", + self.i18n("upload_completed"), + "success" ); - - }, - - _onFinishUpload: function() { - var self = this; - var data_dict = { - 'uploadId': this._uploadId, - 'id': this._resourceId, - 'save_action': this._clickedBtn + // self._form.remove(); + if (self._clickedBtn == "again") { + this._redirect_url = self.sandbox.url( + "/dataset/new_resource/" + self._packageId + ); + } else { + this._redirect_url = self.sandbox.url( + "/dataset/" + 
self._packageId + ); } - this.sandbox.client.call( - 'POST', - 'cloudstorage_finish_multipart', - data_dict, - function (data) { - - self._progress.hide('fast'); - self._onDisableSave(false); - - if (self._resourceId && self._packageId){ - self.sandbox.notify( - 'Success', - self.i18n('upload_completed'), - 'success' - ); - // self._form.remove(); - if (self._clickedBtn == 'again') { - this._redirect_url = self.sandbox.url( - '/dataset/new_resource/' + - self._packageId - ); - } else { - this._redirect_url = self.sandbox.url( - '/dataset/' + - self._packageId - ); - } - self._form.attr('action', this._redirect_url); - self._form.attr('method', 'GET'); - self.$('[name]').attr('name', null); - setTimeout(function(){ - self._form.submit(); - }, 3000); - - } - }, - function (err) { - console.log(err); - self._onHandleError(self.i18n('unable_to_finish')); - } - ); - this._setProgressType('success', this._progress); - }, - - _onDisableSave: function (value) { - this._save.attr('disabled', value); + self._form.attr("action", this._redirect_url); + self._form.attr("method", "GET"); + self.$("[name]").attr("name", null); + setTimeout(function() { + self._form.submit(); + }, 3000); + } }, - - _setProgress: function (progress, bar) { - bar.css('width', progress + '%'); - }, - - _setProgressType: function (type, progress) { - progress - .removeClass('progress-success progress-danger progress-info') - .addClass('progress-' + type); - }, - - _onHandleError: function (msg) { - this.sandbox.notify( - 'Error', - msg, - 'error' - ); - this._onDisableSave(false); - }, - - _onCleanUpload: function () { - this.$('.btn-remove-url').trigger('click'); + function(err) { + console.error(err); + self._onHandleError(self.i18n("unable_to_finish")); } - - }; + ); + this._setProgressType("success", this._bar); + }, + + _onDisableSave: function(value) { + this._save.attr("disabled", value); + }, + _onDisableRemove: function(value) { + $(".btn-remove-url").attr("disabled", value); + if (value) { + $(".btn-remove-url").off(); + } else { + $(".btn-remove-url").on(); + } + }, + _setProgress: function(progress, bar) { + bar.css("width", progress + "%"); + }, + + _setProgressType: function(type, bar) { + bar + .removeClass("bg-success bg-info bg-warning bg-danger") + .addClass("bg-" + type); + }, + + _onHandleError: function(msg) { + this.sandbox.notify("Error", msg, "error"); + this._onDisableSave(false); + }, + + _onCleanUpload: function() { + this.$(".btn-remove-url").trigger("click"); + } + }; }); diff --git a/ckanext/cloudstorage/fanstatic/scripts/webassets.yml b/ckanext/cloudstorage/fanstatic/scripts/webassets.yml new file mode 100644 index 0000000..34b9126 --- /dev/null +++ b/ckanext/cloudstorage/fanstatic/scripts/webassets.yml @@ -0,0 +1,10 @@ +main: + filters: rjsmin + output: ckanext-cloudstorage/%(version)s_main.js + extra: + preload: + - base/main + - vendor/fileupload + contents: + - vendor/jquery-widget.js + - cloudstorage-multipart-upload.js diff --git a/ckanext/cloudstorage/helpers.py b/ckanext/cloudstorage/helpers.py index 1a7ca99..cc8ef0b 100644 --- a/ckanext/cloudstorage/helpers.py +++ b/ckanext/cloudstorage/helpers.py @@ -1,11 +1,20 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- from ckanext.cloudstorage.storage import ResourceCloudStorage - +import ckan.plugins.toolkit as tk def use_secure_urls(): return all([ ResourceCloudStorage.use_secure_urls.fget(None), # Currently implemented just AWS version - 'S3' in ResourceCloudStorage.driver_name.fget(None) + 'S3' in 
ResourceCloudStorage.driver_name.fget(None),
+        'host' in ResourceCloudStorage.driver_options.fget(None),
     ])
+
+
+def use_multipart_upload():
+    return use_secure_urls()
+
+
+def max_upload_size():
+    return tk.config.get('ckanext.cloudstorage.max_upload_size_gb')
diff --git a/ckanext/cloudstorage/logic/action/__init__.py b/ckanext/cloudstorage/logic/action/__init__.py
index e69de29..d9adf7d 100644
--- a/ckanext/cloudstorage/logic/action/__init__.py
+++ b/ckanext/cloudstorage/logic/action/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+
+from ckanext.cloudstorage.logic.action import multipart
+
+
+def get_actions():
+    return {
+        'cloudstorage_initiate_multipart': multipart.initiate_multipart,
+        'cloudstorage_upload_multipart': multipart.upload_multipart,
+        'cloudstorage_finish_multipart': multipart.finish_multipart,
+        'cloudstorage_abort_multipart': multipart.abort_multipart,
+        'cloudstorage_check_multipart': multipart.check_multipart,
+        'cloudstorage_clean_multipart': multipart.clean_multipart,
+    }
diff --git a/ckanext/cloudstorage/logic/action/multipart.py b/ckanext/cloudstorage/logic/action/multipart.py
index b27d34d..aee0cbc 100644
--- a/ckanext/cloudstorage/logic/action/multipart.py
+++ b/ckanext/cloudstorage/logic/action/multipart.py
@@ -2,17 +2,26 @@
 # -*- coding: utf-8 -*-
 import logging
 import datetime
+import mimetypes
+import libcloud.security
 
-from pylons import config
 from sqlalchemy.orm.exc import NoResultFound
 
 import ckan.model as model
 import ckan.lib.helpers as h
 import ckan.plugins.toolkit as toolkit
+from ckan.lib.uploader import get_resource_uploader
 from ckanext.cloudstorage.storage import ResourceCloudStorage
 from ckanext.cloudstorage.model import MultipartUpload, MultipartPart
 from werkzeug.datastructures import FileStorage as FlaskFileStorage
 
+if toolkit.check_ckan_version("2.9"):
+    config = toolkit.config
+else:
+    from pylons import config
+
+libcloud.security.VERIFY_SSL_CERT = True
+
 log = logging.getLogger(__name__)
 
@@ -96,11 +105,14 @@ def initiate_multipart(context, data_dict):
     h.check_access('cloudstorage_initiate_multipart', data_dict)
     id, name, size = toolkit.get_or_bust(data_dict, ['id', 'name', 'size'])
-    user_id = None
-    if context['auth_user_obj']:
-        user_id = context['auth_user_obj'].id
-
-    uploader = ResourceCloudStorage({'multipart_name': name})
+    user_obj = model.User.get(context['user'])
+    user_id = user_obj.id if user_obj else None
+
+    uploader = get_resource_uploader({'multipart_name': name, "id": id})
+    if not isinstance(uploader, ResourceCloudStorage):
+        raise toolkit.ValidationError({
+            "uploader": ["Must be ResourceCloudStorage or its subclass, not {}".format(type(uploader))]
+        })
     res_name = uploader.path_from_filename(id, name)
 
     upload_object = MultipartUpload.by_name(res_name)
@@ -114,33 +126,43 @@ def initiate_multipart(context, data_dict):
             resource_id=id):
         _delete_multipart(old_upload, uploader)
 
+    # Find and remove previous file from this resource
     _rindex = res_name.rfind('/')
     if ~_rindex:
         try:
             name_prefix = res_name[:_rindex]
-            for cloud_object in uploader.container.iterate_objects():
-                if cloud_object.name.startswith(name_prefix):
-                    log.info('Removing cloud object: %s' % cloud_object)
-                    cloud_object.delete()
+            old_objects = uploader.driver.iterate_container_objects(
+                uploader.container,
+                name_prefix
+            )
+
+            for obj in old_objects:
+                for similar in model.Session.query(model.Resource).filter_by(url=obj.name[len(name_prefix)+1:]):
+                    if obj.name == uploader.path_from_filename(similar.id, similar.url):
+                        log.info('Leave cloud object because it 
is referenced by resource %s: %s', similar.id, obj) + break + else: + log.info('Removing cloud object: %s' % obj) + obj.delete() except Exception as e: log.exception('[delete from cloud] %s' % e) - resp = uploader.driver.connection.request( - _get_object_url(uploader, res_name) + '?uploads', - method='POST' + headers = None + content_type, _ = mimetypes.guess_type(res_name) + if content_type: + headers = {"Content-type": content_type} + upload_object = MultipartUpload( + uploader.driver._initiate_multipart( + container=uploader.container, + object_name=res_name, + headers=headers + ), + id, + res_name, + size, + name, + user_id ) - if not resp.success(): - raise toolkit.ValidationError(resp.error) - try: - upload_id = resp.object.find( - '{%s}UploadId' % resp.object.nsmap[None]).text - except AttributeError: - upload_id_list = filter( - lambda e: e.tag.endswith('UploadId'), - resp.object.getchildren() - ) - upload_id = upload_id_list[0].text - upload_object = MultipartUpload(upload_id, id, res_name, size, name, user_id) upload_object.save() return upload_object.as_dict() @@ -149,17 +171,27 @@ def initiate_multipart(context, data_dict): def upload_multipart(context, data_dict): h.check_access('cloudstorage_upload_multipart', data_dict) upload_id, part_number, part_content = toolkit.get_or_bust( - data_dict, ['uploadId', 'partNumber', 'upload']) + data_dict, + ['uploadId', 'partNumber', 'upload'] + ) - uploader = ResourceCloudStorage({}) upload = model.Session.query(MultipartUpload).get(upload_id) + uploader = get_resource_uploader({"id": upload.resource_id}) + data = _get_underlying_file(part_content).read() resp = uploader.driver.connection.request( _get_object_url( - uploader, upload.name) + '?partNumber={0}&uploadId={1}'.format( - part_number, upload_id), + uploader, upload.name + ), + params={ + 'uploadId': upload_id, + 'partNumber': part_number + }, method='PUT', - data=bytearray(_get_underlying_file(part_content).read()) + headers={ + 'Content-Length': len(data) + }, + data=data ) if resp.status != 200: raise toolkit.ValidationError('Upload failed: part %s' % part_number) @@ -193,16 +225,19 @@ def finish_multipart(context, data_dict): for part in model.Session.query(MultipartPart).filter_by( upload_id=upload_id).order_by(MultipartPart.n) ] - uploader = ResourceCloudStorage({}) + uploader = get_resource_uploader({"id": upload.resource_id}) + try: obj = uploader.container.get_object(upload.name) obj.delete() except Exception: pass uploader.driver._commit_multipart( - _get_object_url(uploader, upload.name), - upload_id, - chunks) + container=uploader.container, + object_name=upload.name, + upload_id=upload_id, + chunks=chunks + ) upload.delete() upload.commit() @@ -225,8 +260,8 @@ def finish_multipart(context, data_dict): def abort_multipart(context, data_dict): h.check_access('cloudstorage_abort_multipart', data_dict) id = toolkit.get_or_bust(data_dict, ['id']) - uploader = ResourceCloudStorage({}) + uploader = get_resource_uploader({"id": id}) resource_uploads = MultipartUpload.resource_uploads(id) aborted = [] @@ -255,7 +290,6 @@ def clean_multipart(context, data_dict): """ h.check_access('cloudstorage_clean_multipart', data_dict) - uploader = ResourceCloudStorage({}) delta = _get_max_multipart_lifetime() oldest_allowed = datetime.datetime.utcnow() - delta @@ -270,6 +304,8 @@ def clean_multipart(context, data_dict): } for upload in uploads_to_remove: + uploader = get_resource_uploader({"id": upload.resource_id}) + try: _delete_multipart(upload, uploader) except toolkit.ValidationError 
as e: diff --git a/ckanext/cloudstorage/logic/auth/__init__.py b/ckanext/cloudstorage/logic/auth/__init__.py index e69de29..c9e6391 100644 --- a/ckanext/cloudstorage/logic/auth/__init__.py +++ b/ckanext/cloudstorage/logic/auth/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- + +from ckanext.cloudstorage.logic.auth import multipart + + +def get_auth_functions(): + return { + 'cloudstorage_initiate_multipart': multipart.initiate_multipart, + 'cloudstorage_upload_multipart': multipart.upload_multipart, + 'cloudstorage_finish_multipart': multipart.finish_multipart, + 'cloudstorage_abort_multipart': multipart.abort_multipart, + 'cloudstorage_check_multipart': multipart.check_multipart, + 'cloudstorage_clean_multipart': multipart.clean_multipart, + } diff --git a/ckanext/cloudstorage/migration/cloudstorage/README b/ckanext/cloudstorage/migration/cloudstorage/README new file mode 100644 index 0000000..98e4f9c --- /dev/null +++ b/ckanext/cloudstorage/migration/cloudstorage/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/ckanext/cloudstorage/migration/cloudstorage/alembic.ini b/ckanext/cloudstorage/migration/cloudstorage/alembic.ini new file mode 100644 index 0000000..47fc754 --- /dev/null +++ b/ckanext/cloudstorage/migration/cloudstorage/alembic.ini @@ -0,0 +1,74 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = %(here)s + +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# timezone to use when rendering the date +# within the migration file as well as the filename. +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the +# "slug" field +#truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; this defaults +# to /home/sergey/Projects/core/ckanext-cloudstorage/ckanext/cloudstorage/migration/cloudstorage/versions. 
When using multiple version +# directories, initial revisions must be specified with --version-path +# version_locations = %(here)s/bar %(here)s/bat /home/sergey/Projects/core/ckanext-cloudstorage/ckanext/cloudstorage/migration/cloudstorage/versions + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +sqlalchemy.url = driver://user:pass@localhost/dbname + + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/ckanext/cloudstorage/migration/cloudstorage/env.py b/ckanext/cloudstorage/migration/cloudstorage/env.py new file mode 100644 index 0000000..0093682 --- /dev/null +++ b/ckanext/cloudstorage/migration/cloudstorage/env.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- + +from __future__ import with_statement +from alembic import context +from sqlalchemy import engine_from_config, pool +from logging.config import fileConfig + +import os + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +target_metadata = None + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + +name = os.path.basename(os.path.dirname(__file__)) + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + + url = config.get_main_option(u"sqlalchemy.url") + context.configure( + url=url, target_metadata=target_metadata, literal_binds=True, + version_table=u'{}_alembic_version'.format(name) + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. 
+ + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix=u'sqlalchemy.', + poolclass=pool.NullPool) + + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=target_metadata, + version_table=u'{}_alembic_version'.format(name) + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/ckanext/cloudstorage/migration/cloudstorage/script.py.mako b/ckanext/cloudstorage/migration/cloudstorage/script.py.mako new file mode 100644 index 0000000..2c01563 --- /dev/null +++ b/ckanext/cloudstorage/migration/cloudstorage/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/ckanext/cloudstorage/migration/cloudstorage/versions/472b797d58d7_create_multipart_tables.py b/ckanext/cloudstorage/migration/cloudstorage/versions/472b797d58d7_create_multipart_tables.py new file mode 100644 index 0000000..98ef835 --- /dev/null +++ b/ckanext/cloudstorage/migration/cloudstorage/versions/472b797d58d7_create_multipart_tables.py @@ -0,0 +1,51 @@ +"""Create multipart tables + +Revision ID: 472b797d58d7 +Revises: +Create Date: 2021-01-12 14:24:02.227319 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.engine.reflection import Inspector + +# revision identifiers, used by Alembic. 
+revision = "472b797d58d7" +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + conn = op.get_bind() + inspector = Inspector.from_engine(conn) + tables = inspector.get_table_names() + if "cloudstorage_multipart_upload" not in tables: + op.create_table( + "cloudstorage_multipart_upload", + sa.Column("id", sa.UnicodeText, primary_key=True), + sa.Column("resource_id", sa.UnicodeText), + sa.Column("name", sa.UnicodeText), + sa.Column("initiated", sa.DateTime), + sa.Column("size", sa.Numeric), + sa.Column("original_name", sa.UnicodeText), + sa.Column("user_id", sa.UnicodeText), + ) + + if "cloudstorage_multipart_part" not in tables: + op.create_table( + "cloudstorage_multipart_part", + sa.Column("n", sa.Integer, primary_key=True), + sa.Column("etag", sa.UnicodeText, primary_key=True), + sa.Column( + "upload_id", + sa.UnicodeText, + sa.ForeignKey("cloudstorage_multipart_upload.id"), + primary_key=True, + ), + ) + + +def downgrade(): + op.drop_table("cloudstorage_multipart_part") + op.drop_table("cloudstorage_multipart_upload") diff --git a/ckanext/cloudstorage/plugin.py b/ckanext/cloudstorage/plugin.py deleted file mode 100644 index 5d7a939..0000000 --- a/ckanext/cloudstorage/plugin.py +++ /dev/null @@ -1,148 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -from ckan import plugins -from routes.mapper import SubMapper -import os.path -from ckanext.cloudstorage import storage -from ckanext.cloudstorage import helpers -import ckanext.cloudstorage.logic.action.multipart as m_action -import ckanext.cloudstorage.logic.auth.multipart as m_auth - - -class CloudStoragePlugin(plugins.SingletonPlugin): - plugins.implements(plugins.IUploader) - plugins.implements(plugins.IRoutes, inherit=True) - plugins.implements(plugins.IConfigurable) - plugins.implements(plugins.IConfigurer) - plugins.implements(plugins.IActions) - plugins.implements(plugins.ITemplateHelpers) - plugins.implements(plugins.IAuthFunctions) - plugins.implements(plugins.IResourceController, inherit=True) - - # IConfigurer - - def update_config(self, config): - plugins.toolkit.add_template_directory(config, 'templates') - plugins.toolkit.add_resource('fanstatic/scripts', 'cloudstorage-js') - - # ITemplateHelpers - - def get_helpers(self): - return dict( - cloudstorage_use_secure_urls=helpers.use_secure_urls - ) - - def configure(self, config): - - required_keys = ( - 'ckanext.cloudstorage.driver', - 'ckanext.cloudstorage.driver_options', - 'ckanext.cloudstorage.container_name' - ) - - for rk in required_keys: - if config.get(rk) is None: - raise RuntimeError( - 'Required configuration option {0} not found.'.format( - rk - ) - ) - - def get_resource_uploader(self, data_dict): - # We provide a custom Resource uploader. - return storage.ResourceCloudStorage(data_dict) - - def get_uploader(self, upload_to, old_filename=None): - # We don't provide misc-file storage (group images for example) - # Returning None here will use the default Uploader. - return None - - def before_map(self, map): - sm = SubMapper( - map, - controller='ckanext.cloudstorage.controller:StorageController' - ) - - # Override the resource download controllers so we can do our - # lookup with libcloud. 
- with sm: - sm.connect( - 'resource_download', - '/dataset/{id}/resource/{resource_id}/download', - action='resource_download' - ) - sm.connect( - 'resource_download', - '/dataset/{id}/resource/{resource_id}/download/{filename}', - action='resource_download' - ) - - return map - - # IActions - - def get_actions(self): - return { - 'cloudstorage_initiate_multipart': m_action.initiate_multipart, - 'cloudstorage_upload_multipart': m_action.upload_multipart, - 'cloudstorage_finish_multipart': m_action.finish_multipart, - 'cloudstorage_abort_multipart': m_action.abort_multipart, - 'cloudstorage_check_multipart': m_action.check_multipart, - 'cloudstorage_clean_multipart': m_action.clean_multipart, - } - - # IAuthFunctions - - def get_auth_functions(self): - return { - 'cloudstorage_initiate_multipart': m_auth.initiate_multipart, - 'cloudstorage_upload_multipart': m_auth.upload_multipart, - 'cloudstorage_finish_multipart': m_auth.finish_multipart, - 'cloudstorage_abort_multipart': m_auth.abort_multipart, - 'cloudstorage_check_multipart': m_auth.check_multipart, - 'cloudstorage_clean_multipart': m_auth.clean_multipart, - } - - # IResourceController - - def before_delete(self, context, resource, resources): - # let's get all info about our resource. It somewhere in resources - # but if there is some possibility that it isn't(magic?) we have - # `else` clause - - for res in resources: - if res['id'] == resource['id']: - break - else: - return - # just ignore simple links - if res['url_type'] != 'upload': - return - - # we don't want to change original item from resources, just in case - # someone will use it in another `before_delete`. So, let's copy it - # and add `clear_upload` flag - res_dict = dict(res.items() + [('clear_upload', True)]) - - uploader = self.get_resource_uploader(res_dict) - - # to be on the safe side, let's check existence of container - container = getattr(uploader, 'container', None) - if container is None: - return - - # and now uploader removes our file. 
- uploader.upload(resource['id']) - - # and all other files linked to this resource - if not uploader.leave_files: - upload_path = os.path.dirname( - uploader.path_from_filename( - resource['id'], - 'fake-name' - ) - ) - - for old_file in uploader.container.iterate_objects(): - if old_file.name.startswith(upload_path): - old_file.delete() diff --git a/ckanext/cloudstorage/plugin/__init__.py b/ckanext/cloudstorage/plugin/__init__.py new file mode 100644 index 0000000..7e94f09 --- /dev/null +++ b/ckanext/cloudstorage/plugin/__init__.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +import os.path + +from ckan import plugins + +from ckanext.cloudstorage.logic.action import get_actions +from ckanext.cloudstorage.logic.auth import get_auth_functions + +from ckanext.cloudstorage import storage +from ckanext.cloudstorage import helpers + +if plugins.toolkit.check_ckan_version("2.9"): + from ckanext.cloudstorage.plugin.flask_plugin import MixinPlugin +else: + from ckanext.cloudstorage.plugin.pylons_plugin import MixinPlugin + + +class CloudStoragePlugin(MixinPlugin, plugins.SingletonPlugin): + plugins.implements(plugins.IUploader) + plugins.implements(plugins.IConfigurable) + plugins.implements(plugins.IConfigurer) + plugins.implements(plugins.IActions) + plugins.implements(plugins.ITemplateHelpers) + plugins.implements(plugins.IAuthFunctions) + plugins.implements(plugins.IResourceController, inherit=True) + + # IConfigurer + + def update_config(self, config): + plugins.toolkit.add_template_directory(config, '../templates') + plugins.toolkit.add_resource('../fanstatic/scripts', 'cloudstorage-js') + + # ITemplateHelpers + + def get_helpers(self): + return dict( + cloudstorage_use_secure_urls=helpers.use_secure_urls, + cloudstorage_use_multipart_upload=helpers.use_multipart_upload, + cloudstorage_max_upload_size=helpers.max_upload_size, + ) + + # IConfigurable + + def configure(self, config): + + required_keys = ('ckanext.cloudstorage.driver', + 'ckanext.cloudstorage.driver_options', + 'ckanext.cloudstorage.container_name') + + for rk in required_keys: + if config.get(rk) is None: + raise RuntimeError( + 'Required configuration option {0} not found.'.format(rk)) + + # IUploader + + def get_resource_uploader(self, data_dict): + # We provide a custom Resource uploader. + return storage.ResourceCloudStorage(data_dict) + + def get_uploader(self, upload_to, old_filename=None): + # We don't provide misc-file storage (group images for example) + # Returning None here will use the default Uploader. + return None + + # IActions + + def get_actions(self): + return get_actions() + + # IAuthFunctions + + def get_auth_functions(self): + return get_auth_functions() + + # IResourceController + + def before_delete(self, context, resource, resources): + # let's get all info about our resource. It somewhere in resources + # but if there is some possibility that it isn't(magic?) we have + # `else` clause + + for res in resources: + if res['id'] == resource['id']: + break + else: + return + # just ignore simple links + if res['url_type'] != 'upload': + return + + # we don't want to change original item from resources, just in case + # someone will use it in another `before_delete`. 
So, let's copy it + # and add `clear_upload` flag + res_dict = dict(list(res.items()) + [('clear_upload', True)]) + + uploader = self.get_resource_uploader(res_dict) + + # to be on the safe side, let's check existence of container + container = getattr(uploader, 'container', None) + if container is None: + return + + # and now uploader removes our file. + uploader.upload(resource['id']) + + # and all other files linked to this resource + if not uploader.leave_files: + upload_path = os.path.dirname( + uploader.path_from_filename(resource['id'], 'fake-name')) + + old_files = uploader.driver.iterate_container_objects( + uploader.container, + upload_path + ) + + for old_file in old_files: + old_file.delete() diff --git a/ckanext/cloudstorage/plugin/flask_plugin.py b/ckanext/cloudstorage/plugin/flask_plugin.py new file mode 100644 index 0000000..7514151 --- /dev/null +++ b/ckanext/cloudstorage/plugin/flask_plugin.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- + +import ckan.plugins as p + +from ckanext.cloudstorage.views import get_blueprints +from ckanext.cloudstorage.cli import get_commands + + +class MixinPlugin(p.SingletonPlugin): + p.implements(p.IBlueprint) + p.implements(p.IClick) + + # IBlueprint + + def get_blueprint(self): + return get_blueprints() + + # IClick + + def get_commands(self): + return get_commands() diff --git a/ckanext/cloudstorage/plugin/pylons_plugin.py b/ckanext/cloudstorage/plugin/pylons_plugin.py new file mode 100644 index 0000000..8f35445 --- /dev/null +++ b/ckanext/cloudstorage/plugin/pylons_plugin.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- + +from routes.mapper import SubMapper + +import ckan.plugins as p + + +class MixinPlugin(p.SingletonPlugin): + p.implements(p.IRoutes, inherit=True) + + # IRoutes + + def before_map(self, map): + sm = SubMapper( + map, + controller='ckanext.cloudstorage.controller:StorageController') + + # Override the resource download controllers so we can do our + # lookup with libcloud. 
+ with sm: + sm.connect('resource_download', + '/dataset/{id}/resource/{resource_id}/download', + action='resource_download') + sm.connect( + 'resource_download', + '/dataset/{id}/resource/{resource_id}/download/{filename}', + action='resource_download') + + return map diff --git a/ckanext/cloudstorage/storage.py b/ckanext/cloudstorage/storage.py index 903390b..cb93771 100644 --- a/ckanext/cloudstorage/storage.py +++ b/ckanext/cloudstorage/storage.py @@ -2,23 +2,40 @@ # -*- coding: utf-8 -*- import cgi import mimetypes -import os.path -import urlparse +import logging +import os +import six +import tempfile +from six.moves.urllib.parse import urljoin from ast import literal_eval from datetime import datetime, timedelta -from tempfile import SpooledTemporaryFile +import traceback -from pylons import config from ckan import model from ckan.lib import munge import ckan.plugins as p +import hashlib +import binascii from libcloud.storage.types import Provider, ObjectDoesNotExistError from libcloud.storage.providers import get_driver +import libcloud.common.types as types +if p.toolkit.check_ckan_version("2.9"): + from werkzeug.datastructures import FileStorage as UploadedFileType + + config = p.toolkit.config +else: + from pylons import config + + UploadedFileType = cgi.FieldStorage from werkzeug.datastructures import FileStorage as FlaskFileStorage + +log = logging.getLogger(__name__) + ALLOWED_UPLOAD_TYPES = (cgi.FieldStorage, FlaskFileStorage) +AWS_UPLOAD_PART_SIZE = 5 * 1024 * 1024 def _get_underlying_file(wrapper): @@ -27,6 +44,25 @@ def _get_underlying_file(wrapper): return wrapper.file +def _md5sum(fobj): + block_count = 0 + block = True + md5string = b'' + while block: + block = fobj.read(AWS_UPLOAD_PART_SIZE) + if block: + block_count += 1 + hash_obj = hashlib.md5() + hash_obj.update(block) + md5string = md5string + binascii.unhexlify(hash_obj.hexdigest()) + else: + break + fobj.seek(0, os.SEEK_SET) + hash_obj = hashlib.md5() + hash_obj.update(md5string) + return hash_obj.hexdigest() + "-" + str(block_count) + + class CloudStorage(object): def __init__(self): self.driver = get_driver( @@ -38,7 +74,7 @@ def __init__(self): self._container = None def path_from_filename(self, rid, filename): - raise NotImplemented + raise NotImplementedError @property def container(self): @@ -131,6 +167,10 @@ def can_use_advanced_aws(self): """ # Are we even using AWS? if 'S3' in self.driver_name: + if 'host' not in self.driver_options: + # newer libcloud versions(must-use for python3) + # requires host for secure URLs + return False try: # Yes? Is the boto package available? import boto @@ -173,17 +213,20 @@ def __init__(self, resource): multipart_name = resource.pop('multipart_name', None) # Check to see if a file has been provided - if isinstance(upload_field_storage, (ALLOWED_UPLOAD_TYPES)): + if isinstance(upload_field_storage, (ALLOWED_UPLOAD_TYPES)) and \ + upload_field_storage.filename: self.filename = munge.munge_filename(upload_field_storage.filename) self.file_upload = _get_underlying_file(upload_field_storage) resource['url'] = self.filename resource['url_type'] = 'upload' + resource['last_modified'] = datetime.utcnow() elif multipart_name and self.can_use_advanced_aws: # This means that file was successfully uploaded and stored # at cloud. 
# Currently only the AWS flow is implemented.
resource['url'] = munge.munge_filename(multipart_name)
resource['url_type'] = 'upload'
+ resource['last_modified'] = datetime.utcnow()
elif self._clear and resource.get('id'):
# Apparently, this is a created-but-not-committed resource whose
# file upload has been canceled. We're copying the behaviour of
@@ -233,7 +276,6 @@ def upload(self, id, max_size=10):
content_settings = ContentSettings(
content_type=content_type
)
-
return blob_service.create_blob_from_stream(
container_name=self.container_name,
blob_name=self.path_from_filename(
@@ -244,18 +286,74 @@
content_settings=content_settings
)
else:
-
- # TODO: This might not be needed once libcloud is upgraded
- if isinstance(self.file_upload, SpooledTemporaryFile):
- self.file_upload.next = self.file_upload.next()
-
- self.container.upload_object_via_stream(
- self.file_upload,
- object_name=self.path_from_filename(
- id,
- self.filename
- )
- )
+ try:
+ file_upload = self.file_upload
+
+ # Check whether an identical file has already been uploaded.
+ object_name = self.path_from_filename(id, self.filename)
+ try:
+ cloud_object = self.container.get_object(object_name=object_name)
+ log.debug("\t Object found, checking size %s: %s", object_name, cloud_object.size)
+ if os.path.isfile(self.filename):
+ file_size = os.path.getsize(self.filename)
+ else:
+ self.file_upload.seek(0, os.SEEK_END)
+ file_size = self.file_upload.tell()
+ self.file_upload.seek(0, os.SEEK_SET)
+
+ log.debug("\t - File size %s: %s", self.filename, file_size)
+ if file_size == int(cloud_object.size):
+ log.debug("\t Size fits, checking hash %s: %s", object_name, cloud_object.hash)
+ hash_file = hashlib.md5(self.file_upload.read()).hexdigest()
+ self.file_upload.seek(0, os.SEEK_SET)
+ log.debug("\t - File hash %s: %s", self.filename, hash_file)
+ # Plain (single-part) hash
+ if hash_file == cloud_object.hash:
+ log.debug("\t => File found, matching hash, skipping upload")
+ return
+ # Multipart hash
+ multi_hash_file = _md5sum(self.file_upload)
+ log.debug("\t - File multi hash %s: %s", self.filename, multi_hash_file)
+ if multi_hash_file == cloud_object.hash:
+ log.debug("\t => File found, matching hash, skipping upload")
+ return
+ log.debug("\t Resource found in the cloud but outdated, uploading")
+ except ObjectDoesNotExistError:
+ log.debug("\t Resource not found in the cloud, uploading")
+
+ # If it's a temporary file, we'd better convert it
+ # into a real file object.
Otherwise, libcloud will iterate
+ # over lines rather than chunks, which really slows
+ # down uploads of files that consist of millions of
+ # short lines.
+ if isinstance(file_upload, tempfile.SpooledTemporaryFile):
+ file_upload.rollover()
+ try:
+ # Extract the underlying file object.
+ file_upload_iter = file_upload._file.detach()
+ except AttributeError:
+ # It's Python 2.
+ file_upload_iter = file_upload._file
+ else:
+ file_upload_iter = iter(file_upload)
+ self.container.upload_object_via_stream(iterator=file_upload_iter, object_name=object_name)
+ log.debug("\t => UPLOADED %s: %s", self.filename, object_name)
+ except (ValueError, types.InvalidCredsError):
+ log.error(traceback.format_exc())
+ raise

elif self._clear and self.old_filename and not self.leave_files:
# This is only set when a previously-uploaded file is replaced
@@ -316,23 +414,29 @@ def get_url_from_filename(self, rid, filename, content_type=None):
)
elif self.can_use_advanced_aws and self.use_secure_urls:
from boto.s3.connection import S3Connection
+ os.environ['S3_USE_SIGV4'] = 'True'
s3_connection = S3Connection(
self.driver_options['key'],
- self.driver_options['secret']
+ self.driver_options['secret'],
+ host=self.driver_options['host']
)
+ if 'region_name' in self.driver_options.keys():
+ s3_connection.auth_region_name = self.driver_options['region_name']
+
generate_url_params = {"expires_in": 60 * 60,
"method": "GET",
"bucket": self.container_name,
- "query_auth": True,
"key": path}
if content_type:
generate_url_params['headers'] = {"Content-Type": content_type}
-
- return s3_connection.generate_url(**generate_url_params)
+ return s3_connection.generate_url_sigv4(**generate_url_params)

# Find the object for the given key.
- obj = self.container.get_object(path)
+ try:
+ obj = self.container.get_object(path)
+ except ObjectDoesNotExistError:
+ return
if obj is None:
return
@@ -341,11 +445,11 @@
return self.driver.get_object_cdn_url(obj)
except NotImplementedError:
if 'S3' in self.driver_name:
- return urlparse.urljoin(
+ return urljoin(
'https://' + self.driver.connection.host,
'{container}/{path}'.format(
container=self.container_name,
- path=path
+ path=six.ensure_str(path)
)
)
# This extra 'url' property isn't documented anywhere, sadly.
diff --git a/ckanext/cloudstorage/templates/cloudstorage/snippets/cloudstorage-js_asset.html b/ckanext/cloudstorage/templates/cloudstorage/snippets/cloudstorage-js_asset.html
new file mode 100644
index 0000000..0727f6b
--- /dev/null
+++ b/ckanext/cloudstorage/templates/cloudstorage/snippets/cloudstorage-js_asset.html
@@ -0,0 +1 @@
+{% asset 'cloudstorage-js/main' %}
diff --git a/ckanext/cloudstorage/templates/cloudstorage/snippets/cloudstorage-js_resource.html b/ckanext/cloudstorage/templates/cloudstorage/snippets/cloudstorage-js_resource.html
new file mode 100644
index 0000000..e71499a
--- /dev/null
+++ b/ckanext/cloudstorage/templates/cloudstorage/snippets/cloudstorage-js_resource.html
@@ -0,0 +1 @@
+{% resource 'cloudstorage-js/main' %}
diff --git a/ckanext/cloudstorage/templates/cloudstorage/snippets/multipart_module.html b/ckanext/cloudstorage/templates/cloudstorage/snippets/multipart_module.html
index 2568614..48bedf1 100644
--- a/ckanext/cloudstorage/templates/cloudstorage/snippets/multipart_module.html
+++ b/ckanext/cloudstorage/templates/cloudstorage/snippets/multipart_module.html
@@ -1,10 +1,12 @@
+ {% if max_size %}
+ data-module-max-size="{{ max_size }}" + {% endif %} +> {{ parent() }}
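For context on the `upload()` rewrite in storage.py above: S3-compatible stores report an object uploaded in N parts with a "multipart ETag" of the form `md5(md5(part_1) + ... + md5(part_N))-N`, which is what the new `_md5sum` helper reproduces so that re-uploads of unchanged files can be skipped. A minimal standalone sketch of the same convention (the `multipart_etag` name and the sample usage are illustrative, not part of the patch; the 5 MiB part size mirrors `AWS_UPLOAD_PART_SIZE`):

    import hashlib

    PART_SIZE = 5 * 1024 * 1024  # mirrors AWS_UPLOAD_PART_SIZE in storage.py

    def multipart_etag(path, part_size=PART_SIZE):
        """Compute the '<md5-of-part-md5s>-<part count>' ETag that
        S3-compatible stores report for multipart uploads."""
        part_digests = []
        with open(path, 'rb') as f:
            while True:
                block = f.read(part_size)
                if not block:
                    break
                # Collect the *raw* digest of each part, not the hex form.
                part_digests.append(hashlib.md5(block).digest())
        combined = hashlib.md5(b''.join(part_digests))
        return '{0}-{1}'.format(combined.hexdigest(), len(part_digests))

    # e.g. a 12 MiB file uploaded in 5 MiB parts yields an ETag ending in '-3'

This is why `upload()` compares both the plain MD5 and the `_md5sum` value: which of the two the store reports depends on whether the existing object was originally uploaded in one request or in parts.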
diff --git a/ckanext/cloudstorage/templates/package/new_resource.html b/ckanext/cloudstorage/templates/package/new_resource.html
index 0505bca..f99bec1 100644
--- a/ckanext/cloudstorage/templates/package/new_resource.html
+++ b/ckanext/cloudstorage/templates/package/new_resource.html
@@ -2,7 +2,7 @@
{% block form %}
-
- {% snippet 'cloudstorage/snippets/multipart_module.html', pkg_name=pkg_name, parent=super %}
+ {% set max_size = h.cloudstorage_max_upload_size() %}
+ {% snippet 'cloudstorage/snippets/multipart_module.html', pkg_name=pkg_name, parent=super, max_size=max_size %}
{% endblock %}
diff --git a/ckanext/cloudstorage/templates/package/new_resource_not_draft.html b/ckanext/cloudstorage/templates/package/new_resource_not_draft.html
index fae3e21..415570f 100644
--- a/ckanext/cloudstorage/templates/package/new_resource_not_draft.html
+++ b/ckanext/cloudstorage/templates/package/new_resource_not_draft.html
@@ -2,6 +2,6 @@
{% block form %}
-
- {% snippet 'cloudstorage/snippets/multipart_module.html', pkg_name=pkg_name, parent=super %}
+ {% set max_size = h.cloudstorage_max_upload_size() %}
+ {% snippet 'cloudstorage/snippets/multipart_module.html', pkg_name=pkg_name, parent=super, max_size=max_size %}
{% endblock %}
diff --git a/ckanext/cloudstorage/templates/package/resource_edit.html b/ckanext/cloudstorage/templates/package/resource_edit.html
index 2e74235..137a45c 100644
--- a/ckanext/cloudstorage/templates/package/resource_edit.html
+++ b/ckanext/cloudstorage/templates/package/resource_edit.html
@@ -2,7 +2,7 @@
{% block form %}
-
- {% snippet 'cloudstorage/snippets/multipart_module.html', pkg_name=pkg.name, parent=super %}
+ {% set max_size = h.cloudstorage_max_upload_size() %}
+ {% snippet 'cloudstorage/snippets/multipart_module.html', pkg_name=pkg.name, parent=super, max_size=max_size %}
{% endblock %}
diff --git a/ckanext/cloudstorage/templates/page.html b/ckanext/cloudstorage/templates/page.html
index 3759099..4ade9fe 100644
--- a/ckanext/cloudstorage/templates/page.html
+++ b/ckanext/cloudstorage/templates/page.html
@@ -2,6 +2,10 @@
{% block scripts %}
{{ super() }}
- {% resource 'cloudstorage-js/main' %}
+ {% with version = h.ckan_version() %}
+ {# CKAN>=2.9 ships webassets ('asset'); older releases still use Fanstatic ('resource'). #}
+ {% set type = 'asset' if version.split('.')[1] | int >= 9 else 'resource' %}
+ {% include 'cloudstorage/snippets/cloudstorage-js_' ~ type ~ '.html' %}
+ {% endwith %}
{% endblock %}
diff --git a/ckanext/cloudstorage/tests/__init__.py b/ckanext/cloudstorage/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/ckanext/cloudstorage/tests/ckan_setup.py b/ckanext/cloudstorage/tests/ckan_setup.py
new file mode 100644
index 0000000..be2d438
--- /dev/null
+++ b/ckanext/cloudstorage/tests/ckan_setup.py
@@ -0,0 +1,38 @@
+try:
+ from ckan.tests.pytest_ckan.ckan_setup import * # NOQA
+except ImportError:
+ from ckan.config.middleware import make_app # NOQA
+ from ckan.common import config # NOQA
+
+ import pkg_resources
+ from paste.deploy import loadapp
+ import sys
+ import os
+
+ import pylons
+ from pylons.i18n.translation import _get_translator
+
+ def pytest_addoption(parser):
+ """Allow using a custom config file during tests.
+ """
+ parser.addoption(u"--ckan-ini", action=u"store")
+
+ def pytest_sessionstart(session):
+ """Initialize the CKAN environment.
+ """ + global pylonsapp + path = os.getcwd() + sys.path.insert(0, path) + pkg_resources.working_set.add_entry(path) + pylonsapp = loadapp( + 'config:' + session.config.option.ckan_ini, + relative_to=path, + ) + + # Initialize a translator for tests that utilize i18n + translator = _get_translator(pylons.config.get('lang')) + pylons.translator._push_object(translator) + + class FakeResponse: + headers = {} # because render wants to delete Pragma + pylons.response._push_object(FakeResponse) diff --git a/ckanext/cloudstorage/tests/conftest.py b/ckanext/cloudstorage/tests/conftest.py new file mode 100644 index 0000000..5fcfd5f --- /dev/null +++ b/ckanext/cloudstorage/tests/conftest.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +import os + +import pytest +from ckanext.cloudstorage import utils + + +@pytest.fixture +def with_driver_options(ckan_config, monkeypatch): + """Apply config from env variablies - thus you won't have unstaged + changes in config file and won't accidentally commit your cloud + credentials. + + """ + driver = os.getenv('TEST_DRIVER') + + if not driver: + pytest.skip('TEST_DRIVER is not set') + monkeypatch.setitem(ckan_config, 'ckanext.cloudstorage.driver', driver) + + container = os.getenv('TEST_CONTAINER') + if not container: + pytest.skip('TEST_CONTAINER is not set') + monkeypatch.setitem(ckan_config, + 'ckanext.cloudstorage.container_name', container) + + options = os.getenv('TEST_DRIVER_OPTIONS') + if not options: + pytest.skip('TEST_DRIVER_OPTIONS is not set') + monkeypatch.setitem(ckan_config, + 'ckanext.cloudstorage.driver_options', options) + + +@pytest.fixture +def clean_db(reset_db): + """Initialize extension's tables. + """ + reset_db() + utils.initdb() diff --git a/ckanext/cloudstorage/tests/logic/__init__.py b/ckanext/cloudstorage/tests/logic/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ckanext/cloudstorage/tests/logic/action/__init__.py b/ckanext/cloudstorage/tests/logic/action/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ckanext/cloudstorage/tests/logic/action/test_multipart.py b/ckanext/cloudstorage/tests/logic/action/test_multipart.py new file mode 100644 index 0000000..1b1383b --- /dev/null +++ b/ckanext/cloudstorage/tests/logic/action/test_multipart.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- + +import pytest +import six + +from libcloud.storage.types import ObjectDoesNotExistError + +from ckan.tests import factories, helpers +from ckanext.cloudstorage.storage import ResourceCloudStorage +from ckanext.cloudstorage.utils import FakeFileStorage + + +@pytest.mark.ckan_config('ckan.plugins', 'cloudstorage') +@pytest.mark.usefixtures( + 'with_driver_options', 'with_plugins', + 'with_request_context', 'clean_db') +class TestMultipartUpload(object): + + def test_upload(self): + filename = 'file.txt' + res = factories.Resource() + multipart = helpers.call_action( + 'cloudstorage_initiate_multipart', + id=res['id'], name='file.txt', size=1024 * 1024 * 5 * 2) + storage = ResourceCloudStorage(res) + assert storage.path_from_filename( + res['id'], filename) == multipart['name'] + assert storage.get_url_from_filename(res['id'], filename) is None + + fp = six.BytesIO(b'b' * 1024 * 1024 * 5) + fp.seek(0) + helpers.call_action( + 'cloudstorage_upload_multipart', + uploadId=multipart['id'], + partNumber=1, + upload=FakeFileStorage(fp, filename)) + + assert storage.get_url_from_filename(res['id'], filename) is None + + fp = six.BytesIO(b'a' * 1024 * 1024 * 5) + fp.seek(0) + helpers.call_action( + 
'cloudstorage_upload_multipart',
+ uploadId=multipart['id'],
+ partNumber=2,
+ upload=FakeFileStorage(fp, filename))
+
+ assert storage.get_url_from_filename(res['id'], filename) is None
+
+ result = helpers.call_action(
+ 'cloudstorage_finish_multipart', uploadId=multipart['id'])
+ assert result['commited']
+ assert storage.get_url_from_filename(res['id'], filename)
diff --git a/ckanext/cloudstorage/tests/test_helpers.py b/ckanext/cloudstorage/tests/test_helpers.py
new file mode 100644
index 0000000..f0a4557
--- /dev/null
+++ b/ckanext/cloudstorage/tests/test_helpers.py
@@ -0,0 +1,42 @@
+import pytest
+
+import ckan.plugins.toolkit as tk
+
+_secure_urls = 'ckanext.cloudstorage.use_secure_urls'
+_driver = 'ckanext.cloudstorage.driver'
+_options = 'ckanext.cloudstorage.driver_options'
+
+
+@pytest.mark.ckan_config('ckanext.cloudstorage.container_name', 'test')
+@pytest.mark.ckan_config('ckan.plugins', 'cloudstorage')
+@pytest.mark.usefixtures('with_plugins')
+class TestUseSecureUrls(object):
+ @pytest.mark.ckan_config(_secure_urls, 'true')
+ @pytest.mark.ckan_config(_driver, 'AZURE_BLOBS')
+ @pytest.mark.ckan_config(_options, '{}')
+ def test_unsupported_provider_enabled(self):
+ assert not tk.h.cloudstorage_use_secure_urls()
+
+ @pytest.mark.ckan_config(_secure_urls, 'false')
+ @pytest.mark.ckan_config(_driver, 'AZURE_BLOBS')
+ @pytest.mark.ckan_config(_options, '{}')
+ def test_unsupported_provider_disabled(self):
+ assert not tk.h.cloudstorage_use_secure_urls()
+
+ @pytest.mark.ckan_config(_secure_urls, 'true')
+ @pytest.mark.ckan_config(_driver, 'S3_US_WEST')
+ @pytest.mark.ckan_config(_options, '{}')
+ def test_supported_provider_enabled_without_host(self):
+ assert not tk.h.cloudstorage_use_secure_urls()
+
+ @pytest.mark.ckan_config(_secure_urls, 'true')
+ @pytest.mark.ckan_config(_driver, 'S3_US_WEST')
+ @pytest.mark.ckan_config(_options, '{"host": "x"}')
+ def test_supported_provider_enabled_with_host(self):
+ assert tk.h.cloudstorage_use_secure_urls()
+
+ @pytest.mark.ckan_config(_secure_urls, 'false')
+ @pytest.mark.ckan_config(_driver, 'S3_US_WEST')
+ @pytest.mark.ckan_config(_options, '{}')
+ def test_supported_provider_disabled(self):
+ assert not tk.h.cloudstorage_use_secure_urls()
diff --git a/ckanext/cloudstorage/tests/test_plugin.py b/ckanext/cloudstorage/tests/test_plugin.py
new file mode 100644
index 0000000..7242783
--- /dev/null
+++ b/ckanext/cloudstorage/tests/test_plugin.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+
+import pytest
+
+import ckan.plugins as p
+
+from ckan.tests import helpers, factories
+
+
+@pytest.mark.ckan_config('ckan.plugins', 'cloudstorage')
+@pytest.mark.usefixtures('with_driver_options', 'with_plugins')
+class TestCloudstoragePlugin(object):
+
+ @pytest.mark.parametrize('option', (
+ 'ckanext.cloudstorage.driver',
+ 'ckanext.cloudstorage.driver_options',
+ 'ckanext.cloudstorage.container_name'))
+ def test_required_config(self, ckan_config, monkeypatch, option):
+ """All of these config options are essential; cloudstorage will
+ prevent the application from starting if any of them is missing.
+
+ """
+ monkeypatch.delitem(ckan_config, option)
+ plugin = p.get_plugin('cloudstorage')
+ with pytest.raises(RuntimeError, match='configuration option'):
+ plugin.configure(ckan_config)
+
+ @pytest.mark.usefixtures('clean_db')
+ def test_before_delete(self, create_with_upload):
+ """When a resource is deleted, we must remove the corresponding file from S3.
+ + """ + name = 'test.txt' + resource = create_with_upload('hello world', name, name=name, package_id=factories.Dataset()['id']) + plugin = p.get_plugin('cloudstorage') + uploader = plugin.get_resource_uploader(resource) + assert uploader.get_url_from_filename(resource['id'], name) + + helpers.call_action('resource_delete', id=resource['id']) + assert uploader.get_url_from_filename(resource['id'], name) is None + + @pytest.mark.usefixtures('clean_db') + def test_before_delete_for_linked_resource(self): + """Non-uploads don't raise exceptions. + """ + resource = factories.Resource() + helpers.call_action('resource_delete', id=resource['id']) + with pytest.raises(p.toolkit.ObjectNotFound): + helpers.call_action('resource_show', id=resource['id']) diff --git a/ckanext/cloudstorage/tests/test_storage.py b/ckanext/cloudstorage/tests/test_storage.py new file mode 100644 index 0000000..bfe47e1 --- /dev/null +++ b/ckanext/cloudstorage/tests/test_storage.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +import pytest + +from six.moves.urllib.parse import urlparse +from ckan.tests import factories + +from ckanext.cloudstorage.storage import CloudStorage, ResourceCloudStorage + + +@pytest.mark.ckan_config('ckan.plugins', 'cloudstorage') +@pytest.mark.usefixtures('with_driver_options', 'with_plugins') +class TestCloudStorage(object): + def test_props(self): + storage = CloudStorage() + assert storage.driver_options + assert storage.driver_name + assert storage.container_name + assert storage.container + assert not storage.leave_files + assert not storage.use_secure_urls + assert not storage.guess_mimetype + + +@pytest.mark.ckan_config('ckan.plugins', 'cloudstorage') +@pytest.mark.usefixtures('with_driver_options', 'with_plugins') +class TestResourceCloudStorage(object): + def test_not_secure_url_from_filename(self, create_with_upload): + filename = 'file.txt' + resource = create_with_upload('test', filename, package_id=factories.Dataset()['id']) + storage = ResourceCloudStorage(resource) + url = storage.get_url_from_filename(resource['id'], filename) + assert storage.container_name in url + assert not urlparse(url).query + + @pytest.mark.ckan_config('ckanext.cloudstorage.use_secure_urls', True) + def test_secure_url_from_filename(self, create_with_upload): + filename = 'file.txt' + resource = create_with_upload('test', filename, package_id=factories.Dataset()['id']) + storage = ResourceCloudStorage(resource) + if not storage.can_use_advanced_aws or not storage.use_secure_urls: + pytest.skip('SecureURL not supported') + url = storage.get_url_from_filename(resource['id'], filename) + assert urlparse(url).query + + @pytest.mark.ckan_config('ckanext.cloudstorage.use_secure_urls', True) + def test_hash_check(self, create_with_upload): + filename = 'file.txt' + resource = create_with_upload('test', filename, package_id=factories.Dataset()['id']) + storage = ResourceCloudStorage(resource) + if not storage.can_use_advanced_aws or not storage.use_secure_urls: + pytest.skip('SecureURL not supported') + url = storage.get_url_from_filename(resource['id'], filename) + resource = create_with_upload('test', filename, action='resource_update', id=resource['id']) + + assert urlparse(url).query diff --git a/ckanext/cloudstorage/tests/test_utils.py b/ckanext/cloudstorage/tests/test_utils.py new file mode 100644 index 0000000..b990b5d --- /dev/null +++ b/ckanext/cloudstorage/tests/test_utils.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +import pytest + +import ckan.plugins.toolkit as tk + +from ckan.tests import factories, 
helpers
+from ckanext.cloudstorage import utils, storage
+
+
+@pytest.mark.ckan_config('ckan.plugins', 'cloudstorage')
+@pytest.mark.usefixtures('with_driver_options', 'with_plugins')
+class TestResourceDownload(object):
+ def test_utils_used_by_download_route(self, app, mocker):
+ url = tk.url_for('resource.download', id='a', resource_id='b')
+ mocker.patch('ckanext.cloudstorage.utils.resource_download')
+ app.get(url)
+ utils.resource_download.assert_called_once_with('a', 'b', None)
+
+ @pytest.mark.usefixtures('clean_db')
+ def test_status_codes(self, app):
+ user = factories.User()
+ org = factories.Organization()
+ dataset = factories.Dataset(private=True, owner_org=org['id'])
+ resource = factories.Resource(package_id=dataset['id'])
+
+ env = {'REMOTE_USER': user['name']}
+ url = tk.url_for(
+ 'resource.download', id='a', resource_id='b')
+ app.get(url, status=404, extra_environ=env)
+
+ url = tk.url_for(
+ 'resource.download', id=dataset['id'], resource_id=resource['id'])
+ app.get(url, status=401, extra_environ=env)
+
+ helpers.call_action('package_patch', id=dataset['id'], private=False)
+ app.get(url, status=302, extra_environ=env, follow_redirects=False)
+
+ @pytest.mark.usefixtures('clean_db')
+ def test_download(self, create_with_upload, app):
+ filename = 'file.txt'
+ resource = create_with_upload('hello world', filename, package_id=factories.Dataset()['id'])
+ url = tk.url_for(
+ 'resource.download',
+ id=resource['package_id'],
+ resource_id=resource['id'])
+ resp = app.get(url, status=302, follow_redirects=False)
+
+ uploader = storage.ResourceCloudStorage(resource)
+ expected_url = uploader.get_url_from_filename(resource['id'],
+ filename)
+ assert resp.headers['location'] == expected_url
diff --git a/ckanext/cloudstorage/utils.py b/ckanext/cloudstorage/utils.py
new file mode 100644
index 0000000..98e2379
--- /dev/null
+++ b/ckanext/cloudstorage/utils.py
@@ -0,0 +1,167 @@
+# -*- coding: utf-8 -*-
+from __future__ import print_function
+import os.path
+
+import six
+from ckan import logic, model
+import ckan.plugins.toolkit as tk
+from ckan.lib import base, uploader
+import ckan.lib.helpers as h
+import cgi
+import tempfile
+from ckan.logic import NotFound
+from ckanapi import LocalCKAN
+
+from ckanext.cloudstorage.model import (create_tables, drop_tables)
+from ckanext.cloudstorage.storage import (CloudStorage, ResourceCloudStorage)
+
+
+if tk.check_ckan_version("2.9"):
+ from werkzeug.datastructures import FileStorage as FakeFileStorage
+else:
+ class FakeFileStorage(cgi.FieldStorage):
+ def __init__(self, fp, filename):
+ self.file = fp
+ self.stream = fp
+ self.filename = filename
+
+
+def initdb():
+ drop_tables()
+ create_tables()
+
+
+def fix_cors(domains):
+ cs = CloudStorage()
+
+ if cs.can_use_advanced_azure:
+ from azure.storage import blob as azure_blob
+ from azure.storage import CorsRule
+
+ blob_service = azure_blob.BlockBlobService(cs.driver_options['key'],
+ cs.driver_options['secret'])
+
+ blob_service.set_blob_service_properties(
+ cors=[CorsRule(allowed_origins=domains, allowed_methods=['GET'])])
+ return 'Done!', True
+ else:
+ return ('The driver {driver_name} being used does not currently'
+ ' support updating CORS rules through'
+ ' cloudstorage.'.format(driver_name=cs.driver_name)), False
+
+
+def migrate(path, single_id):
+ if not os.path.isdir(path):
+ print('The storage directory cannot be found.')
+ return
+
+ lc = LocalCKAN()
+ resources = {}
+ failed = []
+
+ # The resource folder is structured like so on disk:
+ # - storage/
+ # - ...
+ # - resources/
+ # - <3 letter prefix>
+ # - <3 letter prefix>
+ # - <remaining id as filename>
+ # ...
+ # ...
+ # ...
+ for root, dirs, files in os.walk(path):
+ # Only the bottom level of the tree actually contains any files. We
+ # don't care at all about the overall structure.
+ if not files:
+ continue
+
+ split_root = root.split('/')
+ resource_id = split_root[-2] + split_root[-1]
+
+ for file_ in files:
+ ckan_res_id = resource_id + file_
+ if single_id and ckan_res_id != single_id:
+ continue
+
+ resources[ckan_res_id] = os.path.join(root, file_)
+
+ for i, resource in enumerate(resources.items(), 1):
+ resource_id, file_path = resource
+ print('[{i}/{count}] Working on {id}'.format(i=i,
+ count=len(resources),
+ id=resource_id))
+ try:
+ resource = lc.action.resource_show(id=resource_id)
+ except NotFound:
+ print(u'\tResource not found')
+ continue
+ if resource['url_type'] != 'upload':
+ print(u'\t`url_type` is not `upload`. Skipping')
+ continue
+
+ with open(file_path, 'rb') as fin:
+ resource['upload'] = FakeFileStorage(
+ fin, resource['url'].split('/')[-1])
+ try:
+ uploader = ResourceCloudStorage(resource)
+ uploader.upload(resource['id'])
+ except Exception as e:
+ failed.append(resource_id)
+ print(u'\tError of type {0} during upload: {1}'.format(
+ type(e), e))
+
+ if failed:
+ log_file = tempfile.NamedTemporaryFile(delete=False)
+ log_file.file.writelines(
+ [six.ensure_binary(res_id) + b'\n' for res_id in failed])
+ print(u'IDs of all failed uploads are saved to `{0}`: {1}'.format(
+ log_file.name, failed))
+
+
+def resource_download(id, resource_id, filename=None):
+ context = {
+ 'model': model,
+ 'session': model.Session,
+ 'user': tk.c.user or tk.c.author,
+ 'auth_user_obj': tk.c.userobj
+ }
+
+ try:
+ resource = logic.get_action('resource_show')(context, {
+ 'id': resource_id
+ })
+ except logic.NotFound:
+ return base.abort(404, tk._('Resource not found'))
+ except logic.NotAuthorized:
+ return base.abort(401,
+ tk._('Unauthorized to read resource {0}'.format(id)))
+
+ # This isn't a file upload, so either redirect to the source
+ # (if available) or error out.
+ if resource.get('url_type') != 'upload':
+ url = resource.get('url')
+ if not url:
+ return base.abort(404, tk._('No download is available'))
+ return h.redirect_to(url)
+
+ if filename is None:
+ # No filename was provided, so we'll try to get one from the url.
+ filename = os.path.basename(resource['url'])
+
+ upload = uploader.get_resource_uploader(resource)
+
+ # If the client requests with a Content-Type header (e.g. text preview),
+ # we have to add the header to the signature.
+ content_type = getattr(tk.request, "content_type", None)
+ uploaded_url = upload.get_url_from_filename(resource['id'],
+ filename,
+ content_type=content_type)
+
+ # The uploaded file is missing for some reason, such as the
+ # provider being down.
+ if uploaded_url is None:
+ return base.abort(404, tk._('No download is available'))
+
+ return h.redirect_to(uploaded_url)
diff --git a/ckanext/cloudstorage/views.py b/ckanext/cloudstorage/views.py
new file mode 100644
index 0000000..2f18a80
--- /dev/null
+++ b/ckanext/cloudstorage/views.py
@@ -0,0 +1,16 @@
+# -*- coding: utf-8 -*-
+
+from flask import Blueprint
+import ckanext.cloudstorage.utils as utils
+
+cloudstorage = Blueprint('cloudstorage', __name__)
+
+
+@cloudstorage.route('/dataset/<id>/resource/<resource_id>/download')
+@cloudstorage.route('/dataset/<id>/resource/<resource_id>/download/<filename>')
+def download(id, resource_id, filename=None, package_type='dataset'):
+ return utils.resource_download(id, resource_id, filename)
+
+
+def get_blueprints():
+ return [cloudstorage]
diff --git a/dev-requirements.txt b/dev-requirements.txt
new file mode 100644
index 0000000..846e10f
--- /dev/null
+++ b/dev-requirements.txt
@@ -0,0 +1,2 @@
+pytest-ckan
+pytest-mock==3.6.1
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..7e01bee
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,7 @@
+[tool:pytest]
+
+filterwarnings =
+ ignore::sqlalchemy.exc.SADeprecationWarning
+ ignore::sqlalchemy.exc.SAWarning
+ ignore::DeprecationWarning
+addopts = --ckan-ini test.ini
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 77f72af..8716d3a 100644
--- a/setup.py
+++ b/setup.py
@@ -17,8 +17,9 @@
include_package_data=True,
zip_safe=False,
install_requires=[
- 'apache-libcloud==1.5',
- 'ckanapi>=1.0,<5'
+ 'apache-libcloud~=2.8.2',
+ 'six>=1.12.0',
+ 'ckanapi',
],
entry_points=(
"""
@@ -26,7 +27,7 @@
cloudstorage=ckanext.cloudstorage.plugin:CloudStoragePlugin

[paste.paster_command]
- cloudstorage=ckanext.cloudstorage.cli:PasterCommand
+ cloudstorage=ckanext.cloudstorage.commands:PasterCommand
"""
),
)
diff --git a/test.ini b/test.ini
new file mode 100644
index 0000000..12ace2e
--- /dev/null
+++ b/test.ini
@@ -0,0 +1,54 @@
+[DEFAULT]
+debug = false
+smtp_server = localhost
+error_email_from = paste@localhost
+
+[server:main]
+use = egg:Paste#http
+host = 0.0.0.0
+port = 5000
+
+[app:main]
+use = config:../ckan/test-core.ini
+
+# Insert any custom config settings to be used when running your extension's
+# tests here.
+
+# Logging configuration
+[loggers]
+keys = root, ckan, sqlalchemy, ckanext
+
+[handlers]
+keys = console
+
+[formatters]
+keys = generic
+
+[logger_root]
+level = WARN
+handlers = console
+
+[logger_ckan]
+qualname = ckan
+handlers =
+level = INFO
+
+[logger_sqlalchemy]
+handlers =
+qualname = sqlalchemy.engine
+level = WARN
+
+[handler_console]
+class = StreamHandler
+args = (sys.stdout,)
+level = NOTSET
+formatter = generic
+
+[formatter_generic]
+format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s
+
+[logger_ckanext]
+level = DEBUG
+handlers = console
+qualname = ckanext
+propagate = 0
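A note on running the new test suite: the `with_driver_options` fixture skips every cloud-backed test unless credentials are supplied through the environment, and setup.cfg already injects `--ckan-ini test.ini` via `addopts`. A rough sketch of a local run against S3 (the bucket name and keys below are placeholders, and any libcloud provider name works for `TEST_DRIVER`):

    # Illustrative only - the bucket and credentials below are placeholders.
    import os
    import subprocess

    os.environ['TEST_DRIVER'] = 'S3_US_WEST'         # any libcloud Provider name
    os.environ['TEST_CONTAINER'] = 'my-test-bucket'  # an existing bucket you own
    os.environ['TEST_DRIVER_OPTIONS'] = (
        '{"key": "AKIA...", "secret": "...", "host": "s3.amazonaws.com"}'
    )

    # setup.cfg's addopts already supplies --ckan-ini test.ini
    subprocess.check_call(['pytest', 'ckanext/cloudstorage/tests'])

Note that storage.py parses the driver options as a Python literal (see its `literal_eval` import), so `TEST_DRIVER_OPTIONS` must be a dict literal string; including `host` also keeps the secure-URL tests from being skipped.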