Draft

Commits (30)
5cd8e51
Add prototype
jterapin Aug 8, 2025
fa34176
Add transfer manager interface
jterapin Aug 11, 2025
1a065f7
Refactor MultipartStreamUploader
jterapin Aug 11, 2025
60ce676
Add abort upload when complete upload fails in MultipartStreamUploader
jterapin Aug 11, 2025
5cef078
Add TM docs
jterapin Aug 11, 2025
9fb8f89
Update docs and syntax
jterapin Aug 12, 2025
fe2f7f7
Add temp changelog entry
jterapin Aug 12, 2025
9b6b5f4
Omit spacing
jterapin Aug 12, 2025
7b37545
Add empty test file
jterapin Aug 12, 2025
6901b45
Add deprecation warnings
jterapin Aug 12, 2025
689d987
Update changelog
jterapin Aug 12, 2025
f2e3a16
Revised changelog entries wording
jterapin Aug 12, 2025
70fba15
Update consts
jterapin Aug 13, 2025
c945aad
Separate specs between utilities and resource-model
jterapin Aug 13, 2025
f847baa
Update specs for file downloader
jterapin Aug 13, 2025
dfde65f
Streamline specs
jterapin Aug 14, 2025
e48f418
Minor clean ups
jterapin Aug 14, 2025
4784fd9
Update user agent tracking
jterapin Aug 14, 2025
0645518
Merge branch 'version-3' into tm-interface
jterapin Aug 15, 2025
58d1185
Feedback - reuse existing deprecation message
jterapin Aug 15, 2025
7df69d4
Feedback - remove unnecessary api private tags
jterapin Aug 15, 2025
08622f1
Feedback - improve ordered parts in stream upload
jterapin Aug 15, 2025
9846340
Revert "Update user agent tracking"
jterapin Aug 15, 2025
58130ee
Remove comments
jterapin Aug 15, 2025
78cacb9
Fix syntax
jterapin Aug 15, 2025
ba25fb4
Merge branch 'version-3' into tm-interface
jterapin Aug 18, 2025
c47a6ef
Feedback - update deprecated use method
jterapin Aug 18, 2025
36c428a
Feedback - remove unnecessary specs
jterapin Aug 18, 2025
3ed82b2
Merge from tm-interface branch
jterapin Aug 18, 2025
05eacf2
Testing implementation
jterapin Aug 18, 2025
6 changes: 6 additions & 0 deletions gems/aws-sdk-s3/CHANGELOG.md
@@ -1,6 +1,12 @@
Unreleased Changes
------------------

* Issue - When the multipart stream uploader fails to complete a multipart upload, it now calls abort multipart upload.

* Issue - For the `Aws::S3::Object` class, the following methods have been deprecated: `download_file`, `upload_file`, and `upload_stream`. Use `Aws::S3::TransferManager` instead.

* Feature - Add `Aws::S3::TransferManager`, an S3 transfer utility that provides upload and download capabilities with automatic multipart handling, progress tracking, and support for large files (see the usage sketch below).

1.196.1 (2025-08-05)
------------------

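The changelog entries above introduce `Aws::S3::TransferManager` as the replacement for the deprecated `Object#upload_file`, `#download_file`, and `#upload_stream` helpers. The `transfer_manager.rb` file itself is not part of this excerpt, so the following is a minimal usage sketch, assuming the new class accepts a `client:` option and exposes `upload_file`/`download_file` with `bucket:` and `key:` keywords mirroring the deprecated helpers; the bucket, key, and paths are placeholders.

```ruby
require 'aws-sdk-s3'

# Assumed constructor option: an existing Aws::S3::Client.
tm = Aws::S3::TransferManager.new(client: Aws::S3::Client.new)

# Upload a file; the manager decides between single-part and multipart.
tm.upload_file('/tmp/report.csv', bucket: 'my-bucket', key: 'reports/report.csv')

# Download an object to a local path.
tm.download_file('/tmp/report-copy.csv', bucket: 'my-bucket', key: 'reports/report.csv')
```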
6 changes: 5 additions & 1 deletion gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb
@@ -18,14 +18,18 @@ module S3
autoload :ObjectMultipartCopier, 'aws-sdk-s3/object_multipart_copier'
autoload :PresignedPost, 'aws-sdk-s3/presigned_post'
autoload :Presigner, 'aws-sdk-s3/presigner'
autoload :TransferManager, 'aws-sdk-s3/transfer_manager'

# s3 express session auth
autoload :ExpressCredentials, 'aws-sdk-s3/express_credentials'
autoload :ExpressCredentialsProvider, 'aws-sdk-s3/express_credentials_provider'

# s3 access grants auth

autoload :AccessGrantsCredentials, 'aws-sdk-s3/access_grants_credentials'
autoload :AccessGrantsCredentialsProvider, 'aws-sdk-s3/access_grants_credentials_provider'

# testing transfer manager
autoload :DirectoryUploader, 'aws-sdk-s3/directory_uploader'
autoload :TransferManager, 'aws-sdk-s3/transfer_manager'
end
end
3 changes: 3 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb
@@ -398,6 +398,7 @@ def upload_stream(options = {}, &block)
end
true
end
deprecated(:upload_stream, use: 'Aws::S3::TransferManager#upload_stream', version: 'next major version')

# Uploads a file from disk to the current object in S3.
#
@@ -465,6 +466,7 @@ def upload_file(source, options = {})
yield response if block_given?
true
end
deprecated(:upload_file, use: 'Aws::S3::TransferManager#upload_file', version: 'next major version')

# Downloads a file in S3 to a path on disk.
#
@@ -534,6 +536,7 @@ def download_file(destination, options = {})
end
true
end
deprecated(:download_file, use: 'Aws::S3::TransferManager#download_file', version: 'next major version')

class Collection < Aws::Resources::Collection
alias_method :delete, :batch_delete!
145 changes: 145 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb
@@ -0,0 +1,145 @@
# frozen_string_literal: true

require 'find'
require 'set'
require 'thread'

module Aws
module S3
# @api private
class DirectoryUploader
def initialize(options = {})
@client = options[:client] || Client.new
@thread_count = options[:thread_count] || 10
@executor = options[:executor]
end

# @return [Client]
attr_reader :client

def upload(source, options = {})
raise ArgumentError, 'Invalid directory' unless Dir.exist?(source)

upload_opts = options.dup
@source = source
@recursive = upload_opts.delete(:recursive) || false
@follow_symlinks = upload_opts.delete(:follow_symlinks) || false
@s3_prefix = upload_opts.delete(:s3_prefix) || nil
@s3_delimiter = upload_opts.delete(:s3_delimiter) || '/'
@filter_callback = upload_opts.delete(:filter_callback) || nil

uploader = FileUploader.new(
multipart_threshold: upload_opts.delete(:multipart_threshold),
client: @client,
executor: @executor
)
@file_queue = SizedQueue.new(5) # TODO: random number for now, intended to relieve backpressure
@disable_queue = false

_producer = Thread.new do
if @recursive
stream_recursive_files
else
stream_direct_files
end

# signals queue being done
if @executor
@file_queue << :done
else
@thread_count.times { @file_queue << :done }
end
end

if @executor
upload_with_executor(uploader, upload_opts)
else
threads = []
@thread_count.times do
thread = Thread.new do
next if @disable_queue # `return` inside a Thread block raises LocalJumpError; `next` ends the thread cleanly

while (file = @file_queue.shift) != :done
path = File.join(@source, file)
# TODO: key to consider s3_prefix and custom delimiter
uploader.upload(path, upload_opts.merge(key: file))
end
nil
rescue StandardError => e # TODO: handle failure policies
@disable_queue = true
e
end
threads << thread
end
threads.map(&:value).compact
end
end

private

def upload_with_executor(uploader, upload_opts)
total_files = 0
completion_queue = Queue.new
errors = []
while (file = @file_queue.shift) != :done
total_files += 1
@executor.post(file) do |f|
begin
next if @disable_queue

path = File.join(@source, f)
# TODO: key to consider s3_prefix and custom delimiter
uploader.upload(path, upload_opts.merge(key: f))
rescue StandardError => e # TODO: handle failure policies
@disable_queue = true
errors << e
end
ensure
completion_queue << :done
end
end
puts 'waiting for completion'
total_files.times { completion_queue.pop }
puts 'all done waiting!'
raise StandardError, 'directory upload failed' unless errors.empty?
end

def stream_recursive_files
visited = Set.new
# TODO: add filter callback
Find.find(@source) do |p|
break if @disable_queue

if !@follow_symlinks && File.symlink?(p)
Find.prune
next
end

absolute_path = File.realpath(p)
if visited.include?(absolute_path)
Find.prune
next
end

visited << absolute_path

# TODO: if non-default s3_delimiter is used, validate here and fail
@file_queue << p.sub(%r{^#{Regexp.escape(@source)}/}, '') if File.file?(p)
end
end

def stream_direct_files
# TODO: add filter callback
Dir.each_child(@source) do |entry|
break if @disable_queue

path = File.join(@source, entry)
next if !@follow_symlinks && File.symlink?(path)

# TODO: if non-default s3_delimiter is used, validate here and fail
@file_queue << entry if File.file?(path)
end
end
end
end
end
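DirectoryUploader above follows a producer/consumer layout: one producer thread streams relative file paths into a bounded `SizedQueue` (capacity 5, to relieve backpressure), and either a pool of consumer threads or a user-supplied executor drains it, with `:done` sentinels marking the end of the stream. The class is marked `@api private` and is still a prototype in this PR, so the call below is illustrative only; the bucket name and directory are placeholders, and `bucket:` is assumed to pass through to `FileUploader#upload`.

```ruby
uploader = Aws::S3::DirectoryUploader.new(client: Aws::S3::Client.new, thread_count: 8)

# Walk /data/exports recursively (skipping symlinks by default) and upload
# each file, using its path relative to the source directory as the object key.
uploader.upload('/data/exports', bucket: 'my-bucket', recursive: true)
```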
8 changes: 3 additions & 5 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/file_uploader.rb
@@ -7,23 +7,21 @@ module S3
# @api private
class FileUploader

ONE_HUNDRED_MEGABYTES = 100 * 1024 * 1024
DEFAULT_MULTIPART_THRESHOLD = 100 * 1024 * 1024

# @param [Hash] options
# @option options [Client] :client
# @option options [Integer] :multipart_threshold (104857600)
def initialize(options = {})
@options = options
@client = options[:client] || Client.new
@multipart_threshold = options[:multipart_threshold] ||
ONE_HUNDRED_MEGABYTES
@multipart_threshold = options[:multipart_threshold] || DEFAULT_MULTIPART_THRESHOLD
end

# @return [Client]
attr_reader :client

# @return [Integer] Files larger than or equal to this in bytes are uploaded
# using a {MultipartFileUploader}.
# @return [Integer] Files larger than or equal to this in bytes are uploaded using a {MultipartFileUploader}.
attr_reader :multipart_threshold

# @param [String, Pathname, File, Tempfile] source The file to upload.
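A short sketch of how the threshold above selects the upload path. `FileUploader` is `@api private`, so this is illustrative only; the bucket, key, and file path are placeholders.

```ruby
uploader = Aws::S3::FileUploader.new(
  client: Aws::S3::Client.new,
  multipart_threshold: 50 * 1024 * 1024 # lower the 100 MB default to 50 MB
)

# Files of 50 MB or more are handed to MultipartFileUploader; smaller files
# are uploaded in a single request.
uploader.upload('/tmp/archive.bin', bucket: 'my-bucket', key: 'backups/archive.bin')
```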
81 changes: 71 additions & 10 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb
@@ -9,28 +9,23 @@ module S3
class MultipartFileUploader

MIN_PART_SIZE = 5 * 1024 * 1024 # 5MB

MAX_PARTS = 10_000

THREAD_COUNT = 10

DEFAULT_THREAD_COUNT = 10
CREATE_OPTIONS = Set.new(Client.api.operation(:create_multipart_upload).input.shape.member_names)

COMPLETE_OPTIONS = Set.new(Client.api.operation(:complete_multipart_upload).input.shape.member_names)

UPLOAD_PART_OPTIONS = Set.new(Client.api.operation(:upload_part).input.shape.member_names)

CHECKSUM_KEYS = Set.new(
Client.api.operation(:upload_part).input.shape.members.map do |n, s|
n if s.location == 'header' && s.location_name.start_with?('x-amz-checksum-')
end.compact
)

# @option options [Client] :client
# @option options [Integer] :thread_count (THREAD_COUNT)
# @option options [Integer] :thread_count (DEFAULT_THREAD_COUNT)
def initialize(options = {})
@client = options[:client] || Client.new
@thread_count = options[:thread_count] || THREAD_COUNT
@executor = options[:executor]
@thread_count = options[:thread_count] || DEFAULT_THREAD_COUNT
end

# @return [Client]
@@ -72,7 +67,13 @@ def complete_upload(upload_id, parts, source, options)
def upload_parts(upload_id, source, options)
completed = PartList.new
pending = PartList.new(compute_parts(upload_id, source, options))
errors = upload_in_threads(pending, completed, options)
errors =
if @executor
puts "Executor route - using #{@executor}"
upload_in_executor(pending, completed, options)
else
upload_in_threads(pending, completed, options)
end
if errors.empty?
completed.to_a.sort_by { |part| part[:part_number] }
else
@@ -141,6 +142,62 @@ def upload_part_opts(options)
end
end

def upload_in_executor(pending, completed, options)
if (callback = options[:progress_callback])
progress = MultipartProgress.new(pending, callback)
end
max_parts = pending.count
completion_queue = Queue.new
stop_work = false
errors = []
counter = 0

puts "Submitting #{max_parts} tasks"
while (part = pending.shift)
counter += 1
puts "Submitting #{counter} task to executor"
@executor.post(part) do |p|
if stop_work
puts 'Work stopped so skipping'
completion_queue << :done
next
end

if progress
p[:on_chunk_sent] =
proc do |_chunk, bytes, _total|
progress.call(p[:part_number], bytes)
end
end

begin
puts "Uploading #{p[:part_number]}"

resp = @client.upload_part(p)
p[:body].close
completed_part = { etag: resp.etag, part_number: p[:part_number] }
algorithm = resp.context.params[:checksum_algorithm]
k = "checksum_#{algorithm.downcase}".to_sym
completed_part[k] = resp.send(k)
completed.push(completed_part)
rescue StandardError => e
puts "Encountered Error #{e}"
stop_work = true
errors << e
ensure
puts 'Adding to completion queue'
completion_queue << :done
end
nil
end
end

puts "Waiting for #{counter} completion"
max_parts.times { completion_queue.pop }
puts "Done Waiting. Result: \n Completed:#{completed} \n Error: #{errors}"
errors
end

def upload_in_threads(pending, completed, options)
threads = []
if (callback = options[:progress_callback])
@@ -195,6 +252,10 @@ def initialize(parts = [])
@mutex = Mutex.new
end

def count
@mutex.synchronize { @parts.count }
end

def push(part)
@mutex.synchronize { @parts.push(part) }
end
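The executor path above relies only on a small duck type: an object whose `post` method accepts arguments plus a block and runs the block asynchronously with those arguments, while the uploader tracks completion itself via `completion_queue`. A minimal sketch of such an executor, assuming nothing beyond what the diff shows (a real implementation would bound concurrency; this naive version spawns one thread per task):

```ruby
class SimpleExecutor
  def initialize
    @threads = []
  end

  # Run the given block asynchronously with the supplied arguments.
  def post(*args, &block)
    @threads << Thread.new(*args, &block)
  end

  # Wait for all submitted tasks to finish.
  def shutdown
    @threads.each(&:join)
  end
end

# Hypothetical wiring (the constructor above already accepts an :executor option):
# uploader = Aws::S3::MultipartFileUploader.new(client: client, executor: SimpleExecutor.new)
```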