feat: add InfluxDB-compatible /write and /api/v2/write endpoints#65
Merged
Conversation
Agent-Logs-Url: https://github.com/IoTSharp/SonnetDB/sessions/e2e54668-dc4f-4731-8e6f-f0365dc532a5 Co-authored-by: maikebing <3445167+maikebing@users.noreply.github.com>
Copilot created this pull request from a session on behalf of
maikebing
May 12, 2026 16:20
View session
Agent-Logs-Url: https://github.com/IoTSharp/SonnetDB/sessions/6d65a404-0cee-43dc-9b5c-69b2d5ba72a3 Co-authored-by: maikebing <3445167+maikebing@users.noreply.github.com>
Agent-Logs-Url: https://github.com/IoTSharp/SonnetDB/sessions/a35be1dd-1c19-401e-9b3f-dab5662a1354 Co-authored-by: maikebing <3445167+maikebing@users.noreply.github.com>
Agent-Logs-Url: https://github.com/IoTSharp/SonnetDB/sessions/a35be1dd-1c19-401e-9b3f-dab5662a1354 Co-authored-by: maikebing <3445167+maikebing@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR expands SonnetDB’s ingest compatibility and improves query-time block pruning. It adds InfluxDB-compatible write endpoints and a Prometheus Remote Write endpoint at the server layer, plus a block-level skipping index in SegmentIndex to reduce per-query scanning.
Changes:
- Add InfluxDB-compatible
POST /write(v1) andPOST /api/v2/write(v2) endpoints, including gzip body support, precision handling, andAuthorization: Tokenalias support. - Add Prometheus Remote Write compatible
POST /api/v1/prom/write?db=...endpoint with Snappy block-format decompression and a minimal protobuf decoder. - Optimize
SegmentIndextime-range pruning with a prefix-max skipping index and updateMultiSegmentIndexto use the new per-series time-range overload; add targeted tests (including fuzz) for correctness.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/SonnetDB.Tests/SonnetDB.Tests.csproj | Adds Snappier for endpoint E2E tests that build snappy payloads. |
| tests/SonnetDB.Tests/PrometheusRemoteWriteEndpointTests.cs | End-to-end tests for Prometheus Remote Write ingest + auth/error cases. |
| tests/SonnetDB.Tests/InfluxLineProtocolEndpointTests.cs | End-to-end tests for Influx /write and /api/v2/write behavior. |
| tests/SonnetDB.Core.Tests/Storage/Segments/SegmentIndexSkippingTests.cs | New tests validating prefix-max skipping index vs naive overlap logic. |
| src/SonnetDB/SonnetDB.csproj | Adds Snappier dependency to the server project. |
| src/SonnetDB/Program.cs | Wires new Influx and Prometheus ingest routes. |
| src/SonnetDB/Endpoints/PrometheusRemoteWriteEndpointHandler.cs | Implements Prom RW endpoint, snappy decompress, and minimal protobuf reader. |
| src/SonnetDB/Endpoints/InfluxLineProtocolEndpointHandler.cs | Implements Influx-compatible LP endpoints with gzip + precision handling. |
| src/SonnetDB/Endpoints/EndpointIngestUtils.cs | Shared body-reading + JSON error response helper for ingest endpoints. |
| src/SonnetDB/Auth/BearerAuthMiddleware.cs | Accepts Authorization: Token <token> as an alias (Influx v2 compatibility). |
| src/SonnetDB.Core/Storage/Segments/SegmentIndex.cs | Adds prefix-max arrays and new time-range block lookup overload for skipping. |
| src/SonnetDB.Core/Storage/Segments/MultiSegmentIndex.cs | Uses new SegmentIndex.GetBlocks(seriesId, from, to) overload for pruning. |
| Directory.Packages.props | Pins Snappier version (central package management). |
| CHANGELOG.md | Documents new endpoints and the SegmentIndex skipping optimization. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+103
to
+108
| catch (Exception ex) when (ex is InvalidDataException or ArgumentException or InvalidOperationException) | ||
| { | ||
| await WriteErrorAsync(ctx, StatusCodes.Status400BadRequest, "snappy_error", | ||
| "请求体不是合法的 Snappy 块格式。").ConfigureAwait(false); | ||
| return; | ||
| } |
Comment on lines
+249
to
+268
| while (local < sampleEnd) | ||
| { | ||
| var sTag = ReadVarint(span, ref local); | ||
| int sField = (int)(sTag >> 3); | ||
| int sWire = (int)(sTag & 0x7); | ||
| switch (sField, sWire) | ||
| { | ||
| case (1, 1): // value, fixed64 | ||
| if (local + 8 > sampleEnd) throw new PrometheusProtoException("Sample.value 截断。"); | ||
| value = BitConverter.Int64BitsToDouble(BinaryPrimitives.ReadInt64LittleEndian(span.Slice(local, 8))); | ||
| local += 8; | ||
| hasValue = true; | ||
| break; | ||
| case (2, 0): // timestamp, varint | ||
| timestampMs = (long)ReadVarint(span, ref local); | ||
| hasTs = true; | ||
| break; | ||
| default: | ||
| SkipField(span, ref local, sWire); | ||
| break; |
Comment on lines
+134
to
+152
| /// <summary> | ||
| /// 解析 InfluxDB 风格的 <c>precision</c> 查询参数。 | ||
| /// </summary> | ||
| /// <param name="value">原始查询字符串。</param> | ||
| /// <returns>对应的 <see cref="TimePrecision"/>;缺省返回 <see cref="TimePrecision.Nanoseconds"/>(InfluxDB 默认)。</returns> | ||
| private static TimePrecision ParsePrecision(string? value) | ||
| { | ||
| if (string.IsNullOrEmpty(value)) | ||
| return TimePrecision.Nanoseconds; | ||
| // InfluxDB v1 使用 "n" 作为纳秒别名;v2 使用 "ns"。两者兼容。 | ||
| return value switch | ||
| { | ||
| "ns" or "n" => TimePrecision.Nanoseconds, | ||
| "us" or "u" or "µs" => TimePrecision.Microseconds, | ||
| "ms" => TimePrecision.Milliseconds, | ||
| "s" => TimePrecision.Seconds, | ||
| _ => TimePrecision.Nanoseconds, | ||
| }; | ||
| } |
Comment on lines
+523
to
+524
| // v1: POST /write?db=<db>&precision=<n|u|ms|s> | ||
| // v2: POST /api/v2/write?bucket=<db>&org=<ignored>&precision=<ns|us|ms|s> |
| ## [Unreleased] | ||
|
|
||
| ### Added | ||
| - **Prometheus Remote Write 兼容入站端点**:`src/SonnetDB` 新增 `POST /api/v1/prom/write?db=<name>`,请求体为 `snappy(block) + protobuf(prometheus.WriteRequest)`,让 Prometheus / VictoriaMetrics agent / Grafana Alloy / OpenTelemetry Collector 可以无需改 URL 直接把指标写入 SonnetDB。映射规则:`__name__` label → `measurement`,其余 label → `tags`,每条 `Sample(value:double, ts:int64 ms)` 展开为一个 `Point` 的 `value:double` field;NaN 样本(Prometheus stale marker)与名称含保留字符(`, = \n \r \t "`)的 series 静默跳过;snappy / protobuf 解码失败返回 `400`,否则返回 `204 No Content`(Prometheus 协议约定)。仅 server 层引入新依赖 `Snappier 1.3.1`(纯托管 Snappy 块格式解压,AOT 友好);protobuf 解码采用手写最小 decoder(仅识别 WriteRequest/TimeSeries/Label/Sample,未知 wire 字段安全跳过),不引入 `Google.Protobuf` / `Grpc.Tools`,不改动 `src/SonnetDB.Core`。新增 13 个端到端测试覆盖 happy path + SQL 回查、多 series、NaN 跳过、缺失 `__name__`、保留字符、空 body、bad snappy / bad protobuf、缺/未知 db、ReadOnly 403、未认证 401、JSON 错误体。 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
SegmentIndex内为每个 (seriesId, fieldName) 桶与每个 seriesId 桶预计算prefixMaxTimestamp[i] = max(MaxTimestamp[0..i])GetBlocks(seriesId, fieldName, from, to):upper bound 仍按 MinTimestamp 二分;lower bound 改为对 prefixMax 二分,再仅扫描[L_first, upper)GetBlocks(seriesId, from, toInclusive)多 field 路径,并让MultiSegmentIndex.LookupCandidates(sid, from, to)转发到该重载,复用同一跳跃索引SegmentIndex.BuildFromBlocksForTesting(internal,已通过现有[InternalsVisibleTo("SonnetDB.Core.Tests")]暴露)SegmentIndexSkippingTests:单 block / 多 block / 完全包含 / 完全不重叠 / 边界相等 / 压缩重叠 / 按 series 多 field / 与朴素 O(n) 实现的 64 block × 300 query fuzz 对拍dotnet test SonnetDB.slnx:1810 + 8 + 190 = 全部 2008 个测试通过