Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/jpillora/longestcommon v0.0.0-20161227235612-adb9d91ee629
github.com/json-iterator/go v1.1.12
github.com/keboola/go-client v1.26.6
github.com/keboola/go-utils v1.0.3
github.com/keboola/go-utils v1.1.0
github.com/klauspost/compress v1.17.9
github.com/klauspost/pgzip v1.2.6
github.com/kylelemons/godebug v1.1.0
Expand Down Expand Up @@ -67,6 +67,7 @@ require (
github.com/umisama/go-regexpcache v0.0.0-20150417035358-2444a542492f
github.com/urfave/negroni v1.0.0
github.com/valyala/fasthttp v1.55.0
github.com/valyala/fastjson v1.6.4
github.com/writeas/go-strip-markdown v2.0.1+incompatible
github.com/xtaci/kcp-go/v5 v5.6.8
go.etcd.io/etcd/api/v3 v3.5.14
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,8 @@ github.com/keboola/go-mockoidc v0.0.0-20240405064136-5229d2b53db6 h1:HcvX1VQkiav
github.com/keboola/go-mockoidc v0.0.0-20240405064136-5229d2b53db6/go.mod h1:eDjgYHYDJbPLBLsyZ6qRaugP0mX8vePOhZ5id1fdzJw=
github.com/keboola/go-oauth2-proxy/v7 v7.6.1-0.20240418143152-9d00aaa29562 h1:EiwSnkbGt2i6XxvjDMrWx6/bGlQjVs+yq1mDJ5b3U1U=
github.com/keboola/go-oauth2-proxy/v7 v7.6.1-0.20240418143152-9d00aaa29562/go.mod h1:uPrZkzwsuFyIPP04hIt6TG2KvWujglvkOnUUnQJyIdw=
github.com/keboola/go-utils v1.0.3 h1:8ybBfqsJC8k6Xrte6d6PBStehmMabbki2u0H6WI/cfc=
github.com/keboola/go-utils v1.0.3/go.mod h1:p+AIGpqlL7c0X+MWNOLdkAt2rMM5JXlycWwkOKmlrps=
github.com/keboola/go-utils v1.1.0 h1:2tJikVr1kESR88qeGy/Vtmendw7TXB1UrJKLfPKceSk=
github.com/keboola/go-utils v1.1.0/go.mod h1:4YVC2/V0QwgHqxtch8JAVDNVI1aINF2arJ7sh6TO1GY=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
Expand Down Expand Up @@ -662,6 +662,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.55.0 h1:Zkefzgt6a7+bVKHnu/YaYSOPfNYNisSVBo/unVCf8k8=
github.com/valyala/fasthttp v1.55.0/go.mod h1:NkY9JtkrpPKmgwV3HTaS2HWaJss9RSIsRVfcxxoHiOM=
github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
Expand Down
25 changes: 25 additions & 0 deletions internal/pkg/service/stream/mapping/recordctx/fasthttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/keboola/go-utils/pkg/orderedmap"
"github.com/valyala/fasthttp"
"github.com/valyala/fastjson"

"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)
Expand All @@ -27,6 +28,8 @@ type fastHTTPContext struct {
bodyStringErr error
bodyMap *orderedmap.OrderedMap
bodyMapErr error
jsonValue *fastjson.Value
jsonValueErr error
}

func FromFastHTTP(ctx context.Context, timestamp time.Time, req *fasthttp.RequestCtx) Context {
Expand Down Expand Up @@ -122,6 +125,28 @@ func (c *fastHTTPContext) BodyMap() (*orderedmap.OrderedMap, error) {
return c.bodyMap, c.bodyMapErr
}

// JSONValue returns the request body parsed by fastjson.
//
// The outcome (value or error) is stored on the context, so the body is parsed
// at most once even if several columns ask for it.
//
// parserPool supplies the parser used for the first, uncached call.
func (c *fastHTTPContext) JSONValue(parserPool *fastjson.ParserPool) (*fastjson.Value, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Serve the cached outcome of a previous call.
	if c.jsonValue != nil || c.jsonValueErr != nil {
		return c.jsonValue, c.jsonValueErr
	}

	// NOTE(review): BodyBytes is invoked while c.lock is held - confirm it does not acquire c.lock itself.
	body, err := c.BodyBytes()
	if err != nil {
		c.jsonValueErr = err
		return c.jsonValue, c.jsonValueErr
	}

	parser := parserPool.Get()

	value, err := parser.ParseBytes(body)
	if err != nil {
		// No value is retained on failure, so the parser can be reused right away.
		parserPool.Put(parser)
		c.jsonValueErr = errors.PrefixError(err, "cannot parse request json")
		return c.jsonValue, c.jsonValueErr
	}

	// The parser is intentionally NOT put back into the pool here.
	// fastjson documents that a parser and all values obtained from it must not be used
	// after ParserPool.Put, and the value is only valid until the parser's next Parse* call.
	// The value is cached on the context and outlives this call, so recycling the parser
	// would let another request overwrite the cached value.
	c.jsonValue = value

	return c.jsonValue, c.jsonValueErr
}

func (c *fastHTTPContext) headersToMap() *orderedmap.OrderedMap {
out := orderedmap.New()
for _, k := range c.req.Request.Header.PeekKeys() {
Expand Down
25 changes: 25 additions & 0 deletions internal/pkg/service/stream/mapping/recordctx/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/keboola/go-utils/pkg/orderedmap"
"github.com/valyala/fastjson"

"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
"github.com/keboola/keboola-as-code/internal/pkg/utils/ip"
Expand All @@ -29,6 +30,8 @@ type httpContext struct {
bodyBytesErr error
bodyMap *orderedmap.OrderedMap
bodyMapErr error
jsonValue *fastjson.Value
jsonValueErr error
}

func FromHTTP(timestamp time.Time, req *http.Request) Context {
Expand Down Expand Up @@ -124,6 +127,28 @@ func (c *httpContext) BodyMap() (*orderedmap.OrderedMap, error) {
return c.bodyMap, c.bodyMapErr
}

// JSONValue returns the request body parsed by fastjson.
//
// The outcome (value or error) is stored on the context, so the body is parsed
// at most once even if several columns ask for it.
//
// parserPool supplies the parser used for the first, uncached call.
func (c *httpContext) JSONValue(parserPool *fastjson.ParserPool) (*fastjson.Value, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Serve the cached outcome of a previous call.
	if c.jsonValue != nil || c.jsonValueErr != nil {
		return c.jsonValue, c.jsonValueErr
	}

	// c.lock is already held, so the lock-free body reader is used.
	body, err := c.bodyBytesWithoutLock()
	if err != nil {
		c.jsonValueErr = err
		return c.jsonValue, c.jsonValueErr
	}

	parser := parserPool.Get()

	value, err := parser.ParseBytes(body)
	if err != nil {
		// No value is retained on failure, so the parser can be reused right away.
		parserPool.Put(parser)
		c.jsonValueErr = errors.PrefixError(err, "cannot parse request json")
		return c.jsonValue, c.jsonValueErr
	}

	// The parser is intentionally NOT put back into the pool here.
	// fastjson documents that a parser and all values obtained from it must not be used
	// after ParserPool.Put, and the value is only valid until the parser's next Parse* call.
	// The value is cached on the context and outlives this call, so recycling the parser
	// would let another request overwrite the cached value.
	c.jsonValue = value

	return c.jsonValue, c.jsonValueErr
}

func (c *httpContext) bodyBytesWithoutLock() ([]byte, error) {
if c.bodyBytes == nil && c.bodyBytesErr == nil {
c.bodyBytes, c.bodyBytesErr = io.ReadAll(c.req.Body)
Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/service/stream/mapping/recordctx/recordctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/keboola/go-utils/pkg/orderedmap"
"github.com/valyala/fastjson"
)

type Context interface {
Expand All @@ -18,4 +19,5 @@ type Context interface {
BodyString() (string, error)
BodyBytes() ([]byte, error)
BodyMap() (*orderedmap.OrderedMap, error)
JSONValue(*fastjson.ParserPool) (*fastjson.Value, error)
}
16 changes: 3 additions & 13 deletions internal/pkg/service/stream/mapping/recordctx/utils.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,27 @@
package recordctx

import (
"strings"

jsoniter "github.com/json-iterator/go"
"github.com/keboola/go-utils/pkg/orderedmap"
"github.com/umisama/go-regexpcache"

serviceError "github.com/keboola/keboola-as-code/internal/pkg/service/common/errors"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
"github.com/keboola/keboola-as-code/internal/pkg/utils/httputils"
utilsUrl "github.com/keboola/keboola-as-code/internal/pkg/utils/url"
)

// json - replacement of the standard encoding/json library, it is faster for larger responses.
var json = jsoniter.ConfigCompatibleWithStandardLibrary //nolint:gochecknoglobals

// isContentTypeJSON reports whether t is "application/json" or an
// "application/<name>+json" media type, with no parameters attached.
func isContentTypeJSON(t string) bool {
	const jsonMediaTypePattern = `^application/([a-zA-Z0-9\.\-]+\+)?json$`

	matcher := regexpcache.MustCompile(jsonMediaTypePattern)

	return matcher.MatchString(t)
}

// isContentTypeForm reports whether t starts with the URL-encoded form media type.
// Anything following the prefix (for example a charset parameter) is accepted.
func isContentTypeForm(t string) bool {
	const formMediaType = "application/x-www-form-urlencoded"

	return strings.HasPrefix(t, formMediaType)
}

func parseBody(contentType string, body []byte) (data *orderedmap.OrderedMap, err error) {
// Decode
switch {
case isContentTypeForm(contentType):
case httputils.IsContentTypeForm(contentType):
data, err = utilsUrl.ParseQuery(string(body))
if err != nil {
return nil, serviceError.NewBadRequestError(errors.Errorf("invalid form data: %w", err))
}
case isContentTypeJSON(contentType):
case httputils.IsContentTypeJSON(contentType):
err = json.Unmarshal(body, &data)
if err != nil {
return nil, serviceError.NewBadRequestError(errors.Errorf("invalid JSON: %w", err))
Expand Down
123 changes: 97 additions & 26 deletions internal/pkg/service/stream/mapping/table/column/renderer.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
package column

import (
"fmt"
"strconv"
"strings"

"github.com/gofrs/uuid/v5"
"github.com/keboola/go-utils/pkg/orderedmap"
"github.com/valyala/fastjson"

"github.com/keboola/keboola-as-code/internal/pkg/encoding/json"
jsonnetWrapper "github.com/keboola/keboola-as-code/internal/pkg/encoding/jsonnet"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/jsonnet"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/recordctx"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
"github.com/keboola/keboola-as-code/internal/pkg/utils/httputils"
)

type Renderer struct {
jsonnetPool *jsonnetWrapper.VMPool[recordctx.Context]
jsonnetPool *jsonnetWrapper.VMPool[recordctx.Context]
fastjsonPool *fastjson.ParserPool
}

func NewRenderer() *Renderer {
return &Renderer{
jsonnetPool: jsonnet.NewPool(),
jsonnetPool: jsonnet.NewPool(),
fastjsonPool: &fastjson.ParserPool{},
}
}

Expand All @@ -41,33 +48,13 @@ func (r *Renderer) CSVValue(c Column, ctx recordctx.Context) (string, error) {
case IP:
return ctx.ClientIP().String(), nil
case Path:
bodyMap, err := ctx.BodyMap()
if err != nil {
return "", err
}
contentType, ok := ctx.HeadersMap().GetOrNil("Content-Type").(string)

var value any

if c.Path == "" {
value = bodyMap
} else {
value, _, err = bodyMap.GetNested(c.Path)
if err != nil {
if c.DefaultValue != nil {
value = *c.DefaultValue
} else {
return "", errors.Wrapf(err, `path "%s" not found in the body`, c.Path)
}
}
if ok && httputils.IsContentTypeJSON(contentType) {
return r.jsonPathCSVValue(c, ctx)
}

if c.RawString {
if stringValue, ok := value.(string); ok {
return stringValue, nil
}
}

return json.EncodeString(value, false)
return r.mapPathCSVValue(c, ctx)
case Template:
if c.Template.Language != TemplateLanguageJsonnet {
return "", errors.Errorf(`unsupported language "%s", only "jsonnet" is supported`, c.Template.Language)
Expand All @@ -86,3 +73,87 @@ func (r *Renderer) CSVValue(c Column, ctx recordctx.Context) (string, error) {

return "", errors.Errorf("unknown column type %T", c)
}

// mapPathCSVValue renders a Path column from the body decoded into an ordered map.
//
// An empty path selects the whole body. If the path lookup fails, the column's
// DefaultValue is used when set, otherwise an error is returned.
// The selected value is JSON encoded, unless RawString is set and the value is
// a string, in which case the string is returned as is.
func (r *Renderer) mapPathCSVValue(c Path, ctx recordctx.Context) (string, error) {
	body, err := ctx.BodyMap()
	if err != nil {
		return "", err
	}

	// Start with the whole body, narrow it down only if a path is configured.
	var selected any = body
	if c.Path != "" {
		nested, _, lookupErr := body.GetNested(c.Path)
		switch {
		case lookupErr == nil:
			selected = nested
		case c.DefaultValue != nil:
			selected = *c.DefaultValue
		default:
			return "", errors.Wrapf(lookupErr, `path "%s" not found in the body`, c.Path)
		}
	}

	// Raw strings skip the JSON encoding, so they are not quoted.
	if str, isString := selected.(string); isString && c.RawString {
		return str, nil
	}

	return json.EncodeString(selected, false)
}

// jsonPathCSVValue renders a Path column from the body parsed by fastjson.
//
// An empty path selects the whole body. If the path is not present in the body,
// the column's DefaultValue is used when set (returned as is with RawString,
// JSON encoded otherwise); without a default an error is returned.
// A found value is returned JSON encoded, unless RawString is set and the value
// is a JSON string, in which case the unquoted string is returned.
func (r *Renderer) jsonPathCSVValue(c Path, ctx recordctx.Context) (string, error) {
	// Get fastjson.Value, it is cached in recordctx as it might be used in multiple columns.
	value, err := ctx.JSONValue(r.fastjsonPool)
	if err != nil {
		return "", err
	}

	if path := orderedmap.PathFromStr(c.Path); len(path) > 0 {
		// Transform orderedmap.Path to a slice of keys used by fastjson,
		// slice indexes are passed as decimal strings.
		keys := make([]string, 0, len(path))
		for _, step := range path {
			var key string

			// Any other step type leaves the key empty.
			switch step := step.(type) {
			case orderedmap.MapStep:
				key = step.Key()
			case orderedmap.SliceStep:
				key = strconv.Itoa(step.Index())
			}

			keys = append(keys, key)
		}

		// Get returns nil for a missing path, so a single lookup both tests and fetches the value.
		value = value.Get(keys...)
		if value == nil {
			if c.DefaultValue == nil {
				return "", errors.New(fmt.Sprintf(`path "%s" not found in the body`, c.Path))
			}

			// The path is missing, but we have a DefaultValue to use.
			if c.RawString {
				return *c.DefaultValue, nil
			}

			return json.EncodeString(*c.DefaultValue, false)
		}
	}

	// Return unquoted string if the value is a string and RawString is set to true.
	if c.RawString && value.Type() == fastjson.TypeString {
		return string(value.GetStringBytes()), nil
	}

	// Return the found value (json encoded).
	return value.String(), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ func BenchmarkColumn_Path(b *testing.B) {

body := `{"key1":[{"key2":"val2"},{"key3":"val3"}]}`
header := http.Header{"Content-Type": []string{"application/json"}}
reqCtx := recordctx.FromHTTP(time.Now(), &http.Request{Header: header, Body: io.NopCloser(strings.NewReader(body))})
renderer := column.NewRenderer()

for i := 0; i < b.N; i++ {
// reqCtx needs to be created separately for each request, otherwise the parsed json is cached
reqCtx := recordctx.FromHTTP(time.Now(), &http.Request{Header: header, Body: io.NopCloser(strings.NewReader(body))})
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reqCtx caches the parsed JSON in case it is needed for other columns, so a new reqCtx must be created for each CSVValue call. Otherwise the JSON is parsed only once and every later iteration hits the cache, which makes the benchmark meaningless.

val, err := renderer.CSVValue(c, reqCtx)
assert.NoError(b, err)
assert.Equal(b, `"val3"`, val)
Expand All @@ -38,10 +39,10 @@ func BenchmarkColumn_Template_Jsonnet(b *testing.B) {

body := `{"key1":[{"key2":"val2"},{"key3":"val3"}]}`
header := http.Header{"Content-Type": []string{"application/json"}}
reqCtx := recordctx.FromHTTP(time.Now(), &http.Request{Header: header, Body: io.NopCloser(strings.NewReader(body))})
renderer := column.NewRenderer()

for i := 0; i < b.N; i++ {
reqCtx := recordctx.FromHTTP(time.Now(), &http.Request{Header: header, Body: io.NopCloser(strings.NewReader(body))})
val, err := renderer.CSVValue(c, reqCtx)
assert.NoError(b, err)
assert.Equal(b, `"val3"`, val)
Expand All @@ -51,10 +52,10 @@ func BenchmarkColumn_Template_Jsonnet(b *testing.B) {
func BenchmarkColumn_UUID(b *testing.B) {
c := column.UUID{}

reqCtx := recordctx.FromHTTP(time.Now(), &http.Request{})
renderer := column.NewRenderer()

for i := 0; i < b.N; i++ {
reqCtx := recordctx.FromHTTP(time.Now(), &http.Request{})
val, err := renderer.CSVValue(c, reqCtx)
assert.NoError(b, err)
assert.Len(b, val, 36)
Expand Down
Loading