
Remote source (S3 Range GET) support with row‑group pruning#31

Open
shayonj wants to merge 1 commit into njaremko:main from shayonj:s/reader-remote-source

Conversation


@shayonj shayonj commented Oct 2, 2025

This adds first‑class support for reading Parquet from remote/random‑access sources (for example, AWS S3 or any HTTP Range endpoint) without downloading entire files, and it introduces explicit row‑group selection and pruning to minimize bytes fetched. The Ruby API remains familiar; internally we add a RemoteSource path and extend the core reader to accept row‑group filters alongside column projection so callers can target only the data they need.

Understanding of the problem

  • The reader effectively assumes local files or in‑memory bytes. That makes object stores such as S3 inefficient: without random access you either download the whole file or improvise ad‑hoc buffering, and you cannot reliably benefit from row‑group pruning.
  • The API does not offer a simple, direct way to prune row groups at read time even when the footer statistics can cheaply identify which groups contain the target rows.

Solution

  • Remote source contract (Ruby): Any object that implements byte_length -> Integer and read_range(offset, length) -> binary String can now be passed anywhere a path or IO was accepted, which enables true random‑access reads and unlocks pruning only the row groups you need.
  • Adapter integration (Rust): RemoteSource validates the Ruby object; ThreadSafeRemoteSource synchronizes concurrent calls; RemoteRangeReader implements Read + Seek over repeated read_range calls with exact‑length guarantees; and CloneableChunkReader gains a Remote variant with from_remote factory.
  • Row‑group selection: row_groups: [Integer, ...] is now supported on Parquet.each_row and Parquet.each_column, and the core reader provides read_rows_with_selection / read_columns_with_selection to combine row‑group filters with column projection.
  • Metadata statistics exposure: Row‑group level stats (e.g., min_bytes, max_bytes, null_count) are returned to Ruby so callers can prune candidates before any large reads.
  • Ruby ergonomics: lib/parquet.rb coerces columns to String[] and row_groups to Integer[] while preserving enumerator behavior; all new features are opt‑in and backwards compatible.
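The contract above is plain duck typing, so any two-method object qualifies. As a sketch, a local-file-backed source (handy for tests; `FileRangeSource` is a hypothetical name, not part of the library) could look like:

```ruby
# Minimal illustration of the remote-source contract: any object with
# byte_length and read_range(offset, length) can stand in for a path or IO.
# Backed by a local file here purely for demonstration.
class FileRangeSource
  def initialize(path)
    @path = path
  end

  # Total size of the underlying object in bytes.
  def byte_length
    File.size(@path)
  end

  # Return up to `length` bytes starting at `offset`, as a binary String.
  def read_range(offset, length)
    return "".b if length <= 0
    File.open(@path, "rb") do |f|
      f.seek(offset)
      f.read(length) || "".b
    end
  end
end
```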

Example: AWS S3 remote source

class S3RangeSource
  def initialize(bucket:, key:, s3: Aws::S3::Client.new, size: nil)
    @bucket, @key, @s3, @size = bucket, key, s3, size
  end

  def byte_length
    @size ||= @s3.head_object(bucket: @bucket, key: @key).content_length
  end

  def read_range(offset, length)
    return "".b if length <= 0
    resp = @s3.get_object(bucket: @bucket, key: @key, range: "bytes=#{offset}-#{offset + length - 1}")
    # The response body is IO-like; read it fully into a binary String.
    resp.body.read
  end
end

Case A: you already know the row group ordinal (from a catalog)

source = S3RangeSource.new(bucket: "my-bucket", key: "path/to/file.parquet")
rows = Parquet.each_row(
  source,
  columns: %w[id output],
  row_groups: [rg_ordinal],
  strict: false
).to_a
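Column-oriented reads take the same option; a sketch under the same assumptions (a catalog-supplied `rg_ordinal`, and the `source` defined above):

```ruby
# Same pruning, but as column batches instead of rows. Each yielded batch
# maps column name => Array of values, drawn only from the selected
# row group(s).
Parquet.each_column(
  source,
  columns: %w[id output],
  row_groups: [rg_ordinal],
  strict: false
) do |batch|
  ids = batch["id"]
  # ... process the batch ...
end
```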

Case B: you don’t have a catalog; pick candidates via footer stats (id example)

source   = S3RangeSource.new(bucket: "my-bucket", key: "path/to/file.parquet")
metadata = Parquet.metadata(source)

fields   = metadata.dig("schema", "fields") || []
id_idx   = fields.index { |f| f["name"] == "id" }
rgs      = metadata["row_groups"] || []

target_id = 123
candidates =
  if id_idx
    rgs.filter_map do |rg|
      s = (rg["statistics"] || {})[id_idx]
      next unless s && s["min_bytes"] && s["max_bytes"]
      # For an INT64 column these blobs are 8-byte little-endian values;
      # decode defensively and skip the group if decoding fails.
      min_v = s["min_bytes"].byteslice(0, 8).unpack1("q<") rescue nil
      max_v = s["max_bytes"].byteslice(0, 8).unpack1("q<") rescue nil
      (min_v && max_v && (min_v..max_v).cover?(target_id)) ? rg["ordinal"] : nil
    end
  else
    []
  end
# Fall back to scanning every row group when stats are missing or inconclusive.
candidates = rgs.map { |rg| rg["ordinal"] } if candidates.empty?

rows = Parquet.each_row(
  source,
  columns: %w[id output],
  row_groups: candidates,
  strict: false
).to_a
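The byte decoding in Case B can be factored into small helpers. This is a sketch under the same assumption the example makes, namely that `id` is a plain INT64 column so `min_bytes`/`max_bytes` are 8-byte little-endian blobs; `decode_int64_stat` and `candidate_group?` are hypothetical names, not library API:

```ruby
# Decode an 8-byte little-endian INT64 statistics blob. Returns nil when
# the blob is missing or too short to decode.
def decode_int64_stat(bytes)
  return nil unless bytes.is_a?(String) && bytes.bytesize >= 8
  bytes.byteslice(0, 8).unpack1("q<")
end

# A row group is a candidate when its [min, max] range could contain the id.
def candidate_group?(stats, target_id)
  min_v = decode_int64_stat(stats["min_bytes"])
  max_v = decode_int64_stat(stats["max_bytes"])
  !!(min_v && max_v && (min_v..max_v).cover?(target_id))
end
```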

Notes:

  • This only fetches the footer plus the selected row group(s), not the whole file.
  • If you have a catalog of row-group ordinals (or offsets), you can use Case A for fewer footer reads and simpler logic.

Performance characteristics

  • Range‑based reads mean the reader fetches only the footer and the selected row‑group spans rather than the whole file.
  • Combining column projection with row‑group pruning materially reduces bytes transferred and tail latency for point lookups and filter‑heavy scans.

Tests

  • test/enumerator_test.rb: adds test_remote_source_integration and a row_groups smoke check.
  • test/column_test.rb: adds a row_groups smoke check for column batches.
  • Existing enumerator and schema tests also exercise the updated paths.

Backwards compatibility

  • Existing file and IO sources continue to work exactly as before.
  • Remote reads and row‑group selection are opt‑in and require no changes for current users.
