GH-794: Avro file read and write support #802
Conversation
Sorry for the long delay. Overall this looks reasonable to me.
BinaryData.compareBytes(AVRO_MAGIC, 0, AVRO_MAGIC.length, magic, 0, AVRO_MAGIC.length);

if (validateMagic != 0) {
    throw new RuntimeException("Invalid AVRO data file: The file is not an Avro file");
Is there a more specific exception we could throw? Or at least, an IOException?
}

private String processCodec(ByteBuffer buffer) {
nit but why do many methods start with an extra blank line 😅
// Schema and VSR available after readHeader()
Schema getSchema() {
    if (avroSchema == null) {
Does it make sense to null-check the field itself (arrowSchema, not avroSchema)?
Should we use a Preconditions or Objects method?
Or, given that this check repeats a lot - have a checkStarted method?
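A guard along those lines could look roughly like this. This is a hypothetical sketch of the suggestion, not the PR's code; the field and method names (arrowSchema, readHeader) are assumptions standing in for the reader's actual members.

```java
// Hypothetical sketch of the suggested checkStarted guard.
// Identifiers are illustrative, not the PR's actual code.
class AvroReaderSketch {
    private Object arrowSchema;  // populated by readHeader() in the real reader

    void readHeader() {
        arrowSchema = new Object();  // stand-in for actual header decoding
    }

    // One shared guard replaces the repeated per-getter null-checks.
    private void checkStarted() {
        if (arrowSchema == null) {
            throw new IllegalStateException("Reader not started: call readHeader() first");
        }
    }

    Object getSchema() {
        checkStarted();
        return arrowSchema;
    }
}
```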
syncMarker, 0, SYNC_MARKER_SIZE, batchSyncMarker, 0, SYNC_MARKER_SIZE);

if (validateMarker != 0) {
    throw new RuntimeException("Invalid AVRO data file: The file is corrupted");
ditto, can we throw a more specific exception? Maybe create a subclass of IOException to indicate invalid files?
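One possible shape for that subclass, as a sketch; the class name is an assumption, not something defined in the PR:

```java
import java.io.IOException;

// Hypothetical exception type for invalid or corrupted Avro files.
// Extending IOException means callers that already handle I/O failures
// keep working, while corruption can still be caught precisely.
class InvalidAvroFileException extends IOException {
    InvalidAvroFileException(String message) {
        super(message);
    }
}
```

The two RuntimeException throws above would then become throws of this type (and the enclosing methods would declare IOException).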
What's Changed
Add top level reader / writer classes for Avro files. This is a draft for discussion; I haven't written tests, doc comments, etc., so I will do all that once we agree the shape of the implementation is correct.
I have built a very simple implementation which just uses Avro's public components with the existing producers / consumers, following the Avro container file spec. Internally Avro uses input / output streams and heap allocated byte arrays, so I have based our reader / writer on those elements. I have recycled buffers / streams wherever it is possible, without breaking into Avro's internal structures.
For compression I am using Avro's own codec implementations. Instantiation via CodecFactory is restricted to Avro's own file handling package, but the codecs themselves are mostly public, with the exception of Snappy for some unknown reason. I'd be happy to raise a ticket and ask about that, or we could just copy the Snappy implementation into our own namespace (it is a simple wrapper on Xerial).
I did look at an alternative approach, using ArrowBuf for the batch buffers with Arrow's codec implementations, which we could add to. However there were a couple of issues:
Since Avro uses streams / byte arrays internally, pretty much every way of getting to ArrowBuf involved going to a byte array first and then copying. To break out of that we'd need to reimplement large parts of Avro's file package, including encoders / decoders and shade some key classes in the Avro namespace.
Arrow's codec API assumes that compressed data is always written with the uncompressed size stored at the start of the output, which makes them unusable for other formats that don't do that, including Avro. We'd need to add a new API and implement the codecs again to handle resizing the output buffer.
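To illustrate the mismatch in that second point, here is a minimal sketch of the length-prefixed layout described above, where the uncompressed size is written ahead of the compressed body. It uses java.util.zip rather than Arrow's actual codec classes, so treat it as an illustration of the framing only, under the assumptions stated in the comments.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.Deflater;

// Illustrative sketch only (not Arrow's code): a compressed buffer whose
// first 8 bytes are the uncompressed length as a little-endian int64,
// followed by the compressed body. An Avro block carries no such prefix,
// which is the incompatibility described above.
class LengthPrefixedFraming {
    static byte[] compressWithLengthPrefix(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        byte[] tmp = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(tmp);
            body.write(tmp, 0, n);
        }
        deflater.end();
        byte[] compressed = body.toByteArray();
        // Header: uncompressed size, then the compressed payload.
        ByteBuffer out = ByteBuffer.allocate(8 + compressed.length)
                .order(ByteOrder.LITTLE_ENDIAN);
        out.putLong(input.length);
        out.put(compressed);
        return out.array();
    }
}
```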
Given these considerations, I eventually came to think the very simple approach I've drafted might be the best option. If there are performance benefits to be had by switching to ArrowBuf, Channels etc. we'd need to write a lot more code, which needs to stay in sync and be maintained etc. We can still add overload constructors to wrap channels for IO.
Final point on non-blocking mode for the reader. The approach I have used is just to insist that when blocking = false the input stream must support mark / reset, and then peek at the beginning of each batch to determine its size. Also, non-blocking readers are direct, which disables Avro's internal buffering. I'm assuming that anyone using non-blocking will implement their own stream and buffering logic, in which case they can add mark / reset and won't want Avro randomly reading extra bytes for an internal buffer! At least that is what I'm planning to do. I haven't tried to estimate header size - there isn't really a way to do that without reading the whole header. We could have something like headerBytesNeeded() to incrementally return the number of bytes still needed. The alternative is just to provide a couple of MB and assume it's enough, which would probably work, but on reflection I do think we should add this, just so the API for non-blocking is "complete".
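The mark / reset peek could look roughly like the sketch below. This is not the PR's code; it assumes the Avro long encoding (zig-zag then varint), so peeking at a batch's size means decoding one varint and resetting the stream to where it was.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Sketch of the mark/reset peek described above; names are illustrative.
// We mark the stream, decode one Avro-encoded long (zig-zag varint),
// then reset so the actual batch decode starts from a clean position.
class PeekSketch {
    static long peekVarLong(InputStream in, int readLimit) throws IOException {
        if (!in.markSupported()) {
            throw new IOException("Non-blocking mode requires mark/reset support");
        }
        in.mark(readLimit);
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b < 0) throw new EOFException("Truncated varint");
            raw |= (long) (b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        in.reset();                       // stream position is restored
        return (raw >>> 1) ^ -(raw & 1);  // zig-zag decode
    }
}
```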
Hope this all makes sense - please let me know your thoughts when you get a chance and then I'll do the last bit of work to get this ready.
Closes #794.