# ---- CMakeLists.txt: AWS S3 file-system support ----

# Fetch and configure the AWS SDK (S3 client only); see
# cmake/third_party_aws.cmake. Included with the other third-party
# dependencies so the CXX_FLAGS save/restore still brackets it.
include(cmake/third_party_aws.cmake)

file(GLOB bagz_file_system_s3_sources "src/file/file_systems/s3/*.cc")
file(GLOB bagz_file_system_s3_headers "src/file/file_systems/s3/*.h")

# S3-backed FileSystem implementation.
#
# FIX: GTest::gtest, GTest::gtest_main and GTest::gmock removed from DEPS.
# Linking gtest_main into a *library* target injects a main() symbol into
# every binary that transitively links it; test-framework dependencies
# belong on the test target (no other bagz_cc_library here lists GTest).
bagz_cc_library(
  bagz_file_system_s3
  SOURCES ${bagz_file_system_s3_sources}
  HEADERS ${bagz_file_system_s3_headers}
  DEPS
    absl::log
    absl::status
    absl::statusor
    absl::strings
    bagz_file_system
    bagz_internal
    aws-cpp-sdk-s3
    aws-cpp-sdk-core
)

# The existing bagz_file target additionally gains, in its DEPS list
# (after bagz_file_system_gcs / google-cloud-cpp::storage):
#   bagz_file_system_s3
#   aws-cpp-sdk-s3
#   aws-cpp-sdk-core
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Configure the AWS SDK build before fetching it.
# FIX: these must be CACHE variables — the fetched SDK's CMakeLists reads
# them as cache entries, so plain set() is silently ignored across scopes.
set(BUILD_TESTING OFF CACHE BOOL "" FORCE)
set(ENABLE_TESTING OFF CACHE BOOL "" FORCE)

# Only build the S3 client (aws-cpp-sdk-core is always built).
set(BUILD_ONLY "s3" CACHE STRING "")

# Build the SDK as static libraries.
set(BUILD_SHARED_LIBS OFF CACHE BOOL "")

# NOTE: EXCLUDE_FROM_ALL inside FetchContent_Declare requires CMake >= 3.28;
# confirm the project's cmake_minimum_required covers it.
FetchContent_Declare(
  aws-sdk-cpp
  GIT_REPOSITORY https://github.com/aws/aws-sdk-cpp.git
  GIT_TAG        1.11.400  # Pinned for reproducible builds.
  GIT_SHALLOW    TRUE
  EXCLUDE_FROM_ALL
)
FetchContent_MakeAvailable(aws-sdk-cpp)

# Provide GoogleTest only if the project has not already supplied it.
# FIX: the Declare now lives inside the guard too — declaring a content
# name that another third_party_*.cmake may also declare risks a
# conflicting-declaration error.
if(NOT TARGET GTest::gtest)
  FetchContent_Declare(
    googletest
    GIT_REPOSITORY https://github.com/google/googletest.git
    GIT_TAG        v1.17.0
  )
  # Link gtest against the shared CRT on MSVC so it matches the project.
  set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
  FetchContent_MakeAvailable(googletest)
