Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ mod tests {
use crate::integration::*;
use crate::tests::*;
use crate::ClientOptions;
use crate::ObjectStoreExt;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use http::HeaderMap;
Expand Down
1 change: 1 addition & 0 deletions src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ mod tests {
use super::*;
use crate::integration::*;
use crate::tests::*;
use crate::ObjectStoreExt;
use bytes::Bytes;

#[tokio::test]
Expand Down
8 changes: 4 additions & 4 deletions src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,12 @@ impl AsyncBufRead for BufReader {

/// An async buffered writer compatible with the tokio IO traits
///
/// This writer adaptively uses [`ObjectStore::put`] or
/// This writer adaptively uses [`ObjectStore::put_opts`] or
/// [`ObjectStore::put_multipart`] depending on the amount of data that has
/// been written.
///
/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
/// using [`ObjectStore::put_opts`]. If `capacity` is exceeded, data will instead be
/// streamed using [`ObjectStore::put_multipart`]
pub struct BufWriter {
capacity: usize,
Expand All @@ -242,7 +242,7 @@ enum BufWriterState {
Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
/// Write to a multipart upload
Write(Option<WriteMultipart>),
/// [`ObjectStore::put`]
/// [`ObjectStore::put_opts`]
Flush(BoxFuture<'static, crate::Result<()>>),
}

Expand Down Expand Up @@ -489,7 +489,7 @@ mod tests {
use super::*;
use crate::memory::InMemory;
use crate::path::Path;
use crate::{Attribute, GetOptions};
use crate::{Attribute, GetOptions, ObjectStoreExt};
use itertools::Itertools;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

Expand Down
1 change: 1 addition & 0 deletions src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ mod tests {
use crate::local::LocalFileSystem;
use crate::memory::InMemory;
use crate::path::Path;
use crate::ObjectStoreExt;

use super::*;

Expand Down
1 change: 1 addition & 0 deletions src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ mod test {

use crate::integration::*;
use crate::tests::*;
use crate::ObjectStoreExt;

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion src/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::multipart::MultipartStore;
use crate::path::Path;
use crate::{
Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange, MultipartUpload,
ObjectStore, PutMode, PutPayload, UpdateVersion, WriteMultipart,
ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion, WriteMultipart,
};
use bytes::Bytes;
use futures::stream::FuturesUnordered;
Expand Down
67 changes: 50 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,11 @@
//!
//! # Put Object
//!
//! Use the [`ObjectStore::put`] method to atomically write data.
//! Use the [`ObjectStoreExt::put`] method to atomically write data.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
//! # use object_store::{ObjectStore, PutPayload};
//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
//! # use std::sync::Arc;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
Expand Down Expand Up @@ -338,7 +338,7 @@
//!
//! ```
//! # use object_store::local::LocalFileSystem;
//! # use object_store::{ObjectStore, PutPayloadMut};
//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayloadMut};
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -587,19 +587,22 @@ pub type DynObjectStore = dyn ObjectStore;
pub type MultipartId = String;

/// Universal API to multiple object store services.
///
/// For more convience methods, check [`ObjectStoreExt`].
///
/// # Contract
/// This trait is meant as a contract between object store implementations
/// (e.g. providers, wrappers) and the `object_store` crate itself.
Comment on lines +594 to +595
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is another key goal that ObjectStore is meant to be the minimal required API? If so perhaps we can add something like

Suggested change
/// This trait is meant as a contract between object store implementations
/// (e.g. providers, wrappers) and the `object_store` crate itself.
/// This trait is meant as a contract between object store implementations
/// (e.g. providers, wrappers) and the `object_store` crate itself and is
/// intended to be the minimum API required for an object store.

///
/// The [`ObjectStoreExt`] acts as an API/contract between `object_store`
/// and the store users.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// and the store users.
/// and the store users and provides additional methods that may be simpler to use but overlap
/// in functionality with `ObjectStore`

#[async_trait]
pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// Save the provided bytes to the specified location
/// Save the provided `payload` to `location` with the given options
///
/// The operation is guaranteed to be atomic, it will either successfully
/// write the entirety of `payload` to `location`, or fail. No clients
/// should be able to observe a partially written object
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
self.put_opts(location, payload, PutOptions::default())
.await
}

/// Save the provided `payload` to `location` with the given options
async fn put_opts(
&self,
location: &Path,
Expand All @@ -609,7 +612,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {

/// Perform a multipart upload
///
/// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
/// Client should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
/// typically require multiple separate requests. See [`MultipartUpload`] for more information
///
/// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
Expand All @@ -620,7 +623,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {

/// Perform a multipart upload with options
///
/// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
/// Client should prefer [`ObjectStore::put_opts`] for small payloads, as streaming uploads
/// typically require multiple separate requests. See [`MultipartUpload`] for more information
///
/// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
Expand Down Expand Up @@ -696,7 +699,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # let root = tempfile::TempDir::new().unwrap();
/// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
/// # use object_store::{ObjectStore, ObjectMeta};
/// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta};
/// # use object_store::path::Path;
/// # use futures::{StreamExt, TryStreamExt};
/// #
Expand Down Expand Up @@ -803,10 +806,6 @@ macro_rules! as_ref_impl {
($type:ty) => {
#[async_trait]
impl ObjectStore for $type {
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
self.as_ref().put(location, payload).await
}

async fn put_opts(
&self,
location: &Path,
Expand Down Expand Up @@ -901,6 +900,40 @@ macro_rules! as_ref_impl {
as_ref_impl!(Arc<dyn ObjectStore>);
as_ref_impl!(Box<dyn ObjectStore>);

/// Helper module to [seal traits](https://predr.ag/blog/definitive-guide-to-sealed-traits-in-rust/).
mod private {
pub trait Sealed {}

impl<T> Sealed for T where T: super::ObjectStore + ?Sized {}
}

/// Extension trait for [`ObjectStore`] with convinience functions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To ease the transition I suggest we augment the documentation here with the following additional information

1. Explain the intended use / goal of this trait

As I understand it, the idea of this trait is to make it more clear what functions must be provided by an ObjectStore implementation and which are mostly implemented in terms of the others.

Maybe we can also make it clear to implementers that the default implementations of ObjectStoreExt may not be optimal for their uscase

However, I may not understand the full subtely - perhaps we can add more info from #385 as appropriate

2. Add a note about the migration plan / tips to help on upgrade

For example, we could add a section on "upgrade notes" and explain "if you implemented put for your ObjectStore, if it has an implementation different than the default move that implementation to ObjectStoreExt.

Also it would be good to highlight any planned changes that were forthcoming (like "we plan to move the X, Y and Z functions to ObjectStoreExt over time as well")

3. An example

I know this sounds silly, but I think especially given impl Future --> ... signature it is non obvious to the causal Rust programmer that this means the impl should have an async fn

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a motivation text to both traits and also changed the implementation to async_trait. Does this look more useful to you?

///
/// See "contract" section within the [`ObjectStore`] documentation for more reasoning.
///
/// # Implementation
/// You MUST NOT implement this trait yourself. It is automatically implemented for all [`ObjectStore`] implementations.
#[async_trait]
pub trait ObjectStoreExt: private::Sealed {
/// Save the provided bytes to the specified location
///
/// The operation is guaranteed to be atomic, it will either successfully
/// write the entirety of `payload` to `location`, or fail. No clients
/// should be able to observe a partially written object
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult>;
}

#[async_trait]
impl<T> ObjectStoreExt for T
where
T: ObjectStore + private::Sealed + ?Sized,
{
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
self.put_opts(location, payload, PutOptions::default())
.await
}
}

/// Result of a list call that includes objects, prefixes (directories) and a
/// token for the next set of results. Individual result sets may be limited to
/// 1,000 objects based on the underlying object storage's limitations.
Expand Down
5 changes: 0 additions & 5 deletions src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {

#[async_trait]
impl<T: ObjectStore> ObjectStore for LimitStore<T> {
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.put(location, payload).await
}

async fn put_opts(
&self,
location: &Path,
Expand Down
2 changes: 1 addition & 1 deletion src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,7 @@ mod tests {
#[cfg(target_family = "unix")]
use tempfile::NamedTempFile;

use crate::integration::*;
use crate::{integration::*, ObjectStoreExt};

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ impl MultipartUpload for InMemoryUpload {

#[cfg(test)]
mod tests {
use crate::integration::*;
use crate::{integration::*, ObjectStoreExt};

use super::*;

Expand Down
7 changes: 1 addition & 6 deletions src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta {
}
#[async_trait::async_trait]
impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
let full_path = self.full_path(location);
self.inner.put(&full_path, payload).await
}

async fn put_opts(
&self,
location: &Path,
Expand Down Expand Up @@ -225,8 +220,8 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::integration::*;
use crate::local::LocalFileSystem;
use crate::{integration::*, ObjectStoreExt};

use tempfile::TempDir;

Expand Down
8 changes: 2 additions & 6 deletions src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub struct ThrottleConfig {
/// [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
pub wait_list_with_delimiter_per_entry: Duration,

/// Sleep duration for every call to [`put`](ThrottledStore::put).
/// Sleep duration for every call to [`put_opts`](ThrottledStore::put_opts).
///
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation.
Expand Down Expand Up @@ -148,11 +148,6 @@ impl<T: ObjectStore> std::fmt::Display for ThrottledStore<T> {

#[async_trait]
impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
sleep(self.config().wait_put_per_call).await;
self.inner.put(location, payload).await
}

async fn put_opts(
&self,
location: &Path,
Expand Down Expand Up @@ -404,6 +399,7 @@ impl MultipartUpload for ThrottledUpload {
#[cfg(test)]
mod tests {
use super::*;
use crate::ObjectStoreExt;
use crate::{integration::*, memory::InMemory, GetResultPayload};
use futures::TryStreamExt;
use tokio::time::Duration;
Expand Down