Skip to content

Commit 3510e8c

Browse files
fix(experimental/iox): throw error when iox source returns duplicate columns (#5561)
Fixes influxdata/idpe#19228
1 parent 00788d0 commit 3510e8c

File tree

2 files changed

+154
-0
lines changed

2 files changed

+154
-0
lines changed

stdlib/experimental/iox/source.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,14 @@ func (s *sqlSource) Run(ctx context.Context) {
9696
func (s *sqlSource) createSchema(schema *stdarrow.Schema) ([]flux.ColMeta, error) {
9797
fields := schema.Fields()
9898
cols := make([]flux.ColMeta, len(fields))
99+
seen := make(map[string]bool, len(fields))
100+
99101
for i, f := range fields {
102+
if seen[f.Name] {
103+
return nil, errors.Newf(codes.Invalid, "duplicate field name '%s' in schema", f.Name)
104+
}
105+
seen[f.Name] = true
106+
100107
cols[i].Label = f.Name
101108
switch id := f.Type.ID(); id {
102109
case stdarrow.INT64:
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package iox
2+
3+
import (
4+
"testing"
5+
6+
stdarrow "github.com/apache/arrow-go/v18/arrow"
7+
"github.com/google/go-cmp/cmp"
8+
"github.com/influxdata/flux"
9+
"github.com/influxdata/flux/codes"
10+
"github.com/influxdata/flux/internal/errors"
11+
)
12+
13+
func TestCreateSchema(t *testing.T) {
14+
tests := []struct {
15+
name string
16+
fields []stdarrow.Field
17+
wantErr bool
18+
errCode codes.Code
19+
errMsg string
20+
}{
21+
{
22+
name: "no duplicates",
23+
fields: []stdarrow.Field{
24+
{Name: "id", Type: &stdarrow.Int64Type{}},
25+
{Name: "name", Type: &stdarrow.StringType{}},
26+
{Name: "value", Type: &stdarrow.Float64Type{}},
27+
{Name: "active", Type: &stdarrow.BooleanType{}},
28+
{Name: "time", Type: stdarrow.FixedWidthTypes.Timestamp_ns},
29+
},
30+
wantErr: false,
31+
},
32+
{
33+
name: "duplicate field names",
34+
fields: []stdarrow.Field{
35+
{Name: "id", Type: &stdarrow.Int64Type{}},
36+
{Name: "value", Type: &stdarrow.Float64Type{}},
37+
{Name: "id", Type: &stdarrow.Int64Type{}}, // duplicate
38+
},
39+
wantErr: true,
40+
errCode: codes.Invalid,
41+
errMsg: "duplicate field name 'id' in schema",
42+
},
43+
{
44+
name: "multiple duplicates",
45+
fields: []stdarrow.Field{
46+
{Name: "id", Type: &stdarrow.Int64Type{}},
47+
{Name: "name", Type: &stdarrow.StringType{}},
48+
{Name: "id", Type: &stdarrow.Int64Type{}}, // first duplicate
49+
{Name: "name", Type: &stdarrow.StringType{}}, // second duplicate
50+
},
51+
wantErr: true,
52+
errCode: codes.Invalid,
53+
errMsg: "duplicate field name 'id' in schema", // should catch first duplicate
54+
},
55+
{
56+
name: "case sensitive field names",
57+
fields: []stdarrow.Field{
58+
{Name: "ID", Type: &stdarrow.Int64Type{}},
59+
{Name: "id", Type: &stdarrow.Int64Type{}},
60+
{Name: "Id", Type: &stdarrow.Int64Type{}},
61+
},
62+
wantErr: false,
63+
},
64+
}
65+
66+
for _, tt := range tests {
67+
t.Run(tt.name, func(t *testing.T) {
68+
schema := stdarrow.NewSchema(tt.fields, nil)
69+
70+
source := &sqlSource{}
71+
72+
cols, err := source.createSchema(schema)
73+
74+
if tt.wantErr {
75+
if err == nil {
76+
t.Fatal("createSchema() expected error but got none")
77+
}
78+
79+
// Check error code
80+
if e, ok := err.(*errors.Error); ok {
81+
if e.Code != tt.errCode {
82+
t.Fatalf("createSchema() error code = %v, want %v", e.Code, tt.errCode)
83+
}
84+
if e.Msg != tt.errMsg {
85+
t.Fatalf("createSchema() error message = %q, want %q", e.Msg, tt.errMsg)
86+
}
87+
} else {
88+
t.Fatalf("createSchema() error type = %T, want *errors.Error", err)
89+
}
90+
} else {
91+
if err != nil {
92+
t.Fatalf("createSchema() unexpected error: %v", err)
93+
}
94+
95+
// Verify columns were created correctly
96+
if len(cols) != len(tt.fields) {
97+
t.Fatalf("createSchema() returned %d columns, want %d", len(cols), len(tt.fields))
98+
}
99+
100+
wantCols := make([]flux.ColMeta, len(tt.fields))
101+
for i, field := range tt.fields {
102+
wantCols[i].Label = field.Name
103+
switch field.Type.ID() {
104+
case stdarrow.INT64:
105+
wantCols[i].Type = flux.TInt
106+
case stdarrow.FLOAT64:
107+
wantCols[i].Type = flux.TFloat
108+
case stdarrow.STRING:
109+
wantCols[i].Type = flux.TString
110+
case stdarrow.BOOL:
111+
wantCols[i].Type = flux.TBool
112+
case stdarrow.TIMESTAMP:
113+
wantCols[i].Type = flux.TTime
114+
}
115+
}
116+
117+
if !cmp.Equal(wantCols, cols) {
118+
t.Fatalf("createSchema() columns -want/+got\n%s", cmp.Diff(wantCols, cols))
119+
}
120+
}
121+
})
122+
}
123+
}
124+
125+
func TestCreateSchema_UnsupportedType(t *testing.T) {
126+
// Test unsupported arrow type
127+
fields := []stdarrow.Field{
128+
{Name: "id", Type: &stdarrow.Int64Type{}},
129+
{Name: "data", Type: &stdarrow.Float16Type{}}, // unsupported type
130+
}
131+
132+
schema := stdarrow.NewSchema(fields, nil)
133+
source := &sqlSource{}
134+
135+
_, err := source.createSchema(schema)
136+
if err == nil {
137+
t.Fatal("createSchema() expected error for unsupported type but got none")
138+
}
139+
140+
e, ok := err.(*errors.Error)
141+
if !ok {
142+
t.Fatalf("createSchema() error type = %T, want *errors.Error", err)
143+
}
144+
if e.Code != codes.Internal {
145+
t.Fatalf("createSchema() error code = %v, want %v", e.Code, codes.Internal)
146+
}
147+
}

0 commit comments

Comments
 (0)