Skip to content

GH-120: optimise created parquet files#138

Merged
s-prosvirnin merged 8 commits into
mainfrom
GH-120/optimise-created-parquet-files
May 6, 2026
Merged

GH-120: optimise created parquet files#138
s-prosvirnin merged 8 commits into
mainfrom
GH-120/optimise-created-parquet-files

Conversation

@s-prosvirnin
Copy link
Copy Markdown
Member

@s-prosvirnin s-prosvirnin commented May 4, 2026

Summary by CodeRabbit

  • New Features

    • New sort-aware shift planner with clustering, bin-packing and partition-aware task scheduling.
    • Field-extraction API and a WAL row-group metadata key for flexible per-row-group metadata.
    • Planner partition specs enabling per-job partitioning control.
  • Improvements

    • Grafana dashboard upgraded to v17 with refined ingest and WAL panels.
    • New metric for oversized planner clusters.
  • Configuration Changes

    • Read config: replaced single max-input-bytes with lower/upper bounds; removed max file size.
  • Documentation

    • Agent guidance updated with two important instructions.

…files' into GH-120/optimise-created-parquet-files

# Conflicts:
#	crates/icegate-ingest/src/shift/plan_runner.rs
#	crates/icegate-ingest/src/shift/planner.rs
#	crates/icegate-ingest/src/shift/planner_partitioning.rs
# Conflicts:
#	crates/icegate-ingest/src/cli/commands/run.rs
#	crates/icegate-ingest/src/shift/iceberg_storage.rs
#	crates/icegate-ingest/src/shift/mod.rs
@s-prosvirnin s-prosvirnin requested review from a team and frisbeeman May 4, 2026 10:03
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 4, 2026

Walkthrough

Replaces grouped segment-based planning with a row-group–level planner and ExtractField extraction API; adds planner partitioning, clustering/bin-packing, byte-based Iceberg rollover sizing, WAL boundary metadata/name tracking, oversized-cluster metrics, and wide test and wiring updates across queue, shift, WAL, storage, helm, and dashboard configs.

Changes

Shift planning, queue extraction, WAL metadata, and wiring