endif()
+ +#include "src/file/file_systems/s3/s3_file_system.h" + +#include +#include +#include +#include +#include +#include + +#include "absl/algorithm/container.h" +#include "absl/base/no_destructor.h" +#include "absl/base/nullability.h" +#include "absl/functional/function_ref.h" +#include "absl/log/check.h" +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/str_split.h" +#include "absl/strings/string_view.h" +#include "absl/strings/strip.h" +#include "src/file/file_system/pread_file.h" +#include "src/file/file_system/shard_spec.h" +#include "src/file/file_system/write_file.h" +#include "src/internal/parallel_do.h" +#include +#include +#include +#include +#include +#include +#include + +namespace bagz { +namespace { + +constexpr int kMaxParallelism = 100; + +struct BucketObject { + const std::string bucket; + const std::string object; +}; + +absl::Status ConvertAwsError(const Aws::Client::AWSError& error) { + // Map AWS error types to absl status codes + switch (error.GetErrorType()) { + case Aws::S3::S3Errors::NO_SUCH_KEY: + case Aws::S3::S3Errors::NO_SUCH_BUCKET: + return absl::NotFoundError(error.GetMessage()); + case Aws::S3::S3Errors::ACCESS_DENIED: + return absl::PermissionDeniedError(error.GetMessage()); + case Aws::S3::S3Errors::INVALID_PARAMETER_VALUE: + return absl::InvalidArgumentError(error.GetMessage()); + default: + return absl::UnknownError(error.GetMessage()); + } +} + +// A reference to an S3 object. This reference allows random access to the +// content of the object. Note that changes to the file made in-between reads +// are not checked, which can cause race condition. Only out of range reads are +// checked, which means that if we add data at the end of the file, the reader +// won't be able to access it. 
+class S3PReadFile : public PReadFile { + public: + explicit S3PReadFile(std::shared_ptr client, + absl::string_view bucket_name, + absl::string_view object_name, size_t size) + : client_(std::move(client)), + bucket_name_(bucket_name), + object_name_(object_name), + size_(size) { + CHECK(client_ != nullptr); + } + size_t size() const override { return size_; } + + absl::Status PRead( + size_t offset, size_t num_bytes, + absl::FunctionRef callback) const override { + if (num_bytes > size()) { + return absl::OutOfRangeError( + absl::StrCat("Invalid range: size > file size (here: ", num_bytes, + " > ", size(), ")")); + } + if (offset > size() - num_bytes) { + return absl::OutOfRangeError(absl::StrCat( + "Invalid range: offset > file size - size (here: ", offset, " > ", + size() - num_bytes, ")")); + } + + Aws::S3::Model::GetObjectRequest request; + request.SetBucket(bucket_name_); + request.SetKey(object_name_); + + // Set the range header for partial read + std::string range = absl::StrCat("bytes=", offset, "-", offset + num_bytes - 1); + request.SetRange(range); + + auto outcome = client_->GetObject(request); + + if (!outcome.IsSuccess()) { + return ConvertAwsError(outcome.GetError()); + } + + auto& stream = outcome.GetResult().GetBody(); + std::string contents(std::istreambuf_iterator(stream), {}); + + callback(contents); + return absl::OkStatus(); + } + + const std::string& Name() const { return object_name_; } + + private: + std::shared_ptr const client_; + const std::string bucket_name_; + const std::string object_name_; + const size_t size_; +}; + +class S3WriteFile : public WriteFile { + public: + explicit S3WriteFile(std::shared_ptr client, + const std::string& bucket, + const std::string& key, + uint64_t offset) + : client_(std::move(client)), + bucket_(bucket), + key_(key), + offset_(offset) {} + + // TODO(yolokfx): this is writing to memory first then flush to s3. + // Need to have a checker here. Please make sure each of the shard is small enough. 
+ absl::Status Write(absl::string_view data) override { + buffer_.append(data.data(), data.size()); + return absl::OkStatus(); + } + + absl::Status Flush() override { + if (buffer_.empty()) { + return absl::OkStatus(); + } + + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(bucket_); + request.SetKey(key_); + + auto stream = std::make_shared(buffer_); + request.SetBody(stream); + + auto outcome = client_->PutObject(request); + + if (!outcome.IsSuccess()) { + return ConvertAwsError(outcome.GetError()); + } + + buffer_.clear(); + return absl::OkStatus(); + } + + absl::Status Close() override { + if (!closed_) { + closed_ = true; + return Flush(); + } + return absl::OkStatus(); + } + + ~S3WriteFile() override { + if (!closed_) { + (void)Close(); + } + } + + private: + std::shared_ptr const client_; + const std::string bucket_; + const std::string key_; + const uint64_t offset_; + std::string buffer_; + bool closed_ = false; +}; + +// Get the bucket and object name from an S3 URI. Leading `/` are stripped, but +// the URI is expected not to contain the `s3:` prefix. This prefix is already +// stripped by the file registry. +BucketObject BucketAndObjectName(absl::string_view filename) { + // Remove leading slash. 
+ while (absl::ConsumePrefix(&filename, "/")) { + } + + std::pair bucket_and_object_name = + absl::StrSplit(filename, absl::MaxSplits('/', 1)); + + return BucketObject{ + .bucket = std::string(bucket_and_object_name.first), + .object = std::string(bucket_and_object_name.second), + }; +} + +} // namespace + +std::shared_ptr S3FileSystem::Client() const { + ClientFactory client_factory = client_factory_; + if (client_factory == nullptr) { + client_factory = [] { + static Aws::SDKOptions options; + static bool initialized = false; + if (!initialized) { + Aws::InitAPI(options); + initialized = true; + } + return std::make_shared(); + }; + } + static absl::NoDestructor> client( + client_factory()); + return *client; +} + +absl::StatusOr> +S3FileSystem::OpenWrite(absl::string_view filename_without_prefix, + uint64_t offset, absl::string_view options) const { + const BucketObject bucket_object = + BucketAndObjectName(filename_without_prefix); + + // Note: S3 doesn't support writing at an offset directly like GCS. + // For simplicity, we'll start a new write. A full implementation would + // need to handle multipart uploads or read-modify-write for offset support. 
+ if (offset != 0) { + return absl::UnimplementedError( + "S3 write with non-zero offset not yet implemented"); + } + + return std::make_unique(Client(), bucket_object.bucket, + bucket_object.object, offset); +} + +absl::StatusOr> +S3FileSystem::OpenPRead(absl::string_view filename_without_prefix, + absl::string_view options) const { + const BucketObject bucket_object = + BucketAndObjectName(filename_without_prefix); + std::shared_ptr client = Client(); + + Aws::S3::Model::HeadObjectRequest request; + request.SetBucket(bucket_object.bucket); + request.SetKey(bucket_object.object); + + auto outcome = client->HeadObject(request); + + if (!outcome.IsSuccess()) { + return ConvertAwsError(outcome.GetError()); + } + + size_t size = outcome.GetResult().GetContentLength(); + + return std::make_unique(client, bucket_object.bucket, + bucket_object.object, size); +} + +absl::Status S3FileSystem::Delete(absl::string_view filename_without_prefix, + absl::string_view options) const { + const BucketObject bucket_object = + BucketAndObjectName(filename_without_prefix); + + Aws::S3::Model::DeleteObjectRequest request; + request.SetBucket(bucket_object.bucket); + request.SetKey(bucket_object.object); + + auto outcome = Client()->DeleteObject(request); + + if (!outcome.IsSuccess()) { + return ConvertAwsError(outcome.GetError()); + } + + return absl::OkStatus(); +} + +absl::StatusOr>> +S3FileSystem::BulkOpenPRead(absl::string_view filespec_without_prefix, + absl::string_view options) const { + std::vector expanded_filespec = + ExpandShardSpec(filespec_without_prefix); + + std::vector>> files_per_shard_spec( + expanded_filespec.size()); + std::shared_ptr client = Client(); + + if (absl::Status status = internal::ParallelDo( + expanded_filespec.size(), + [&expanded_filespec, &files_per_shard_spec, + &client](size_t filespec_index) -> absl::Status { + absl::string_view filespec = expanded_filespec[filespec_index]; + std::string pattern; + std::string prefix; + + if 
(absl::string_view::size_type pos = filespec.rfind("@*"); + pos != absl::string_view::npos) { + absl::string_view prefix_view = filespec.substr(0, pos); + absl::string_view suffix = filespec.substr(pos + 2); + pattern = absl::StrCat( + prefix_view, + "-[0-9][0-9][0-9][0-9][0-9]-of-[0-9][0-9][0-9][0-9][0-9]", + suffix); + prefix = std::string(prefix_view); + } else { + pattern = filespec; + prefix = filespec; + } + + const BucketObject bucket_object = BucketAndObjectName(prefix); + + Aws::S3::Model::ListObjectsV2Request request; + request.SetBucket(bucket_object.bucket); + request.SetPrefix(bucket_object.object); + + std::vector>& files = + files_per_shard_spec[filespec_index]; + std::vector file_names; + + bool done = false; + while (!done) { + auto outcome = client->ListObjectsV2(request); + + if (!outcome.IsSuccess()) { + return ConvertAwsError(outcome.GetError()); + } + + const auto& result = outcome.GetResult(); + for (const auto& object : result.GetContents()) { + const std::string& key = object.GetKey(); + file_names.emplace_back(key); + files.push_back(std::make_unique( + client, bucket_object.bucket, key, object.GetSize())); + } + + if (result.GetIsTruncated()) { + request.SetContinuationToken(result.GetNextContinuationToken()); + } else { + done = true; + } + } + + if (!absl::c_is_sorted(file_names)) { + return absl::InternalError( + "When opening the bagz in bulk, the file names are not " + "sorted."); + } + + return absl::OkStatus(); + }, + kMaxParallelism, /*cpu_bound=*/false); + !status.ok()) { + return status; + } + + size_t total_number_of_files = 0; + for (const auto& files : files_per_shard_spec) { + total_number_of_files += files.size(); + } + + std::vector> all_files; + all_files.reserve(total_number_of_files); + + for (auto& files : files_per_shard_spec) { + all_files.insert(all_files.end(), std::make_move_iterator(files.begin()), + std::make_move_iterator(files.end())); + } + + return all_files; +} + +} // namespace bagz \ No newline at end of 
file diff --git a/src/file/file_systems/s3/s3_file_system.h b/src/file/file_systems/s3/s3_file_system.h new file mode 100644 index 0000000..a85b86f --- /dev/null +++ b/src/file/file_systems/s3/s3_file_system.h @@ -0,0 +1,78 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef BAGZ_SRC_FILE_FILE_SYSTEMS_S3_S3_FILE_SYSTEM_H_ +#define BAGZ_SRC_FILE_FILE_SYSTEMS_S3_S3_FILE_SYSTEM_H_ + +#include +#include +#include +#include + +#include "absl/base/nullability.h" +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/string_view.h" +#include "src/file/file_system/file_system.h" +#include "src/file/file_system/pread_file.h" +#include "src/file/file_system/write_file.h" +#include + +namespace bagz { + +// An implementation of FileSystem to read files from AWS S3. +class S3FileSystem : public FileSystem { + public: + using ClientFactory = std::shared_ptr (*)(); + explicit S3FileSystem(ClientFactory client_factory = nullptr) + : client_factory_(client_factory) {} + + protected: + // Open an S3 object for reading. + // `filename_without_prefix` should be the URI of the object on S3 without + // the leading `s3:`. + absl::StatusOr> OpenPRead( + absl::string_view filename_without_prefix, + absl::string_view options) const override; + + // Open an S3 object for writing, starting at a given offset. After opening + // the file, any data after that offset will be deleted. 
+ // `filename_without_prefix` should be the URI of the object on S3 without + // the leading `s3:`. + absl::StatusOr> OpenWrite( + absl::string_view filename_without_prefix, uint64_t offset, + absl::string_view options) const override; + + // Delete an S3 object. + // `filename_without_prefix` should be the URI of the object on S3 without + // the leading `s3:`. + absl::Status Delete(absl::string_view filename_without_prefix, + absl::string_view options) const override; + + // Open a set of files for reading. See file_system/shard_spec.h for details + // on the filespec format. + // `filename_without_prefix` should be the URI of the object on S3 without + // the leading `s3:`. + absl::StatusOr>> + BulkOpenPRead(absl::string_view filespec_without_prefix, + absl::string_view options) const override; + + private: + std::shared_ptr Client() const; + ClientFactory client_factory_; +}; + +} // namespace bagz + +#endif // BAGZ_SRC_FILE_FILE_SYSTEMS_S3_S3_FILE_SYSTEM_H_ \ No newline at end of file diff --git a/src/file/registry/register_file_systems.cc b/src/file/registry/register_file_systems.cc index 38ae8b1..99c0cea 100644 --- a/src/file/registry/register_file_systems.cc +++ b/src/file/registry/register_file_systems.cc @@ -16,6 +16,7 @@ #include "absl/log/absl_check.h" #include "src/file/file_systems/gcs/gcs_file_system.h" #include "src/file/file_systems/posix/posix_file_system.h" +#include "src/file/file_systems/s3/s3_file_system.h" #include "src/file/registry/file_system_registry.h" namespace bagz { @@ -23,9 +24,11 @@ namespace bagz { void RegisterFileSystems(FileSystemRegistry& register_fs) { static absl::NoDestructor posix_fs; static absl::NoDestructor gcs_fs; + static absl::NoDestructor s3_fs; ABSL_CHECK_OK(register_fs.Register("gs:", *gcs_fs)); ABSL_CHECK_OK(register_fs.Register("posix:", *posix_fs)); + ABSL_CHECK_OK(register_fs.Register("s3:", *s3_fs)); // Set the default file system. ABSL_CHECK_OK(register_fs.Register("", *posix_fs));