Skip to content

Commit 1aa8b20

Browse files
committed
Revert to direct svix calls, preserve river prep work
1 parent a9ad8a5 commit 1aa8b20

File tree

7 files changed

+117
-51
lines changed

7 files changed

+117
-51
lines changed

cmd/api/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ func main() {
356356
AuthenticatorAppSecretsKMS: authenticatorAppSecretsKMS,
357357
SES: ses_,
358358
PageEncoder: pagetoken.Encoder{Secret: pageEncodingValue},
359+
SvixClient: svixClient,
359360
AuditlogStore: &auditlogStore,
360361
OIDCClient: oidcClient,
361362
RiverClient: riverClient,

internal/backend/store/organizations.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import (
88

99
"github.com/google/uuid"
1010
"github.com/jackc/pgx/v5"
11+
"github.com/svix/svix-webhooks/go/models"
1112
auditlogv1 "github.com/tesseral-labs/tesseral/internal/auditlog/gen/tesseral/auditlog/v1"
1213
"github.com/tesseral-labs/tesseral/internal/backend/authn"
1314
backendv1 "github.com/tesseral-labs/tesseral/internal/backend/gen/tesseral/backend/v1"
1415
"github.com/tesseral-labs/tesseral/internal/backend/store/queries"
15-
"github.com/tesseral-labs/tesseral/internal/backgroundworker/webhookworker"
1616
"github.com/tesseral-labs/tesseral/internal/common/apierror"
1717
"github.com/tesseral-labs/tesseral/internal/store/idformat"
1818
"google.golang.org/protobuf/types/known/timestamppb"
@@ -508,18 +508,31 @@ func (s *Store) EnableOrganizationLogins(ctx context.Context, req *backendv1.Ena
508508
}
509509

510510
func (s *Store) sendSyncOrganizationEvent(ctx context.Context, tx pgx.Tx, qOrg queries.Organization) error {
511-
// Add the sync organization event to the background worker queue
512-
if _, err := s.riverClient.InsertTx(ctx, tx, webhookworker.Args{
513-
ProjectID: idformat.Project.Format(authn.ProjectID(ctx)),
514-
EventName: "sync.organization",
515-
Payload: map[string]any{
511+
qProjectWebhookSettings, err := s.q.GetProjectWebhookSettings(ctx, authn.ProjectID(ctx))
512+
if err != nil {
513+
// We want to ignore this error if the project does not have webhook settings
514+
if errors.Is(err, pgx.ErrNoRows) {
515+
return nil
516+
}
517+
return fmt.Errorf("get project by id: %w", err)
518+
}
519+
520+
if qProjectWebhookSettings.AppID == nil {
521+
return nil
522+
}
523+
524+
message, err := s.svixClient.Message.Create(ctx, *qProjectWebhookSettings.AppID, models.MessageIn{
525+
EventType: "sync.organization",
526+
Payload: map[string]interface{}{
516527
"type": "sync.organization",
517528
"organizationId": idformat.Organization.Format(qOrg.ID),
518529
},
519-
}, nil); err != nil {
520-
return fmt.Errorf("insert background worker args: %w", err)
530+
}, nil)
531+
if err != nil {
532+
return fmt.Errorf("create message: %w", err)
521533
}
522-
slog.InfoContext(ctx, "send_webhook_event_created", "event_type", "sync.organization", "organization_id", idformat.Organization.Format(qOrg.ID))
534+
535+
slog.InfoContext(ctx, "svix_message_created", "message_id", message.Id, "event_type", message.EventType, "organization_id", idformat.Organization.Format(qOrg.ID))
523536

524537
return nil
525538
}

internal/backend/store/users.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"log/slog"
87

98
"github.com/google/uuid"
109
"github.com/jackc/pgx/v5"
10+
"github.com/svix/svix-webhooks/go/models"
1111
auditlogv1 "github.com/tesseral-labs/tesseral/internal/auditlog/gen/tesseral/auditlog/v1"
1212
"github.com/tesseral-labs/tesseral/internal/backend/authn"
1313
backendv1 "github.com/tesseral-labs/tesseral/internal/backend/gen/tesseral/backend/v1"
1414
"github.com/tesseral-labs/tesseral/internal/backend/store/queries"
15-
"github.com/tesseral-labs/tesseral/internal/backgroundworker/webhookworker"
1615
"github.com/tesseral-labs/tesseral/internal/common/apierror"
1716
"github.com/tesseral-labs/tesseral/internal/store/idformat"
1817
"google.golang.org/protobuf/types/known/timestamppb"
@@ -327,18 +326,28 @@ func (s *Store) DeleteUser(ctx context.Context, req *backendv1.DeleteUserRequest
327326
}
328327

329328
func (s *Store) sendSyncUserEvent(ctx context.Context, tx pgx.Tx, qUser queries.User) error {
330-
// Add the sync organization event to the background worker queue
331-
if _, err := s.riverClient.InsertTx(ctx, tx, webhookworker.Args{
332-
ProjectID: idformat.Project.Format(authn.ProjectID(ctx)),
333-
EventName: "sync.user",
334-
Payload: map[string]any{
329+
qProjectWebhookSettings, err := s.q.GetProjectWebhookSettings(ctx, authn.ProjectID(ctx))
330+
if err != nil {
331+
// We want to ignore this error if the project does not have webhook settings
332+
if errors.Is(err, pgx.ErrNoRows) {
333+
return nil
334+
}
335+
return fmt.Errorf("get project by id: %w", err)
336+
}
337+
338+
if qProjectWebhookSettings.AppID == nil {
339+
return nil
340+
}
341+
342+
if _, err := s.svixClient.Message.Create(ctx, *qProjectWebhookSettings.AppID, models.MessageIn{
343+
EventType: "sync.user",
344+
Payload: map[string]interface{}{
335345
"type": "sync.user",
336346
"userId": idformat.User.Format(qUser.ID),
337347
},
338348
}, nil); err != nil {
339-
return fmt.Errorf("insert background worker args: %w", err)
349+
return fmt.Errorf("create message: %w", err)
340350
}
341-
slog.InfoContext(ctx, "send_webhook_event_created", "event_type", "sync.user", "user_id", idformat.User.Format(qUser.ID))
342351

343352
return nil
344353
}

internal/frontend/store/organizations.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
"log/slog"
88

99
"github.com/jackc/pgx/v5"
10+
"github.com/svix/svix-webhooks/go/models"
1011
auditlogv1 "github.com/tesseral-labs/tesseral/internal/auditlog/gen/tesseral/auditlog/v1"
11-
"github.com/tesseral-labs/tesseral/internal/backgroundworker/webhookworker"
1212
"github.com/tesseral-labs/tesseral/internal/common/apierror"
1313
"github.com/tesseral-labs/tesseral/internal/frontend/authn"
1414
frontendv1 "github.com/tesseral-labs/tesseral/internal/frontend/gen/tesseral/frontend/v1"
@@ -200,18 +200,31 @@ func (s *Store) UpdateOrganization(ctx context.Context, req *frontendv1.UpdateOr
200200
}
201201

202202
func (s *Store) sendSyncOrganizationEvent(ctx context.Context, tx pgx.Tx, qOrg queries.Organization) error {
203-
// Add the sync organization event to the background worker queue
204-
if _, err := s.riverClient.InsertTx(ctx, tx, webhookworker.Args{
205-
ProjectID: idformat.Project.Format(authn.ProjectID(ctx)),
206-
EventName: "sync.organization",
203+
qProjectWebhookSettings, err := s.q.GetProjectWebhookSettings(ctx, authn.ProjectID(ctx))
204+
if err != nil {
205+
// We want to ignore this error if the project does not have webhook settings
206+
if errors.Is(err, pgx.ErrNoRows) {
207+
return nil
208+
}
209+
return fmt.Errorf("get project by id: %w", err)
210+
}
211+
212+
if qProjectWebhookSettings.AppID == nil {
213+
return nil
214+
}
215+
216+
message, err := s.svixClient.Message.Create(ctx, *qProjectWebhookSettings.AppID, models.MessageIn{
217+
EventType: "sync.organization",
207218
Payload: map[string]interface{}{
208219
"type": "sync.organization",
209220
"organizationId": idformat.Organization.Format(qOrg.ID),
210221
},
211-
}, nil); err != nil {
212-
return fmt.Errorf("insert background worker args: %w", err)
222+
}, nil)
223+
if err != nil {
224+
return fmt.Errorf("create message: %w", err)
213225
}
214-
slog.InfoContext(ctx, "send_webhook_event_created", "event_type", "sync.organization", "organization_id", idformat.Organization.Format(qOrg.ID))
226+
227+
slog.InfoContext(ctx, "svix_message_created", "message_id", message.Id, "event_type", message.EventType, "organization_id", idformat.Organization.Format(qOrg.ID))
215228

216229
return nil
217230
}

internal/frontend/store/store.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/jackc/pgx/v5"
1212
"github.com/jackc/pgx/v5/pgxpool"
1313
"github.com/riverqueue/river"
14+
svix "github.com/svix/svix-webhooks/go"
1415
auditlogstore "github.com/tesseral-labs/tesseral/internal/auditlog/store"
1516
"github.com/tesseral-labs/tesseral/internal/frontend/store/queries"
1617
"github.com/tesseral-labs/tesseral/internal/hibp"
@@ -30,9 +31,10 @@ type Store struct {
3031
ses *sesv2.Client
3132
pageEncoder pagetoken.Encoder
3233
q *queries.Queries
33-
auditlogStore *auditlogstore.Store
34-
oidc *oidcclient.Client
35-
riverClient *river.Client[pgx.Tx]
34+
svixClient *svix.Svix
35+
auditlogStore *auditlogstore.Store
36+
oidc *oidcclient.Client
37+
riverClient *river.Client[pgx.Tx]
3638
}
3739

3840
type NewStoreParams struct {
@@ -43,9 +45,10 @@ type NewStoreParams struct {
4345
AuthenticatorAppSecretsKMS *kms.KMS
4446
SES *sesv2.Client
4547
PageEncoder pagetoken.Encoder
46-
AuditlogStore *auditlogstore.Store
47-
OIDCClient *oidcclient.Client
48-
RiverClient *river.Client[pgx.Tx]
48+
SvixClient *svix.Svix
49+
AuditlogStore *auditlogstore.Store
50+
OIDCClient *oidcclient.Client
51+
RiverClient *river.Client[pgx.Tx]
4952
}
5053

5154
func New(p NewStoreParams) *Store {
@@ -61,9 +64,10 @@ func New(p NewStoreParams) *Store {
6164
ses: p.SES,
6265
pageEncoder: p.PageEncoder,
6366
q: queries.New(p.DB),
64-
auditlogStore: p.AuditlogStore,
65-
oidc: p.OIDCClient,
66-
riverClient: p.RiverClient,
67+
svixClient: p.SvixClient,
68+
auditlogStore: p.AuditlogStore,
69+
oidc: p.OIDCClient,
70+
riverClient: p.RiverClient,
6771
}
6872

6973
return store

internal/frontend/store/users.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88

99
"github.com/google/uuid"
1010
"github.com/jackc/pgx/v5"
11+
"github.com/svix/svix-webhooks/go/models"
1112
auditlogv1 "github.com/tesseral-labs/tesseral/internal/auditlog/gen/tesseral/auditlog/v1"
12-
"github.com/tesseral-labs/tesseral/internal/backgroundworker/webhookworker"
1313
"github.com/tesseral-labs/tesseral/internal/bcryptcost"
1414
"github.com/tesseral-labs/tesseral/internal/common/apierror"
1515
"github.com/tesseral-labs/tesseral/internal/frontend/authn"
@@ -271,18 +271,31 @@ func (s *Store) DeleteUser(ctx context.Context, req *frontendv1.DeleteUserReques
271271
}
272272

273273
func (s *Store) sendSyncUserEvent(ctx context.Context, tx pgx.Tx, qUser queries.User) error {
274-
// Add the sync organization event to the background worker queue
275-
if _, err := s.riverClient.InsertTx(ctx, tx, webhookworker.Args{
276-
ProjectID: idformat.Project.Format(authn.ProjectID(ctx)),
277-
EventName: "sync.user",
274+
qProjectWebhookSettings, err := s.q.GetProjectWebhookSettings(ctx, authn.ProjectID(ctx))
275+
if err != nil {
276+
// We want to ignore this error if the project does not have webhook settings
277+
if errors.Is(err, pgx.ErrNoRows) {
278+
return nil
279+
}
280+
return fmt.Errorf("get project by id: %w", err)
281+
}
282+
283+
if qProjectWebhookSettings.AppID == nil {
284+
return nil
285+
}
286+
287+
message, err := s.svixClient.Message.Create(ctx, *qProjectWebhookSettings.AppID, models.MessageIn{
288+
EventType: "sync.user",
278289
Payload: map[string]interface{}{
279290
"type": "sync.user",
280291
"userId": idformat.User.Format(qUser.ID),
281292
},
282-
}, nil); err != nil {
283-
return fmt.Errorf("insert background worker args: %w", err)
293+
}, nil)
294+
if err != nil {
295+
return fmt.Errorf("create message: %w", err)
284296
}
285-
slog.InfoContext(ctx, "send_webhook_event_created", "event_type", "sync.user", "user_id", idformat.User.Format(qUser.ID))
297+
298+
slog.InfoContext(ctx, "svix_message_created", "message_id", message.Id, "event_type", message.EventType, "user_id", idformat.User.Format(qUser.ID))
286299

287300
return nil
288301
}

internal/intermediate/store/exchange.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import (
1010

1111
"github.com/google/uuid"
1212
"github.com/jackc/pgx/v5"
13+
"github.com/svix/svix-webhooks/go/models"
1314
auditlogv1 "github.com/tesseral-labs/tesseral/internal/auditlog/gen/tesseral/auditlog/v1"
14-
"github.com/tesseral-labs/tesseral/internal/backgroundworker/webhookworker"
1515
"github.com/tesseral-labs/tesseral/internal/common/apierror"
1616
"github.com/tesseral-labs/tesseral/internal/intermediate/authn"
1717
intermediatev1 "github.com/tesseral-labs/tesseral/internal/intermediate/gen/tesseral/intermediate/v1"
@@ -310,18 +310,31 @@ func (s *Store) ExchangeIntermediateSessionForSession(ctx context.Context, req *
310310
}
311311

312312
func (s *Store) sendSyncUserEvent(ctx context.Context, tx pgx.Tx, qUser queries.User) error {
313-
// Add the sync organization event to the background worker queue
314-
if _, err := s.riverClient.InsertTx(ctx, tx, webhookworker.Args{
315-
ProjectID: idformat.Project.Format(authn.ProjectID(ctx)),
316-
EventName: "sync.user",
313+
qProjectWebhookSettings, err := s.q.GetProjectWebhookSettings(ctx, authn.ProjectID(ctx))
314+
if err != nil {
315+
// We want to ignore this error if the project does not have webhook settings
316+
if errors.Is(err, pgx.ErrNoRows) {
317+
return nil
318+
}
319+
return fmt.Errorf("get project by id: %w", err)
320+
}
321+
322+
if qProjectWebhookSettings.AppID == nil {
323+
return nil
324+
}
325+
326+
message, err := s.svixClient.Message.Create(ctx, *qProjectWebhookSettings.AppID, models.MessageIn{
327+
EventType: "sync.user",
317328
Payload: map[string]interface{}{
318329
"type": "sync.user",
319330
"userId": idformat.User.Format(qUser.ID),
320331
},
321-
}, nil); err != nil {
322-
return fmt.Errorf("insert background worker args: %w", err)
332+
}, nil)
333+
if err != nil {
334+
return fmt.Errorf("create message: %w", err)
323335
}
324-
slog.InfoContext(ctx, "send_webhook_event_created", "event_type", "sync.user", "user_id", idformat.User.Format(qUser.ID))
336+
337+
slog.InfoContext(ctx, "svix_message_created", "message_id", message.Id, "event_type", message.EventType, "user_id", idformat.User.Format(qUser.ID))
325338

326339
return nil
327340
}

0 commit comments

Comments
 (0)