Layer / File(s) Summary
Data Shape / API
crates/icegate-queue/src/extract.rs, crates/icegate-queue/src/lib.rs, crates/icegate-queue/src/reader.rs, crates/icegate-queue/src/writer.rs
Introduce ExtractField/FieldExtractor and ExtractedValue; QueueReader::plan_segments now accepts &[ExtractField] and returns a flat SegmentsPlan.entries: Vec<RowGroupPlanEntry> with per-row-group extracted map. Public WAL_ROW_GROUP_METADATA_KEY constant added.
Planner / Partitioning
crates/icegate-ingest/src/shift/planner.rs, crates/icegate-ingest/src/shift/planner_partitioning.rs
New planner converts RowGroupPlanEntry into PlanRowGroups, derives partition buckets (identity UTF‑8, day timestamp), clusters by overlapping sort-key ranges, bin-packs clusters into chunks with lower/upper byte bounds and oversized-split logic, supports tail-merge; PlannerPartitionSpec maps schema into planner ExtractFields.
Shift config & sizing
crates/icegate-ingest/src/shift/config.rs, config/helm/icegate/values.yaml, config/helm/icegate/templates/configmap-ingest.yaml
Replace max_input_bytes_per_task with lower_bound_input_bytes_per_task and upper_bound_input_bytes_per_task and validate upper ≥ lower; remove max_file_size_mb from write config; defaults and Helm templates updated.
Iceberg storage rollover
crates/icegate-ingest/src/shift/iceberg_storage.rs
Switch from MB-based max_file_size_mb to max_file_size_bytes: u64; add writer_max_parquet_bytes(upper_bound) = upper_bound * 2 (saturating); thread byte budget through Parquet writer and add unit tests.
Runner, executor, and payload shape
crates/icegate-ingest/src/shift/plan_runner.rs, crates/icegate-ingest/src/shift/executor.rs, crates/icegate-ingest/src/shift/mod.rs, crates/icegate-ingest/src/cli/commands/run.rs
PlanTaskRunnerImpl now accepts planner_partition_spec and derives extract_fields; schedule_shift_tasks returns task IDs + PlanSummary; ShiftInput no longer embeds tenant_id — segments carry row_groups; ShiftJobSpec gains planner_partition_spec and is wired in CLI.
Instrumentation & metrics
crates/icegate-ingest/src/infra/metrics.rs, crates/icegate-ingest/src/shift/instrumentation.rs
Add planner_oversized_clusters_total counter and record_planner_oversized_clusters(...); PlanTaskRunnerWithMetrics logs plan summaries and records per-task planned bytes and oversized-cluster counts. QueueReaderWithMetrics::plan_segments signature updated to accept &[ExtractField].
WAL types, serialization, test utilities
crates/icegate-ingest/src/wal/boundary.rs, crates/icegate-ingest/src/wal/metadata.rs, crates/icegate-ingest/src/wal/mod.rs, crates/icegate-ingest/src/wal/columns.rs, crates/icegate-ingest/src/wal/test_utils.rs, crates/icegate-ingest/src/wal/sorter.rs
RowGroupBoundaryRange gains names: Arc<[String]> and stricter validation; RowGroupBoundaryKey hides components with new(...)/components(); serialization helpers renamed to *_boundary_range. SortColumnCache tracks column_names. New test helpers added; sorter writes boundary-range metadata with names.
Merger, shift-runner, and tests
crates/icegate-ingest/src/shift/row_groups_merger.rs, crates/icegate-ingest/src/shift/shift_runner.rs, crates/icegate-queue/tests/*, crates/icegate-ingest/src/shift/* (tests)
Extensive test updates across merger, shift-runner, and queue integration: mocks updated for new plan_segments signature; E2E tests verify tenant-per-file partitioning, monotonicity, WAL-stable row-id ordering, Parquet boundary stats; new rollover failover test; many unit tests added/rewritten for planner, partitioning, and extraction.
Dashboard & docs & minor common changes
config/docker/grafana/dashboards/Icegate-Ingest-1769530183590.json, AGENTS.md, crates/icegate-common/src/catalog/config.rs
Grafana dashboard version bumped and many panel queries/legends retargeted to new Prometheus bucket metrics and WAL bytes metrics. AGENTS.md adds English-only and Iceberg-schema-only bullets. S3 catalog bucket validation adjusted to is_none_or(...) (behavior unchanged).

Sequence Diagram(s)

sequenceDiagram
    participant Q as QueueReader (plan_segments)
    participant E as Extractor (ResolvedField)
    participant P as Planner (plan_row_groups)
    participant S as Scheduler (schedule_shift_tasks)
    participant X as Executor (ShiftExecutor / IcebergStorage)

    Q->>E: resolve extractors per segment
    E-->>Q: per-row-group ExtractedValue map
    Q-->>P: SegmentsPlan.entries (RowGroupPlanEntry...)
    P->>P: parse boundary payloads & derive partition buckets
    P->>P: stable-sort, swept-line clustering, bin-pack → chunks
    P-->>S: PlannedChunks + PlannerStats
    S->>S: group row_groups by wal_offset → SegmentToRead
    S-->>X: submit ShiftInput(segments)
    X->>X: read segments, merge, write Parquet (rollover by bytes)
    X-->>S: write acknowledgements / file artifacts
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related issues

Possibly related PRs

Suggested reviewers

  • frisbeeman
✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch GH-120/optimise-created-parquet-files

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
crates/icegate-ingest/src/shift/config.rs (1)

164-188: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't silently drop the legacy max_input_bytes_per_task setting.

Because ShiftReadConfig is still deserialized with defaults, older configs can keep parsing while their max_input_bytes_per_task override is ignored and replaced by the new 64/128 MiB defaults. That turns an upgrade into a silent runtime behavior change for chunk sizing. Please either map the legacy key into the new bounds or reject it explicitly with a clear config error.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/icegate-ingest/src/shift/config.rs` around lines 164 - 188,
ShiftReadConfig currently ignores a legacy max_input_bytes_per_task key when
deserializing, causing silent behavior changes; update deserialization or
Default handling so legacy overrides are honored or rejected: detect the
presence of max_input_bytes_per_task during config load and either (1) map it to
set both lower_bound_input_bytes_per_task and upper_bound_input_bytes_per_task
(or compute appropriate bounds) or (2) return a clear error refusing the legacy
key; implement this mapping/rejection in the config deserialization path that
constructs ShiftReadConfig (or in Default::default() wrapper used during
deserialization) to ensure max_input_bytes_per_task is not silently dropped.
crates/icegate-ingest/src/wal/sorter.rs (1)

47-56: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Replace the hard-coded partition column here.

The planner path is now driven by PlannerPartitionSpec, but WAL sorting still assumes "tenant_id" at Line 50. If the partition spec changes, these two paths will drift and the sorter will start producing row groups that are not homogeneous for the configured partition key. Please thread the shared schema/partition-spec source of truth into this path instead of using a string literal.

As per coding guidelines, **/*.rs: Only use Iceberg schema from crates/icegate-common/src/schema.rs, never hardcode columns in working code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/icegate-ingest/src/wal/sorter.rs` around lines 47 - 56, The sorter
currently hardcodes "tenant_id" when locating the partition column (see usage
around tenant_idx, batch.schema(), and tenant_id_column); replace that literal
with the partition key from the shared PlannerPartitionSpec / Iceberg schema
source of truth (crates/icegate-common/src/schema.rs) by threading the
PlannerPartitionSpec (or resolved partition field name) into the sorter (e.g.,
add it to the function signature or struct) and then call
batch.schema().index_of(partition_field_name) and downcast that column instead
of using "tenant_id"; ensure you use the canonical Iceberg schema helpers from
icegate-common rather than a string literal so the WAL sorter stays consistent
with planner partitioning.
crates/icegate-ingest/src/shift/executor.rs (1)

60-77: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Keep the shift-task payload backward compatible across deploys.

ShiftInput is part of the persisted task contract, and the shifter restores jobs from durable storage. Moving row_groups under SegmentToRead means any queued pre-rollout shift task will deserialize into a missing segments field and fail after deployment. Please accept both payload shapes during rollout, or make draining existing jobs a hard deployment prerequisite.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/icegate-ingest/src/shift/executor.rs` around lines 60 - 77, ShiftInput
is part of a persisted task contract and must accept both the new shape
(segments: Vec<SegmentToRead>) and the old shape (row_groups:
Vec<PlannedRowGroup> at the top level) to avoid breaking restores; implement a
backward-compatible deserializer for ShiftInput (e.g., a custom Deserialize impl
or a serde untagged enum) that: if "segments" exists, uses it; otherwise, if
top-level "row_groups" exists, wraps that Vec<PlannedRowGroup> into a single
SegmentToRead (filling any missing fields like segment_offset with a safe
default) and preserves trace_context, then populate ShiftInput.segments
accordingly so both payload shapes deserialize successfully.
🧹 Nitpick comments (1)
crates/icegate-ingest/src/shift/plan_runner.rs (1)

