diff --git a/java/source/dataset.rst b/java/source/dataset.rst
index f7ee556a..2ac3fa77 100644
--- a/java/source/dataset.rst
+++ b/java/source/dataset.rst
@@ -533,4 +533,4 @@ Let's read a CSV file.
     Salesforce Slack 27.7 01/12/2020
     Total batch size: 3
 
-.. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html
\ No newline at end of file
+.. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html
diff --git a/java/source/demo/pom.xml b/java/source/demo/pom.xml
index 4f844ea8..38b1fa50 100644
--- a/java/source/demo/pom.xml
+++ b/java/source/demo/pom.xml
@@ -107,5 +107,10 @@
             <artifactId>core</artifactId>
             <version>0.11.0</version>
         </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.2.11</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/java/source/io.rst b/java/source/io.rst
index 74f74d15..d9253107 100644
--- a/java/source/io.rst
+++ b/java/source/io.rst
@@ -263,6 +263,93 @@ Write - Out to Buffer
 
     Number of rows written: 3
 
+Write Parquet Files
+*******************
+
+Let's read an Arrow file and write its data to a Parquet file.
+
+.. testcode::
+
+    import java.io.IOException;
+    import java.nio.file.DirectoryStream;
+    import java.nio.file.Files;
+    import java.nio.file.Path;
+    import java.nio.file.Paths;
+
+    import org.apache.arrow.dataset.file.DatasetFileWriter;
+    import org.apache.arrow.dataset.file.FileFormat;
+    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+    import org.apache.arrow.dataset.jni.NativeMemoryPool;
+    import org.apache.arrow.dataset.scanner.ScanOptions;
+    import org.apache.arrow.dataset.scanner.Scanner;
+    import org.apache.arrow.dataset.source.Dataset;
+    import org.apache.arrow.dataset.source.DatasetFactory;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.ipc.ArrowFileReader;
+    import org.apache.arrow.vector.ipc.ArrowReader;
+    import org.apache.arrow.vector.ipc.SeekableReadChannel;
+    import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
+
+    // read Arrow demo data: three record batches of three rows each
+    Path uriRead = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow");
+    try (
+        BufferAllocator allocator = new RootAllocator();
+        ArrowFileReader readerForDemoData = new ArrowFileReader(
+            new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(
+                Files.readAllBytes(uriRead))), allocator)
+    ) {
+        Path uriWrite = Files.createTempDirectory("parquet_");
+        // write the data to a new Parquet file
+        DatasetFileWriter.write(allocator, readerForDemoData, FileFormat.PARQUET, uriWrite.toUri().toString());
+        // validate the data of the Parquet file just created
+        ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+        try (
+            DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator,
+                NativeMemoryPool.getDefault(), FileFormat.PARQUET, uriWrite.toUri().toString());
+            Dataset dataset = datasetFactory.finish();
+            Scanner scanner = dataset.newScan(options);
+            ArrowReader readerForFileCreated = scanner.scanBatches()
+        ) {
+            while (readerForFileCreated.loadNextBatch()) {
+                System.out.print(readerForFileCreated.getVectorSchemaRoot().contentToTSVString());
+                System.out.println("RowCount: " + readerForFileCreated.getVectorSchemaRoot().getRowCount());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
+        // mark the temporary Parquet files for deletion on exit
+        try (DirectoryStream<Path> dir = Files.newDirectoryStream(uriWrite)) {
+            uriWrite.toFile().deleteOnExit();
+            for (Path path : dir) {
+                path.toFile().deleteOnExit();
+            }
+        }
+    } catch (IOException e) {
+        e.printStackTrace();
+        throw new RuntimeException(e);
+    }
+
+.. testoutput::
+
+    name    age
+    David    10
+    Gladis    20
+    Juan    30
+    RowCount: 3
+    name    age
+    Nidia    15
+    Alexa    20
+    Mara    15
+    RowCount: 3
+    name    age
+    Raul    34
+    Jhon    29
+    Thomy    33
+    RowCount: 3
+
 Reading
 =======
 
@@ -461,8 +548,8 @@ Reading Parquet File
 
 Please check :doc:`Dataset <./dataset>`
 
-Handling Data with Dictionaries
-*******************************
+Reading Data with Dictionaries
+******************************
 
 Reading and writing dictionary-encoded data requires separately tracking the dictionaries.
 
@@ -579,3 +666,20 @@ Reading and writing dictionary-encoded data requires separately tracking the dic
     Dictionary-encoded data recovered: [0, 3, 4, 5, 7]
     Dictionary recovered: Dictionary DictionaryEncoding[id=666,ordered=false,indexType=Int(8, true)] [Andorra, Cuba, Grecia, Guinea, Islandia, Malta, Tailandia, Uganda, Yemen, Zambia]
     Decoded data: [Andorra, Guinea, Islandia, Malta, Uganda]
+
+Reading Custom Dataset
+**********************
+
+If you need to implement a custom dataset reader, consider extending the `ArrowReader`_ class.
+
+The ArrowReader class can be extended as follows:
+
+1. Write the logic for reading the schema in ``readSchema()``.
+2. If you do not want to define schema-reading logic there, you will need to override ``getVectorSchemaRoot()`` instead.
+3. Once (1) or (2) is in place, implement ``loadNextBatch()`` to load each batch of data.
+4. Finally, define the logic that releases the underlying read source in ``closeReadSource()``.
+
+You can see an example of a custom JDBC reader at :doc:`Write ResultSet to Parquet File <./jdbc>`.
+
+.. _`ArrowReader`: https://arrow.apache.org/docs/java/reference/org/apache/arrow/vector/ipc/ArrowReader.html
diff --git a/java/source/jdbc.rst b/java/source/jdbc.rst
index 78f78f27..81ffeef8 100644
--- a/java/source/jdbc.rst
+++ b/java/source/jdbc.rst
@@ -307,3 +307,191 @@ values to the given scale.
     102    true    100000000030.0000000    some char text    [1,2]
     INT_FIELD1    BOOL_FIELD2    BIGINT_FIELD5    CHAR_FIELD16    LIST_FIELD19
     103    true    10000000003.0000000    some char text    [1]
+
+Write ResultSet to Parquet File
+===============================
+
+In this example, we write the results obtained through the JDBC adapter to a Parquet file by wrapping them in a custom ``ArrowReader``.
+
+.. testcode::
+
+    import java.io.BufferedReader;
+    import java.io.FileReader;
+    import java.io.IOException;
+    import java.nio.file.DirectoryStream;
+    import java.nio.file.Files;
+    import java.nio.file.Path;
+    import java.sql.Connection;
+    import java.sql.DriverManager;
+    import java.sql.ResultSet;
+    import java.sql.SQLException;
+    import java.sql.Types;
+    import java.util.HashMap;
+
+    import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
+    import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
+    import org.apache.arrow.adapter.jdbc.JdbcToArrow;
+    import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
+    import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
+    import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
+    import org.apache.arrow.dataset.file.DatasetFileWriter;
+    import org.apache.arrow.dataset.file.FileFormat;
+    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+    import org.apache.arrow.dataset.jni.NativeMemoryPool;
+    import org.apache.arrow.dataset.scanner.ScanOptions;
+    import org.apache.arrow.dataset.scanner.Scanner;
+    import org.apache.arrow.dataset.source.Dataset;
+    import org.apache.arrow.dataset.source.DatasetFactory;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.VectorSchemaRoot;
+    import org.apache.arrow.vector.ipc.ArrowReader;
+    import org.apache.arrow.vector.types.pojo.Schema;
+    import org.apache.ibatis.jdbc.ScriptRunner;
+    import org.slf4j.LoggerFactory;
+
+    import ch.qos.logback.classic.Level;
+    import ch.qos.logback.classic.Logger;
+
+    // ArrowReader that adapts the JDBC adapter's ArrowVectorIterator so its
+    // batches can be consumed by DatasetFileWriter
+    class JDBCReader extends ArrowReader {
+        private final ArrowVectorIterator iter;
+        private final JdbcToArrowConfig config;
+        private VectorSchemaRoot root;
+        private boolean firstRoot = true;
+
+        public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, JdbcToArrowConfig config) {
+            super(allocator);
+            this.iter = iter;
+            this.config = config;
+        }
+
+        @Override
+        public boolean loadNextBatch() throws IOException {
+            if (firstRoot) {
+                // the first batch was already loaded by getVectorSchemaRoot()
+                firstRoot = false;
+                return true;
+            }
+            else {
+                if (iter.hasNext()) {
+                    if (root != null && !config.isReuseVectorSchemaRoot()) {
+                        root.close();
+                    }
+                    else {
+                        root.allocateNew();
+                    }
+                    root = iter.next();
+                    return root.getRowCount() != 0;
+                }
+                else {
+                    return false;
+                }
+            }
+        }
+
+        @Override
+        public long bytesRead() {
+            return 0;
+        }
+
+        @Override
+        protected void closeReadSource() throws IOException {
+            if (root != null && !config.isReuseVectorSchemaRoot()) {
+                root.close();
+            }
+        }
+
+        @Override
+        protected Schema readSchema() throws IOException {
+            // not used: getVectorSchemaRoot() is overridden instead
+            return null;
+        }
+
+        @Override
+        public VectorSchemaRoot getVectorSchemaRoot() throws IOException {
+            if (root == null) {
+                root = iter.next();
+            }
+            return root;
+        }
+    }
+
+    // raise Arrow's log level to TRACE (uses the logback-classic dependency)
+    ((Logger) LoggerFactory.getLogger("org.apache.arrow")).setLevel(Level.TRACE);
+    try (
+        final BufferAllocator allocator = new RootAllocator();
+        final BufferAllocator allocatorJDBC = allocator.newChildAllocator("allocatorJDBC", 0, Long.MAX_VALUE);
+        final BufferAllocator allocatorReader = allocator.newChildAllocator("allocatorReader", 0, Long.MAX_VALUE);
+        final BufferAllocator allocatorParquetWrite = allocator.newChildAllocator("allocatorParquetWrite", 0,
+            Long.MAX_VALUE);
+        final Connection connection = DriverManager.getConnection(
+            "jdbc:h2:mem:h2-jdbc-adapter")
+    ) {
+        ScriptRunner runnerDDLDML = new ScriptRunner(connection);
+        runnerDDLDML.setLogWriter(null);
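+        // create and populate TABLE1 in the in-memory H2 database using the DDL and DML scripts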
+        runnerDDLDML.runScript(new BufferedReader(
+            new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql")));
+        runnerDDLDML.runScript(new BufferedReader(
+            new FileReader("./thirdpartydeps/jdbc/h2-dml.sql")));
+        JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocatorJDBC,
+                JdbcToArrowUtils.getUtcCalendar())
+            .setTargetBatchSize(2)
+            .setReuseVectorSchemaRoot(true)
+            .setArraySubTypeByColumnNameMap(
+                new HashMap<>() {{
+                    put("LIST_FIELD19",
+                        new JdbcFieldInfo(Types.INTEGER));
+                }}
+            )
+            .build();
+        String query = "SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1";
+        try (
+            final ResultSet resultSetConvertToParquet = connection.createStatement().executeQuery(query);
+            final ArrowVectorIterator arrowVectorIterator = JdbcToArrow.sqlToArrowVectorIterator(
+                resultSetConvertToParquet, config)
+        ) {
+            Path uri = Files.createTempDirectory("parquet_");
+            try (
+                // expose the JDBC row data as an Arrow reader
+                final JDBCReader arrowReader = new JDBCReader(allocatorReader, arrowVectorIterator, config)
+            ) {
+                // write the Arrow batches to a Parquet file
+                DatasetFileWriter.write(allocatorParquetWrite, arrowReader, FileFormat.PARQUET, uri.toUri().toString());
+            }
+            // validate the data of the Parquet file just created
+            ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+            try (
+                DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator,
+                    NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri.toUri().toString());
+                Dataset dataset = datasetFactory.finish();
+                Scanner scanner = dataset.newScan(options);
+                ArrowReader reader = scanner.scanBatches()
+            ) {
+                while (reader.loadNextBatch()) {
+                    System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
+                    System.out.println("RowCount: " + reader.getVectorSchemaRoot().getRowCount());
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            }
+            // mark the temporary Parquet files for deletion on exit
+            try (DirectoryStream<Path> dir = Files.newDirectoryStream(uri)) {
+                uri.toFile().deleteOnExit();
+                for (Path path : dir) {
+                    path.toFile().deleteOnExit();
+                }
+            }
+        }
+        runnerDDLDML.closeConnection();
+    } catch (SQLException | IOException e) {
+        e.printStackTrace();
+        throw new RuntimeException(e);
+    }
+
+.. testoutput::
+
+    INT_FIELD1    BOOL_FIELD2    BIGINT_FIELD5    CHAR_FIELD16    LIST_FIELD19
+    101    true    1000000000300    some char text    [1,2,3]
+    102    true    100000000030    some char text    [1,2]
+    RowCount: 2
+    INT_FIELD1    BOOL_FIELD2    BIGINT_FIELD5    CHAR_FIELD16    LIST_FIELD19
+    103    true    10000000003    some char text    [1]
+    RowCount: 1
diff --git a/java/source/substrait.rst b/java/source/substrait.rst
index ee87371f..750955ca 100644
--- a/java/source/substrait.rst
+++ b/java/source/substrait.rst
@@ -63,7 +63,7 @@ Here is an example of a Java program that queries a Parquet file:
     import java.util.HashMap;
     import java.util.Map;
 
-    static Plan queryTableNation() throws SqlParseException {
+    Plan queryTableNation() throws SqlParseException {
         String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
         String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
             "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
@@ -72,7 +72,7 @@ Here is an example of a Java program that queries a Parquet file:
         return plan;
     }
 
-    static void queryDatasetThruSubstraitPlanDefinition() {
+    void queryDatasetThruSubstraitPlanDefinition() {
         String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
         ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
         try (
@@ -135,7 +135,7 @@ For example, we can join the nation and customer tables from the TPC-H benchmark
     import java.util.HashMap;
     import java.util.Map;
 
-    static Plan queryTableNationJoinCustomer() throws SqlParseException {
+    Plan queryTableNationJoinCustomer() throws SqlParseException {
         String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION n JOIN CUSTOMER c " +
             "ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " +
             "GROUP BY n.n_name";
@@ -151,7 +151,7 @@ For example, we can join the nation and customer tables from the TPC-H benchmark
         return plan;
     }
 
-    static void queryTwoDatasetsThruSubstraitPlanDefinition() {
+    void queryTwoDatasetsThruSubstraitPlanDefinition() {
         String uriNation = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
         String uriCustomer = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/customer.parquet";
         ScanOptions options = new ScanOptions(/*batchSize*/ 32768);