[Java][Python] Avoid reusing VectorSchemaRoot when exporting an ArrowArrayStream to other languages #185

Open · @hu6360567

Description

Describe the bug, including details regarding any error messages, version, and platform.

I'm trying to import/export database data in Python through an ArrowArrayStream, over pyarrow.jvm and JDBC.

To export an ArrowVectorIterator as a stream without unloading it to a RecordBatch on the Java side first, I wrap the ArrowVectorIterator in an ArrowReader as below:

import java.io.IOException;
import java.util.Iterator;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;

public class ArrowVectorIteratorReader extends ArrowReader {

    private final Iterator<VectorSchemaRoot> iterator;
    private final Schema schema;
    private VectorSchemaRoot root;

    public ArrowVectorIteratorReader(BufferAllocator allocator, Iterator<VectorSchemaRoot> iterator, Schema schema) {
        super(allocator);
        this.iterator = iterator;
        this.schema = schema;
        this.root = null;
    }

    @Override
    public VectorSchemaRoot getVectorSchemaRoot() throws IOException {
        if (root == null) return super.getVectorSchemaRoot();
        return root;
    }

    @Override
    public boolean loadNextBatch() throws IOException {
        if (iterator.hasNext()) {
            VectorSchemaRoot lastRoot = root;
            root = iterator.next();
            // Close the previous root unless the iterator reuses the same instance.
            if (root != lastRoot && lastRoot != null) lastRoot.close();
            return true;
        } else {
            return false;
        }
    }

    @Override
    public long bytesRead() {
        return 0;
    }

    @Override
    protected void closeReadSource() throws IOException {
        if (iterator instanceof AutoCloseable) {
            try {
                ((AutoCloseable) iterator).close();
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        if (root != null) root.close();
    }

    @Override
    protected Schema readSchema() throws IOException {
        return schema;
    }
}

When the ArrowVectorIterator is created with reuseVectorSchemaRoot enabled, utf8 arrays may come out corrupted on the Python side, although everything works as expected on the Java side.
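For context, the iterator comes from the arrow-jdbc adapter; a minimal sketch of how root reuse is switched on (resultSet and allocator are placeholders standing in for my actual setup):

// Sketch only; classes are from org.apache.arrow.adapter.jdbc.
JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator, JdbcToArrowUtils.getUtcCalendar())
        .setTargetBatchSize(1)
        .setReuseVectorSchemaRoot(true)  // the setting that triggers the corruption
        .build();
ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(resultSet, config);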

The Java round-trip code is as below:

try (final ArrowReader source = porter.importData(1);  // returns ArrowVectorIteratorReader with batchSize=1
     final ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
    Data.exportArrayStream(allocator, source, stream);

    try (final ArrowReader reader = Data.importArrayStream(allocator, stream)) {
        while (reader.loadNextBatch()) {
            // root from getVectorSchemaRoot() is legal on every vector
            totalRecord += reader.getVectorSchemaRoot().getRowCount();
        }
    }
}
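
A stricter Java-side check than counting rows would be full buffer validation of each batch; a sketch, assuming the ValueVectorUtility validation helpers from arrow-vector:

// Sketch: ValueVectorUtility is org.apache.arrow.vector.util.ValueVectorUtility.
try (final ArrowReader reader = Data.importArrayStream(allocator, stream)) {
    while (reader.loadNextBatch()) {
        // Full validation also walks the data buffers, so out-of-bounds
        // variable-width offsets would be reported here rather than downstream.
        ValueVectorUtility.validateFull(reader.getVectorSchemaRoot());
    }
}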

On the Python side, the situation is harder to explain.
The stream exported from Java is wrapped into a RecordBatchReader and written out in different file formats.

import jpype
import pyarrow as pa


def wrap_from_java_stream_to_generator(java_arrow_stream, allocator=None, yield_schema=False):
    if allocator is None:
        # Helper from my codebase returning the JVM root allocator.
        allocator = get_java_root_allocator().allocator
    # arrow_c is a cffi FFI instance with the Arrow C stream interface declared.
    c_stream = arrow_c.new("struct ArrowArrayStream*")
    c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))

    org = jpype.JPackage("org")
    java_wrapped_stream = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)

    org.apache.arrow.c.Data.exportArrayStream(allocator, java_arrow_stream, java_wrapped_stream)

    # noinspection PyProtectedMember
    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as reader:  # type: pa.RecordBatchReader
        if yield_schema:
            yield reader.schema
        yield from reader


def wrap_from_java_stream(java_arrow_stream, allocator=None):
    generator = wrap_from_java_stream_to_generator(java_arrow_stream, allocator, yield_schema=True)
    schema = next(generator)

    return pa.RecordBatchReader.from_batches(schema, generator)

For CSV, it works as expected:

with wrap_from_java_stream(java_arrow_stream, allocator) as stream:
    with pa.csv.CSVWriter(csv_path, stream.schema) as writer:
        for record_batch in stream:
            writer.write_batch(record_batch)

For Parquet, writing with the dataset API as below fails:

with wrap_from_java_stream(java_arrow_stream, allocator) as stream:
    pa.dataset.write_dataset(stream, data_path, format="parquet")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
......./python3.8/site-packages/pyarrow/dataset.py:999: in write_dataset
    _filesystemdataset_write(
pyarrow/_dataset.pyx:3655: in pyarrow._dataset._filesystemdataset_write
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???
E   pyarrow.lib.ArrowInvalid: Parquet cannot store strings with size 2GB or more

OR

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???
E   pyarrow.lib.ArrowInvalid: Column 1: In chunk 0: Invalid: Length spanned by binary offsets (7) larger than values array (size 6)

To work out which record raises the error, the reader is re-wrapped to yield single-row batches, and the content is logged as below:

with wrap_from_java_stream(java_arrow_stream, allocator) as stream:
    def generator():
        for rb in stream:
            for i in range(rb.num_rows):
                slice = rb.slice(i, 1)
                logger.info(slice.to_pylist())
                yield slice

    pa.dataset.write_dataset(pa.RecordBatchReader.from_batches(stream.schema, generator()),
                             data_path, format="parquet")

Although the logger can print every slice, write_dataset still fails:

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
......./python3.8/site-packages/pyarrow/dataset.py:999: in write_dataset
    _filesystemdataset_write(
pyarrow/_dataset.pyx:3655: in pyarrow._dataset._filesystemdataset_write
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???
E   pyarrow.lib.ArrowInvalid: Column 1: In chunk 0: Invalid: First or last binary offset out of bounds

For the Arrow/Feather format, the record batches seem to be written into the file directly, but they are invalid when read back from the file (the code is similar to the above).

However, if I create the ArrowVectorIteratorReader without reuseVectorSchemaRoot, everything works fine on the Python side.
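
Presumably the exported stream hands the root's buffers to the consumer by reference, so reusing the root lets the iterator overwrite memory the Python side is still reading. If that is the case, one possible workaround is to deep-copy each batch out of the reused root inside loadNextBatch. A minimal, untested sketch, assuming VectorSchemaRootAppender (org.apache.arrow.vector.util) copies values rather than sharing buffers (the copy obviously negates the benefit of reuse):

// Sketch: keep a private root owned by this reader and copy every batch into
// it, so buffers handed out through the C stream are never recycled.
@Override
public boolean loadNextBatch() throws IOException {
    if (!iterator.hasNext()) {
        return false;
    }
    VectorSchemaRoot next = iterator.next();
    if (root == null) {
        // allocator is the protected field inherited from ArrowReader.
        root = VectorSchemaRoot.create(schema, allocator);
    }
    root.clear();  // drop data from the previous batch
    // Element-wise copy into the private root; if the iterator does NOT reuse
    // its root, next would additionally have to be closed here.
    VectorSchemaRootAppender.append(root, next);
    return true;
}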

Component(s)

Java, Python