154-188: ⚡ Quick win

Stabilize ShiftInput segment ordering before serialization.

by_segment.into_iter() makes identical plans produce different ShiftInput payloads across runs. Sorting by segment_offset keeps task payloads and logs deterministic without changing behavior.

♻️ Suggested change
-        let segments: Vec<SegmentToRead> = by_segment
-            .into_iter()
-            .map(|(segment_offset, row_groups)| SegmentToRead {
-                segment_offset,
-                row_groups,
-            })
-            .collect();
+        let mut segments: Vec<SegmentToRead> = by_segment
+            .into_iter()
+            .map(|(segment_offset, mut row_groups)| {
+                row_groups.sort_by_key(|row_group| row_group.row_group_idx);
+                SegmentToRead {
+                    segment_offset,
+                    row_groups,
+                }
+            })
+            .collect();
+        segments.sort_by_key(|segment| segment.segment_offset);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/icegate-ingest/src/shift/plan_runner.rs` around lines 154 - 188, The
code builds a HashMap by_segment and then collects segments via
by_segment.into_iter(), which yields non-deterministic ordering; in
schedule_shift_task_for_chunk you should stabilize the order by sorting the
produced Vec<SegmentToRead> by segment_offset (e.g., collect into segments and
then call sort_by_key(|s| s.segment_offset)) before calling create_shift_task so
ShiftInput serialization and logs are deterministic; target symbols:
schedule_shift_task_for_chunk, by_segment, SegmentToRead, segment_offset,
create_shift_task.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@config/docker/grafana/dashboards/Icegate-Ingest-1769530183590.json`:
- Around line 349-371: The panels use queries that group by protocol, signal,
status (and sometimes encoding) but their legendFormat only shows p50/p95/p99 or
incomplete labels, causing indistinguishable series; update each offending query
(e.g., refIds like
icegate_ingest_shift_plan_duration_seconds_bucket-p99-histogram_quantile, -p90-,
-p50- and the other refId blocks at the ranges called out) to either remove the
extra labels from the sum by(...) clause (e.g., drop
protocol/signal/status/encoding so aggregation matches simple legend) or include
those labels in legendFormat (e.g., "{{protocol}} - {{signal}} - {{status}} -
{{encoding}} - p95") so the legend uniquely identifies each series; apply the
same change consistently for the other listed panels (lines ~462-484, ~576-598,
~804-826, ~1032-1055, ~1866-1889).

In `@crates/icegate-ingest/src/shift/shift_runner.rs`:
- Around line 1292-1303: The current computation uses ts_col.value(i).into()
which treats null slots as valid values; change the logic to only consider
non-null values (use ts_col.is_valid(i) / !ts_col.is_null(i) and collect
ts_col.value(i) for those indices), compute min/max from that iterator, and only
call extracted.insert(..., TimestampMicrosRange(min, max)) when you actually
have at least one valid timestamp; otherwise do not insert the
PLAN_FIELD_TIMESTAMP_RANGE entry. Target symbols: ts_col
(TimestampMicrosecondArray), the min_ts/max_ts computation, and the
extracted.insert(...) using PLAN_FIELD_TIMESTAMP_RANGE.

In `@crates/icegate-queue/src/extract.rs`:
- Around line 53-61: Reject duplicate ExtractField names up front by validating
ExtractField::name before populating the resulting RowGroupPlanEntry::extracted
map: iterate the incoming Vec<ExtractField>, track names in a HashSet, and if
insert returns false return an error (or panic if codebase convention uses
unwraps) describing the duplicate name; ensure the validation occurs where the
Vec<ExtractField> is converted into the extracted map so no duplicate keys can
collapse silently.
- Around line 240-246: The code currently treats any empty extracted value as a
metadata error (the value.is_empty() branch that returns
Err(QueueError::Metadata(...))) which incorrectly rejects valid singleton empty
values (min == max == b""). Remove that empty-check branch (or stop returning
QueueError::Metadata for empty strings) so that the function returns
Ok(value.to_string()) for empty strings as well; reference the value.is_empty()
check and the Err(QueueError::Metadata(...)) that mentions column_name,
row_group_idx, and wal_offset to locate and modify the logic in extract.rs.

---

Outside diff comments:
In `@crates/icegate-ingest/src/shift/config.rs`:
- Around line 164-188: ShiftReadConfig currently ignores a legacy
max_input_bytes_per_task key when deserializing, causing silent behavior
changes; update deserialization or Default handling so legacy overrides are
honored or rejected: detect the presence of max_input_bytes_per_task during
config load and either (1) map it to set both lower_bound_input_bytes_per_task
and upper_bound_input_bytes_per_task (or compute appropriate bounds) or (2)
return a clear error refusing the legacy key; implement this mapping/rejection
in the config deserialization path that constructs ShiftReadConfig (or in
Default::default() wrapper used during deserialization) to ensure
max_input_bytes_per_task is not silently dropped.

In `@crates/icegate-ingest/src/shift/executor.rs`:
- Around line 60-77: ShiftInput is part of a persisted task contract and must
accept both the new shape (segments: Vec<SegmentToRead>) and the old shape
(row_groups: Vec<PlannedRowGroup> at the top level) to avoid breaking restores;
implement a backward-compatible deserializer for ShiftInput (e.g., a custom
Deserialize impl or a serde untagged enum) that: if "segments" exists, uses it;
otherwise, if top-level "row_groups" exists, wraps that Vec<PlannedRowGroup>
into a single SegmentToRead (filling any missing fields like segment_offset with
a safe default) and preserves trace_context, then populate ShiftInput.segments
accordingly so both payload shapes deserialize successfully.

In `@crates/icegate-ingest/src/wal/sorter.rs`:
- Around line 47-56: The sorter currently hardcodes "tenant_id" when locating
the partition column (see usage around tenant_idx, batch.schema(), and
tenant_id_column); replace that literal with the partition key from the shared
PlannerPartitionSpec / Iceberg schema source of truth
(crates/icegate-common/src/schema.rs) by threading the PlannerPartitionSpec (or
resolved partition field name) into the sorter (e.g., add it to the function
signature or struct) and then call batch.schema().index_of(partition_field_name)
and downcast that column instead of using "tenant_id"; ensure you use the
canonical Iceberg schema helpers from icegate-common rather than a string
literal so the WAL sorter stays consistent with planner partitioning.

---

Nitpick comments:
In `@crates/icegate-ingest/src/shift/plan_runner.rs`:
- Around line 154-188: The code builds a HashMap by_segment and then collects
segments via by_segment.into_iter(), which yields non-deterministic ordering; in
schedule_shift_task_for_chunk you should stabilize the order by sorting the
produced Vec<SegmentToRead> by segment_offset (e.g., collect into segments and
then call sort_by_key(|s| s.segment_offset)) before calling create_shift_task so
ShiftInput serialization and logs are deterministic; target symbols:
schedule_shift_task_for_chunk, by_segment, SegmentToRead, segment_offset,
create_shift_task.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f4e0e0f8-fbb0-4328-ae32-d94986ac79ff

📥 Commits

Reviewing files that changed from the base of the PR and between b671fdd and 7a5fef9.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (32)
  • AGENTS.md
  • config/docker/grafana/dashboards/Icegate-Ingest-1769530183590.json
  • config/docker/grafana/dashboards/Icegate-Query-1773746395045923.json
  • config/helm/icegate/templates/configmap-ingest.yaml
  • config/helm/icegate/values.yaml
  • crates/icegate-common/src/catalog/config.rs
  • crates/icegate-ingest/src/cli/commands/run.rs
  • crates/icegate-ingest/src/infra/metrics.rs
  • crates/icegate-ingest/src/shift/config.rs
  • crates/icegate-ingest/src/shift/executor.rs
  • crates/icegate-ingest/src/shift/iceberg_storage.rs
  • crates/icegate-ingest/src/shift/instrumentation.rs
  • crates/icegate-ingest/src/shift/mod.rs
  • crates/icegate-ingest/src/shift/plan_runner.rs
  • crates/icegate-ingest/src/shift/planner.rs
  • crates/icegate-ingest/src/shift/planner_partitioning.rs
  • crates/icegate-ingest/src/shift/row_groups_merger.rs
  • crates/icegate-ingest/src/shift/shift_runner.rs
  • crates/icegate-ingest/src/wal/boundary.rs
  • crates/icegate-ingest/src/wal/columns.rs
  • crates/icegate-ingest/src/wal/metadata.rs
  • crates/icegate-ingest/src/wal/mod.rs
  • crates/icegate-ingest/src/wal/sorter.rs
  • crates/icegate-ingest/src/wal/test_utils.rs
  • crates/icegate-queue/src/config.rs
  • crates/icegate-queue/src/extract.rs
  • crates/icegate-queue/src/lib.rs
  • crates/icegate-queue/src/reader.rs
  • crates/icegate-queue/src/writer.rs
  • crates/icegate-queue/tests/common/mod.rs
  • crates/icegate-queue/tests/reader_integration.rs
  • crates/icegate-queue/tests/writer_integration.rs
💤 Files with no reviewable changes (1)
  • crates/icegate-queue/src/config.rs

Comment thread config/docker/grafana/dashboards/Icegate-Ingest-1769530183590.json
Comment thread crates/icegate-ingest/src/shift/shift_runner.rs Outdated
Comment thread crates/icegate-queue/src/extract.rs
Comment thread crates/icegate-queue/src/extract.rs Outdated
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
config/docker/grafana/dashboards/Icegate-Ingest-1769530183590.json (1)

1563-1582: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Metric name mismatch will cause this panel to show no data.

The queries reference icegate_ingest_wal_segment_bytes_bucket, but the metric defined in crates/icegate-ingest/src/infra/metrics.rs is named icegate_ingest_wal_segment (which becomes icegate_ingest_wal_segment_bucket in Prometheus):

segment_bytes: meter.f64_histogram("icegate_ingest_wal_segment").build(),
Fix: Remove `_bytes` from the metric names in all three queries
- "expr": "histogram_quantile(0.95, sum by (le, topic) (rate(icegate_ingest_wal_segment_bytes_bucket{job=\"ingest\"}[1m])))\n",
+ "expr": "histogram_quantile(0.95, sum by (le, topic) (rate(icegate_ingest_wal_segment_bucket{job=\"ingest\"}[1m])))\n",
- "expr": "histogram_quantile(0.5, sum by (le, topic) (rate(icegate_ingest_wal_segment_bytes_bucket{job=\"ingest\"}[1m])))",
+ "expr": "histogram_quantile(0.5, sum by (le, topic) (rate(icegate_ingest_wal_segment_bucket{job=\"ingest\"}[1m])))",
- "expr": "histogram_quantile(0.99, sum by (le, topic) (rate(icegate_ingest_wal_segment_bytes_bucket{job=\"ingest\"}[1m])))",
+ "expr": "histogram_quantile(0.99, sum by (le, topic) (rate(icegate_ingest_wal_segment_bucket{job=\"ingest\"}[1m])))",
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@config/docker/grafana/dashboards/Icegate-Ingest-1769530183590.json` around
lines 1563 - 1582, The panel queries reference the wrong metric name suffix:
update the three Prometheus expressions that use
icegate_ingest_wal_segment_bytes_bucket to use icegate_ingest_wal_segment_bucket
(the histogram created by segment_bytes in metrics.rs), i.e. replace occurrences
of icegate_ingest_wal_segment_bytes_bucket with
icegate_ingest_wal_segment_bucket in the expressions (those blocks with refId
values like
icegate_ingest_shift_plan_duration_seconds_bucket-p99-histogram_quantile and A).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@config/docker/grafana/dashboards/Icegate-Ingest-1769530183590.json`:
- Around line 1563-1582: The panel queries reference the wrong metric name
suffix: update the three Prometheus expressions that use
icegate_ingest_wal_segment_bytes_bucket to use icegate_ingest_wal_segment_bucket
(the histogram created by segment_bytes in metrics.rs), i.e. replace occurrences
of icegate_ingest_wal_segment_bytes_bucket with
icegate_ingest_wal_segment_bucket in the expressions (those blocks with refId
values like
icegate_ingest_shift_plan_duration_seconds_bucket-p99-histogram_quantile and A).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2ae27301-7956-4277-8cfd-9579261a21e6

📥 Commits

Reviewing files that changed from the base of the PR and between 7a5fef9 and 92a63e9.

📒 Files selected for processing (4)
  • config/docker/grafana/dashboards/Icegate-Ingest-1769530183590.json
  • crates/icegate-ingest/src/shift/shift_runner.rs
  • crates/icegate-queue/src/extract.rs
  • crates/icegate-queue/src/reader.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • crates/icegate-queue/src/extract.rs

Comment thread config/helm/icegate/values.yaml
Comment thread crates/icegate-ingest/src/shift/plan_runner.rs
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/icegate-ingest/src/shift/config.rs (1)

164-181: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Fix Helm/Rust config key and unit mismatch for input bounds.

The Helm template (configmap-ingest.yaml) defines lower_bound_input_bytes_per_task and upper_bound_input_bytes_per_task, but the Rust struct exposes lower_bound_input_mb_per_task and upper_bound_input_mb_per_task. With #[serde(default)] on ShiftReadConfig, mismatched keys will silently use defaults (64 MB and 128 MB) instead of the operator-supplied values, with no deserialization error. Additionally, the Helm keys imply byte units while the Rust struct fields are in megabytes—even if keys were renamed, the values would be off by approximately 1000× without unit conversion. This silently breaks production deployment configurations.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/icegate-ingest/src/shift/config.rs` around lines 164 - 181,
ShiftReadConfig’s fields lower_bound_input_mb_per_task and
upper_bound_input_mb_per_task don’t match the Helm keys
lower_bound_input_bytes_per_task/upper_bound_input_bytes_per_task and so
operator values are ignored; to fix, update the struct to accept the bytes keys
and convert units: either add serde attributes to those fields (e.g.
#[serde(alias = "lower_bound_input_bytes_per_task", deserialize_with = "...")]
and the corresponding alias for upper) and implement a small deserialize_with
function that accepts bytes (u64) and converts to MB (divide by 1_048_576) or
change the field types to bytes (u64) and convert to MB where consumed; ensure
the serde default behavior and any default values remain correct and reference
ShiftReadConfig, lower_bound_input_mb_per_task, upper_bound_input_mb_per_task,
and the Helm keys in configmap-ingest.yaml when making the change.
🧹 Nitpick comments (1)
crates/icegate-ingest/src/shift/mod.rs (1)

113-127: 💤 Low value

Consider a helper on ShiftReadConfig for MB→bytes conversion.

upper_bound_input_mb_per_task * 1024 * 1024 is computed here, in plan_runner.rs (for both lower and upper bounds), and in tests. A single inherent method like ShiftReadConfig::upper_bound_input_bytes(&self) -> u64 (and a sibling for the lower bound) would centralize the unit conversion, make call sites read more clearly at the storage/planner boundary, and give one place to add saturating arithmetic should the per-task budget ever grow toward u64 limits.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/icegate-ingest/src/shift/mod.rs` around lines 113 - 127, Add inherent
byte-conversion helpers on ShiftReadConfig (e.g., upper_bound_input_bytes(&self)
-> u64 and lower_bound_input_bytes(&self) -> u64) and replace direct
multiplications of upper_bound_input_mb_per_task * 1024 * 1024 (and the
lower-bound equivalents) with calls to these methods in shift/mod.rs (where
writer_max_parquet_bytes is called), in plan_runner.rs, and in tests; implement
the helpers using saturating multiplication (saturating_mul or checked_mul with
fallback) to avoid overflow if the MB value grows.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@crates/icegate-ingest/src/shift/config.rs`:
- Around line 164-181: ShiftReadConfig’s fields lower_bound_input_mb_per_task
and upper_bound_input_mb_per_task don’t match the Helm keys
lower_bound_input_bytes_per_task/upper_bound_input_bytes_per_task and so
operator values are ignored; to fix, update the struct to accept the bytes keys
and convert units: either add serde attributes to those fields (e.g.
#[serde(alias = "lower_bound_input_bytes_per_task", deserialize_with = "...")]
and the corresponding alias for upper) and implement a small deserialize_with
function that accepts bytes (u64) and converts to MB (divide by 1_048_576) or
change the field types to bytes (u64) and convert to MB where consumed; ensure
the serde default behavior and any default values remain correct and reference
ShiftReadConfig, lower_bound_input_mb_per_task, upper_bound_input_mb_per_task,
and the Helm keys in configmap-ingest.yaml when making the change.

---

Nitpick comments:
In `@crates/icegate-ingest/src/shift/mod.rs`:
- Around line 113-127: Add inherent byte-conversion helpers on ShiftReadConfig
(e.g., upper_bound_input_bytes(&self) -> u64 and lower_bound_input_bytes(&self)
-> u64) and replace direct multiplications of upper_bound_input_mb_per_task *
1024 * 1024 (and the lower-bound equivalents) with calls to these methods in
shift/mod.rs (where writer_max_parquet_bytes is called), in plan_runner.rs, and
in tests; implement the helpers using saturating multiplication (saturating_mul
or checked_mul with fallback) to avoid overflow if the MB value grows.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b9d907dc-9959-4750-ae32-773b8fb41f24

📥 Commits

Reviewing files that changed from the base of the PR and between 92a63e9 and 810e129.

📒 Files selected for processing (5)
  • crates/icegate-ingest/src/shift/config.rs
  • crates/icegate-ingest/src/shift/iceberg_storage.rs
  • crates/icegate-ingest/src/shift/mod.rs
  • crates/icegate-ingest/src/shift/plan_runner.rs
  • crates/icegate-ingest/src/shift/shift_runner.rs

@s-prosvirnin s-prosvirnin requested a review from frisbeeman May 6, 2026 12:56
@s-prosvirnin s-prosvirnin merged commit 1972421 into main May 6, 2026
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants