Faster file-like writes #508
Conversation
adlfs/spec.py (Outdated)

```python
self._block_list = block_ids
for chunk, block_id in zip(chunks, block_ids):

    async def _upload_chunk(chunk=chunk, block_id=block_id):
```
Dynamically defining this function to use local variables from the outer scope seems brittle to me. Can it not be a normal (async) function in the module that gets called with all the required arguments?
I agree. We should hoist this to a private method on the class or a function on the module.
@martindurant as a heads up, this PR is currently more in a draft/WIP state and is published to start showing the direction we had in mind. There's still some performance testing we want to do and share to validate the motivation behind this change.
I also think we will have to reconcile with this issue as well: #494, where the block size is always 1 GiB, which in addition to causing connection issues will work against any possible concurrency. So we'll likely be exploring updating `AzureBlobFile` to actually respect `block_size` as part of uploads, which should also align the default more closely with the 50 MiB size used in s3fs.
We'll let you know when the PR is ready for more thorough review, but we're happy to hear any feedback you have on the direction in the meantime.
+1, thanks for letting me know.
> the sizes used in s3fs of 50 MiB
The old default was 5 MB, and you'll still see this in some places in fsspec. It was a good choice for the previous generation of low-bandwidth networks, but it is a poor one today, especially if the compute is colocated with the storage.
Yep. 50 MiB is also a better default given the block-count limit: right now an Azure blob can have at most 50,000 blocks. So if each block is 5 MiB, by default you'd only be able to upload a blob of roughly 244 GiB, and blobs of that size are not uncommon. Bumping it to 50 MiB gives a more reasonable default ceiling of around 2.4 TiB when the full size of the upload is not known ahead of time, as with this file-like object interface.
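For reference, the ceiling arithmetic behind those numbers (50,000 is Azure's documented block-count limit per blob):

```python
MAX_BLOCKS = 50_000  # Azure Blob Storage limit on blocks per blob

# Largest blob reachable when every block uses the given block size.
print(MAX_BLOCKS * 5 * 2**20 / 2**30)   # 5 MiB blocks  -> ~244 GiB
print(MAX_BLOCKS * 50 * 2**20 / 2**40)  # 50 MiB blocks -> ~2.4 TiB
```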
a30e2ec → df76154 (compare)
Looks good. I just had some suggestions on how we can simplify the logic and make sure we keep a good handle on the memory footprint of the implementation.
CHANGELOG.md (Outdated)

```
@@ -10,6 +10,7 @@ Unreleased
- The block size is now used for partitioned uploads. Previously, 1 GiB was used for each uploaded block irrespective of the block size
- Updated default block size to be 50 MiB. Set `blocksize` for `AzureBlobFileSystem` or `block_size` when opening `AzureBlobFile` to revert back to 5 MiB default.
- `AzureBlobFile` now inherits the block size from `AzureBlobFileSystem` when fs.open() is called and a block_size is not passed in.
- Added concurrency for `_async_upload_chunk`. Can be set using `max_concurrency` for `AzureBlobFileSystem`.
```
For the changelog entry, let's avoid mentioning private methods and instead word it from the perspective of someone who is consuming adlfs. For example:

> Introduce concurrent uploads for large `AzureBlobFileSystem.write()` calls. Maximum concurrency can be set using `max_concurrency` for `AzureBlobFileSystem`.
adlfs/spec.py (Outdated)

```python
@@ -2156,6 +2156,15 @@ def _get_chunks(self, data):
        yield data[start:end]
        start = end

    async def _upload(self, chunk, block_id, semaphore):
```
Let's rename this method to `_stage_block` to be more descriptive of the underlying logic here.
adlfs/spec.py
Outdated
) | ||
self._block_list.append(block_id) | ||
block_id = self._get_block_id(self._block_list) | ||
max_concurrency = self.fs.max_concurrency |
I think we should make sure to check whether `max_concurrency` is `None` here. It looks like it can end up being resolved to `None` based on the logic in the `__init__`. We should be able to handle it with something like the sketch below to default to 1 when it is not set.
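A minimal sketch of that fallback (the attribute names follow the diff above; the exact handling is just a suggestion):

```python
# Treat an unset max_concurrency as "no concurrency" rather than failing
# when it is used to size the semaphore below.
max_concurrency = self.fs.max_concurrency or 1
semaphore = asyncio.Semaphore(max_concurrency)
```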
adlfs/spec.py (Outdated)

```python
tasks = []
block_ids = self._block_list or []
start_idx = len(block_ids)
chunks = list(self._get_chunks(data))
```
I'm still hesitant about exhausting the entire `_get_chunks()` iterator here, as we'd be pulling in the entire buffer again as chunks in this list instead of lazily iterating through it. Could we instead (see the sketch after this list):

1. Make sure we are using `memoryview`s on the `data` so that the chunks are not necessarily copied on slicing.
2. Refactor `_get_chunks` to yield start and end boundaries instead of byte chunks, similar to how `s3fs` does it, and then update our internal `_stage_block` method to slice from the `data` buffer once past its semaphore.

I like this approach because the memoryview will help reduce any unnecessary copies in the adlfs code, and the semaphore will protect us from loading more chunks than there are tasks available to run concurrently.
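A rough sketch of that shape (the helper names and client call are assumptions, not the PR's actual code; `stage_block(block_id, data, length=...)` is the usual `azure.storage.blob.aio.BlobClient` method):

```python
import asyncio


def _get_chunks(data, blocksize):
    # Yield (start, end) offsets instead of byte chunks, so nothing is
    # materialized until a worker is actually ready for it.
    start = 0
    while start < len(data):
        end = min(start + blocksize, len(data))
        yield start, end
        start = end


async def _stage_block(client, block_id, data, start, end, semaphore):
    # The semaphore bounds how many chunks are in flight at once, which
    # also bounds the extra memory held by pending slices.
    async with semaphore:
        chunk = memoryview(data)[start:end]  # zero-copy view into the buffer
        await client.stage_block(block_id, chunk, length=len(chunk))
```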
adlfs/spec.py (Outdated)

```python
for _ in range(len(chunks)):
    block_ids.append(block_id)
    block_id = self._get_block_id(block_ids)

if chunks:
    self._block_list = block_ids
```
Instead of adding the `block_id` upfront before the coroutines run, could we instead just return the `block_id` from `_stage_block()` and extend the `block_list` from the return value of `asyncio.gather()`? I noticed this pattern in `s3fs` when organizing ETags and think it can help here too (see the sketch below). I mainly suggest this because:

1. It simplifies the logic to the point that I think we can actually just reuse the existing for loop from before this change.
2. It adds the property that any block IDs in the `block_list` have been successfully uploaded. It's hard to tell whether a downstream adlfs user will run into this, but if they are catching exceptions as part of `write()` and still commit the block list, the commit could fail because there would be nonexistent blocks in that list.
adlfs/spec.py (Outdated)

```python
for chunk, block_id in zip(chunks, block_ids[start_idx:]):
    tasks.append(self._upload(chunk, block_id, semaphore))

await asyncio.gather(*tasks)
```
One thing that would be interesting to check (though it may be tricky to reproduce) is what the error looks like if one of the stage blocks fails, resulting in cancellation of in-flight stage blocks. Supposedly a `CancelledError` will be raised in in-progress coroutines, so I'm curious how/if it gets propagated correctly (e.g. we do not want cancellation errors to surface and mask the initial underlying error).
```python
    (1, 51 * 2**20),
    (4, 200 * 2**20),
    (4, 49 * 2**20),
],
```
These are some good initial cases. I'm not sure how long these tests currently take, but it would be interesting to add a case that sets the blocksize to 5 MiB with a total blob size of 200 MiB, and then have cases that:

1. Use the default `max_concurrency` (set it to `None`) to make sure the default settings work out of the box
2. Use a concurrency of `4` to test the scenario where concurrency gets saturated and has to wait

(See the parametrization sketch after this list.) I also like setting the block size lower for some of these cases to better stress the system and help catch any concurrency issues that could arise in the future.
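Something along these lines (a hypothetical extension of the existing parametrization, with `blocksize` plumbed through):

```python
import pytest


@pytest.mark.parametrize(
    "max_concurrency,blocksize,blob_size",
    [
        (None, 5 * 2**20, 200 * 2**20),  # defaults work out of the box
        (4, 5 * 2**20, 200 * 2**20),  # 40 blocks on 4 workers: saturation
    ],
)
def test_write_max_concurrency(storage, max_concurrency, blocksize, blob_size):
    ...  # same body as the existing test, passing blocksize to the filesystem
```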
adlfs/tests/test_spec.py (Outdated)

```python
    (4, 49 * 2**20),
],
)
def test_max_concurrency(storage, max_concurrency, blob_size):
```
Let's rename it to `test_write_max_concurrency` to better indicate we are just testing concurrency for writes.
adlfs/tests/test_spec.py (Outdated)

```python
    max_concurrency=max_concurrency,
)
data = os.urandom(blob_size)
fs.mkdir("large-file-container")
```
It would probably be worth using a randomized name for the container so that we upload a new blob each time and guarantee there is no state left over from a previous parameterized case. We should also make sure to delete the container at the end of the test to avoid leaving state behind.

Another thing we should consider eventually doing (though it might require a decent amount of scaffolding) is introducing a pytest fixture that automatically handles the creation and cleanup of containers. It seems like a fair number of cases could leverage this, and it probably makes sense to do as a follow-up PR.
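A fixture along these lines could work (a sketch; it assumes an `fs` fixture yielding an `AzureBlobFileSystem` wired to the test storage account):

```python
import uuid

import pytest


@pytest.fixture
def container(fs):
    # Unique name per test so parameterized cases never share state.
    name = f"test-{uuid.uuid4().hex[:12]}"
    fs.mkdir(name)
    try:
        yield name
    finally:
        # Clean up even when the test body fails.
        fs.rm(name, recursive=True)
```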
Utilized the `max_concurrency` argument to allow concurrent writes in the `_async_upload_chunk` function.