Skip to content

Commit 1217a5a

Browse files
committed
feat: support API Key last used at
1 parent d8fa117 commit 1217a5a

File tree

11 files changed

+750
-90
lines changed

11 files changed

+750
-90
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
CREATE TABLE IF NOT EXISTS `api_key_last_used_info` (
2+
`api_key_id` VARCHAR(255) NOT NULL,
3+
`environment_id` VARCHAR(255) NOT NULL,
4+
`last_used_at` TIMESTAMP NULL DEFAULT NULL,
5+
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
6+
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
7+
UNIQUE KEY `unique_api_key_id` (`api_key_id`)
8+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

migration/mysql/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:sbwKrn+UjmZchqTeIaRqq00fxno8oq6vba/UVm6GN8A=
1+
h1:9njHfvWuxG4qTXhYog8wtbY3P6R4RSRvzBYwVkeBOMU=
22
20240626022133_initialization.sql h1:reSmqMhqnsrdIdPU2ezv/PXSL0COlRFX4gQA4U3/wMo=
33
20240708065726_update_audit_log_table.sql h1:fi8Xxw4WfSlHDyvq2Ni/8JUiZW8z/0qWWyWm6jFdUy8=
44
20240815043128_update_auto_ops_rule_table.sql h1:IKSW9W/XO6SWAYl5WPLJSg6KdsfcZ3rfQhIrf7aOnYc=
@@ -32,3 +32,4 @@ h1:sbwKrn+UjmZchqTeIaRqq00fxno8oq6vba/UVm6GN8A=
3232
20250411093510_update_audit_log_table.sql h1:KDbGMLbeKhtUc833pAvKOiw9JdM4rXSUVc34TOywvc8=
3333
20250618071930_create_team_table.sql h1:Gc0+XyVk0zM+wWdWfqtkox2WQEnzftNX3FR7qpEAioU=
3434
20250711131250_update_indexes.sql h1:ghjrhD9mDtCHaHL7HbiAkTjmRHp6EZ3eerVjb8cSZCA=
35+
20250929104850_create_api_key_last_used_info_table.sql h1:Zsc16hqNKOw8mbl1UZojUjfHQ0ySM7JTV7uLT+c8WE0=
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2025 The Bucketeer Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package domain
16+
17+
import (
18+
proto "github.com/bucketeer-io/bucketeer/v2/proto/account"
19+
)
20+
21+
type APIKeyLastUsedInfo struct {
22+
*proto.APIKeyLastUsedInfo
23+
}
24+
25+
func NewAPIKeyLastUsedInfo(
26+
apiKeyID string,
27+
lastUsedAt int64,
28+
environmentID string,
29+
) *APIKeyLastUsedInfo {
30+
return &APIKeyLastUsedInfo{
31+
APIKeyLastUsedInfo: &proto.APIKeyLastUsedInfo{
32+
ApiKeyId: apiKeyID,
33+
LastUsedAt: lastUsedAt,
34+
EnvironmentId: environmentID,
35+
CreatedAt: lastUsedAt,
36+
},
37+
}
38+
}
39+
40+
func (f *APIKeyLastUsedInfo) UsedAt(v int64) {
41+
if f.LastUsedAt < v {
42+
f.LastUsedAt = v
43+
}
44+
}
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
// Copyright 2025 The Bucketeer Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package processor
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"fmt"
21+
"sync"
22+
"time"
23+
24+
"go.uber.org/zap"
25+
"google.golang.org/protobuf/proto"
26+
"google.golang.org/protobuf/types/known/anypb"
27+
28+
"github.com/bucketeer-io/bucketeer/v2/pkg/account/domain"
29+
"github.com/bucketeer-io/bucketeer/v2/pkg/pubsub/puller"
30+
"github.com/bucketeer-io/bucketeer/v2/pkg/pubsub/puller/codes"
31+
"github.com/bucketeer-io/bucketeer/v2/pkg/storage/v2/mysql"
32+
"github.com/bucketeer-io/bucketeer/v2/pkg/subscriber"
33+
eventproto "github.com/bucketeer-io/bucketeer/v2/proto/event/client"
34+
)
35+
36+
type APIKeyLastUsedInfoWriterConfig struct {
37+
FlushSize int `json:"flushSize"`
38+
FlushInterval int `json:"flushInterval"`
39+
WriteCacheInterval int `json:"writeCacheInterval"`
40+
UserAttributeKeyTTL int `json:"userAttributeKeyTtl"`
41+
}
42+
43+
type apikeyLastUsedInfoCache map[string]*domain.APIKeyLastUsedInfo
44+
45+
type envAPIKeyLastUsedInfoCache map[string]apikeyLastUsedInfoCache
46+
47+
type apiKeyUsageEventMap map[string]*eventproto.APIKeyUsageEvent
48+
49+
type envAPIKeyUsageEventMap map[string]apiKeyUsageEventMap
50+
51+
type apikeyLastUsedInfoWriter struct {
52+
config APIKeyLastUsedInfoWriterConfig
53+
apikeyLastUsedInfoCacher envAPIKeyLastUsedInfoCache
54+
55+
envLastUsedCacheMutex sync.Mutex
56+
57+
mysqlClient mysql.Client
58+
logger *zap.Logger
59+
}
60+
61+
func NewAPIKeyLastUsedInfoWriter(
62+
config interface{},
63+
mysqlClient mysql.Client,
64+
logger *zap.Logger,
65+
) (subscriber.PubSubProcessor, error) {
66+
apikeyLastUsedInfWriterConfig, ok := config.(map[string]interface{})
67+
if !ok {
68+
logger.Error("apikeyLastUsedInfoWriter: invalid config")
69+
return nil, ErrAPIKeyLastUsedInfoWriterInvalidConfig
70+
}
71+
configBytes, err := json.Marshal(apikeyLastUsedInfWriterConfig)
72+
if err != nil {
73+
logger.Error("apikeyLastUsedInfoWriter: failed to marshal config", zap.Error(err))
74+
return nil, ErrAPIKeyLastUsedInfoWriterInvalidConfig
75+
}
76+
var apikeyLastUsedInfoWriterConfig APIKeyLastUsedInfoWriterConfig
77+
err = json.Unmarshal(configBytes, &apikeyLastUsedInfoWriterConfig)
78+
if err != nil {
79+
logger.Error("apikeyLastUsedInfoWriter: failed to unmarshal config", zap.Error(err))
80+
return nil, ErrAPIKeyLastUsedInfoWriterInvalidConfig
81+
}
82+
w := &apikeyLastUsedInfoWriter{
83+
config: apikeyLastUsedInfoWriterConfig,
84+
mysqlClient: mysqlClient,
85+
logger: logger,
86+
}
87+
88+
return w, nil
89+
}
90+
91+
func (a *apikeyLastUsedInfoWriter) Process(ctx context.Context, msgChan <-chan *puller.Message) error {
92+
batch := make(map[string]*puller.Message)
93+
ticket := time.NewTicker(time.Duration(a.config.UserAttributeKeyTTL) * time.Second)
94+
defer ticket.Stop()
95+
96+
resetBatch := func() {
97+
for _, msg := range batch {
98+
msg.Ack()
99+
subscriberHandledCounter.WithLabelValues(subscriberAPIKeyLastUsedInfo, codes.OK.String()).Inc()
100+
}
101+
batch = make(map[string]*puller.Message)
102+
}
103+
104+
for {
105+
select {
106+
case msg, ok := <-msgChan:
107+
if !ok {
108+
a.logger.Error("Failed to pull message")
109+
return nil
110+
}
111+
subscriberReceivedCounter.WithLabelValues(subscriberAPIKeyLastUsedInfo).Inc()
112+
id := msg.Attributes["id"]
113+
if id == "" {
114+
a.logger.Error("apikeyLastUsedInfoWriter: id is empty")
115+
subscriberHandledCounter.WithLabelValues(subscriberAPIKeyLastUsedInfo, codes.MissingID.String()).Inc()
116+
continue
117+
}
118+
if previous, ok := batch[id]; ok {
119+
subscriberHandledCounter.WithLabelValues(subscriberAPIKeyLastUsedInfo, codes.MissingID.String()).Inc()
120+
previous.Ack()
121+
}
122+
batch[id] = msg
123+
if len(batch) < a.config.FlushSize {
124+
continue
125+
}
126+
envEvents := a.extractEvents(batch)
127+
a.cacheAPIKeyLastUsedInfoPerEnv(envEvents)
128+
resetBatch()
129+
case <-ticket.C:
130+
envEvents := a.extractEvents(batch)
131+
a.cacheAPIKeyLastUsedInfoPerEnv(envEvents)
132+
resetBatch()
133+
case <-ctx.Done():
134+
// Nack the messages to be redelivered
135+
for _, msg := range batch {
136+
msg.Nack()
137+
}
138+
a.logger.Debug("All the left messages were Nack successfully before shutting down",
139+
zap.Int("batchSize", len(batch)))
140+
return nil
141+
}
142+
}
143+
}
144+
145+
func (a *apikeyLastUsedInfoWriter) cacheAPIKeyLastUsedInfoPerEnv(envEvents envAPIKeyUsageEventMap) {
146+
for environmentID, events := range envEvents {
147+
for _, event := range events {
148+
a.cacheEnvAPIKeyLastUsedInfo(event, environmentID)
149+
}
150+
a.logger.Debug("Update cache API key last used info",
151+
zap.String("environmentID", environmentID),
152+
zap.Int("cacheSize", len(a.apikeyLastUsedInfoCacher[environmentID])),
153+
zap.Int("eventSize", len(events)),
154+
)
155+
}
156+
}
157+
158+
func (a *apikeyLastUsedInfoWriter) cacheEnvAPIKeyLastUsedInfo(
159+
event *eventproto.APIKeyUsageEvent,
160+
environmentID string,
161+
) {
162+
a.envLastUsedCacheMutex.Lock()
163+
defer a.envLastUsedCacheMutex.Unlock()
164+
if cache, ok := a.apikeyLastUsedInfoCacher[environmentID]; ok {
165+
if info, ok := cache[event.ApiKeyId]; ok {
166+
info.UsedAt(event.Timestamp)
167+
return
168+
}
169+
cache[event.ApiKeyId] = domain.NewAPIKeyLastUsedInfo(event.ApiKeyId, event.Timestamp, environmentID)
170+
return
171+
}
172+
cache := apikeyLastUsedInfoCache{}
173+
cache[event.ApiKeyId] = domain.NewAPIKeyLastUsedInfo(event.ApiKeyId, event.Timestamp, environmentID)
174+
a.apikeyLastUsedInfoCacher[environmentID] = cache
175+
}
176+
177+
func (a *apikeyLastUsedInfoWriter) extractEvents(messages map[string]*puller.Message) envAPIKeyUsageEventMap {
178+
envEvents := envAPIKeyUsageEventMap{}
179+
handleBadMessage := func(m *puller.Message, err error) {
180+
m.Ack()
181+
a.logger.Error("Bad proto message",
182+
zap.Error(err),
183+
zap.String("messageID", m.ID),
184+
zap.ByteString("data", m.Data),
185+
zap.Any("attributes", m.Attributes),
186+
)
187+
subscriberHandledCounter.WithLabelValues(subscriberAPIKeyLastUsedInfo, codes.BadMessage.String()).Inc()
188+
}
189+
for _, msg := range messages {
190+
// check if data is empty
191+
if len(msg.Data) == 0 {
192+
handleBadMessage(msg, fmt.Errorf("message data is empty"))
193+
continue
194+
}
195+
event := &eventproto.Event{}
196+
if err := proto.Unmarshal(msg.Data, event); err != nil {
197+
handleBadMessage(msg, err)
198+
continue
199+
}
200+
innerEvent := &eventproto.APIKeyUsageEvent{}
201+
if err := anypb.UnmarshalTo(event.Event, innerEvent, proto.UnmarshalOptions{}); err != nil {
202+
handleBadMessage(msg, err)
203+
continue
204+
}
205+
if innerEvents, ok := envEvents[event.EnvironmentId]; ok {
206+
innerEvents[event.Id] = innerEvent
207+
continue
208+
}
209+
envEvents[event.EnvironmentId] = apiKeyUsageEventMap{event.Id: innerEvent}
210+
}
211+
return envEvents
212+
}

pkg/subscriber/processor/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ var (
2727
ErrEvaluationCountInvalidConfig = errors.New("evaluation count: invalid config")
2828
ErrEventsDWHPersisterInvalidConfig = errors.New("eventpersister: invalid config")
2929
ErrEventsOPSPersisterInvalidConfig = errors.New("eventpersister: invalid config")
30+
ErrAPIKeyLastUsedInfoWriterInvalidConfig = errors.New("apikeyLastUsedInfoWriter: invalid config")
3031
ErrExperimentNotFound = errors.New("eventpersister: experiment not found")
3132
ErrReasonNil = errors.New("eventpersister: reason is nil")
3233
ErrEvaluationsAreEmpty = errors.New("eventpersister: evaluations are empty")

pkg/subscriber/processor/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const (
3333
subscriberSegmentUser = "SegmentUser"
3434
subscriberUserEvent = "UserEvent"
3535
subscriberDemoOrganizationEvent = "DemoOrganizationEvent"
36+
subscriberAPIKeyLastUsedInfo = "APIKeyLastUsedInfo"
3637
)
3738

3839
const (

0 commit comments

Comments
 (0)