From 50952dec3ff880bc031cd47628ebdf98f3fe6898 Mon Sep 17 00:00:00 2001 From: Devin Binnie <52460000+devinbinnie@users.noreply.github.com> Date: Wed, 3 Jun 2026 10:39:53 -0400 Subject: [PATCH 1/2] [MM-68648] Implement GetForGroup to get fields in the Property System, add caching for fields (#36836) * [MM-68648] Implement GetForGroup to get fields in the Property System, add caching for fields * Re-add CRUD functions for the cache to invalidate when updates happen --- .../channels/app/properties/property_field.go | 9 ++ server/channels/app/property_field.go | 12 +++ server/channels/app/property_field_test.go | 76 +++++++++++++++ .../channels/store/localcachelayer/layer.go | 23 +++++ .../store/localcachelayer/main_test.go | 8 ++ .../localcachelayer/property_field_layer.go | 96 +++++++++++++++++++ .../property_field_layer_test.go | 94 ++++++++++++++++++ .../channels/store/retrylayer/retrylayer.go | 86 ++++++++++++----- .../store/sqlstore/property_field_store.go | 13 +++ server/channels/store/store.go | 1 + .../storetest/mocks/PropertyFieldStore.go | 30 ++++++ .../channels/store/timerlayer/timerlayer.go | 66 +++++++++---- server/enterprise/metrics/metrics.go | 1 + server/i18n/en.json | 4 + server/public/model/cluster_message.go | 1 + 15 files changed, 477 insertions(+), 43 deletions(-) create mode 100644 server/channels/store/localcachelayer/property_field_layer.go create mode 100644 server/channels/store/localcachelayer/property_field_layer_test.go diff --git a/server/channels/app/properties/property_field.go b/server/channels/app/properties/property_field.go index f4649600cfe..1b614c66350 100644 --- a/server/channels/app/properties/property_field.go +++ b/server/channels/app/properties/property_field.go @@ -487,6 +487,15 @@ func (ps *PropertyService) GetPropertyFields(rctx request.CTX, groupID string, i return ps.runPostGetPropertyFields(rctx, fields) } +func (ps *PropertyService) GetPropertyFieldsForGroup(rctx request.CTX, groupID string) ([]*model.PropertyField, error) { + fields, err := ps.fieldStore.GetForGroup(context.Background(), groupID) + if err != nil { + return nil, fmt.Errorf("GetPropertyFieldsForGroup: %w", err) + } + + return ps.runPostGetPropertyFields(rctx, fields) +} + func (ps *PropertyService) GetPropertyFieldByName(rctx request.CTX, groupID, targetID, name string) (*model.PropertyField, error) { field, err := ps.getPropertyFieldByName(groupID, targetID, name) if err != nil { diff --git a/server/channels/app/property_field.go b/server/channels/app/property_field.go index 2d749194857..dbf27dbecc1 100644 --- a/server/channels/app/property_field.go +++ b/server/channels/app/property_field.go @@ -144,6 +144,18 @@ func (a *App) SearchPropertyFields(rctx request.CTX, groupID string, opts model. return fields, nil } +// GetPropertyFieldsForGroup retrieves all active property fields for a group. +func (a *App) GetPropertyFieldsForGroup(rctx request.CTX, groupID string) ([]*model.PropertyField, *model.AppError) { + fields, err := a.Srv().propertyService.GetPropertyFieldsForGroup(rctx, groupID) + if err != nil { + if appErr := mapPropertyServiceError("GetPropertyFieldsForGroup", err); appErr != nil { + return nil, appErr + } + return nil, model.NewAppError("GetPropertyFieldsForGroup", "app.property_field.get_for_group.app_error", nil, "", http.StatusInternalServerError).Wrap(err) + } + return fields, nil +} + // CountPropertyFieldsForGroup counts property fields for a group. func (a *App) CountPropertyFieldsForGroup(rctx request.CTX, groupID string, includeDeleted bool) (int64, *model.AppError) { var count int64 diff --git a/server/channels/app/property_field_test.go b/server/channels/app/property_field_test.go index c4e3fe23391..61df0e98c5e 100644 --- a/server/channels/app/property_field_test.go +++ b/server/channels/app/property_field_test.go @@ -628,6 +628,82 @@ func TestDeletePropertyField(t *testing.T) { }) } +func TestPropertyFieldCacheInvalidation(t *testing.T) { + mainHelper.Parallel(t) + th := Setup(t).InitBasic(t) + + newField := func(groupID, name string) *model.PropertyField { + return &model.PropertyField{ + GroupID: groupID, + Name: name, + Type: model.PropertyFieldTypeText, + ObjectType: model.PropertyFieldObjectTypeUser, + TargetType: string(model.PropertyFieldTargetLevelSystem), + } + } + + t.Run("create invalidates the cached group listing", func(t *testing.T) { + groupID := registerTestPropertyGroup(t, th) + + _, appErr := th.App.CreatePropertyField(th.Context, newField(groupID, "First Field"), false, "") + require.Nil(t, appErr) + + // Populate the cache for this group. + fields, appErr := th.App.GetPropertyFieldsForGroup(th.Context, groupID) + require.Nil(t, appErr) + require.Len(t, fields, 1) + + _, appErr = th.App.CreatePropertyField(th.Context, newField(groupID, "Second Field"), false, "") + require.Nil(t, appErr) + + // A stale cache would still return a single field here. + fields, appErr = th.App.GetPropertyFieldsForGroup(th.Context, groupID) + require.Nil(t, appErr) + assert.Len(t, fields, 2) + }) + + t.Run("update invalidates the cached group listing", func(t *testing.T) { + groupID := registerTestPropertyGroup(t, th) + + created, appErr := th.App.CreatePropertyField(th.Context, newField(groupID, "Original Name"), false, "") + require.Nil(t, appErr) + + fields, appErr := th.App.GetPropertyFieldsForGroup(th.Context, groupID) + require.Nil(t, appErr) + require.Len(t, fields, 1) + require.Equal(t, "Original Name", fields[0].Name) + + created.Name = "Updated Name" + _, _, appErr = th.App.UpdatePropertyField(th.Context, groupID, created, false, "") + require.Nil(t, appErr) + + // A stale cache would still return the original name here. + fields, appErr = th.App.GetPropertyFieldsForGroup(th.Context, groupID) + require.Nil(t, appErr) + require.Len(t, fields, 1) + assert.Equal(t, "Updated Name", fields[0].Name) + }) + + t.Run("delete invalidates the cached group listing", func(t *testing.T) { + groupID := registerTestPropertyGroup(t, th) + + created, appErr := th.App.CreatePropertyField(th.Context, newField(groupID, "Field to Delete"), false, "") + require.Nil(t, appErr) + + fields, appErr := th.App.GetPropertyFieldsForGroup(th.Context, groupID) + require.Nil(t, appErr) + require.Len(t, fields, 1) + + appErr = th.App.DeletePropertyField(th.Context, groupID, created.ID, false, "") + require.Nil(t, appErr) + + // A stale cache would still return the deleted field here. + fields, appErr = th.App.GetPropertyFieldsForGroup(th.Context, groupID) + require.Nil(t, appErr) + assert.Empty(t, fields) + }) +} + func TestPropertyFieldBroadcastParams(t *testing.T) { rctx := request.TestContext(t) diff --git a/server/channels/store/localcachelayer/layer.go b/server/channels/store/localcachelayer/layer.go index 5798b85b3fa..305caa08a01 100644 --- a/server/channels/store/localcachelayer/layer.go +++ b/server/channels/store/localcachelayer/layer.go @@ -85,6 +85,9 @@ const ( SessionAttributeCacheSize = model.SessionCacheSize SessionAttributeCacheSec = 30 + + PropertyFieldCacheSize = 100 + PropertyFieldCacheSec = 30 * 60 ) var clearCacheMessageData = []byte("") @@ -158,6 +161,9 @@ type LocalCacheStore struct { sessionAttribute LocalCacheSessionAttributeStore sessionAttributeCache cache.Cache + + propertyField LocalCachePropertyFieldStore + propertyFieldCache cache.Cache } func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterface, cluster einterfaces.ClusterInterface, cacheProvider cache.Provider, logger mlog.LoggerIFace) (localCacheStore LocalCacheStore, err error) { @@ -473,6 +479,17 @@ func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterf } localCacheStore.sessionAttribute = LocalCacheSessionAttributeStore{SessionAttributeStore: baseStore.SessionAttribute(), rootStore: &localCacheStore} + // Property Fields + if localCacheStore.propertyFieldCache, err = cacheProvider.NewCache(&cache.CacheOptions{ + Size: PropertyFieldCacheSize, + Name: "PropertyField", + DefaultExpiry: PropertyFieldCacheSec * time.Second, + InvalidateClusterEvent: model.ClusterEventInvalidateCacheForPropertyFields, + }); err != nil { + return + } + localCacheStore.propertyField = LocalCachePropertyFieldStore{PropertyFieldStore: baseStore.PropertyField(), rootStore: &localCacheStore} + if cluster != nil { cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForReactions, localCacheStore.reaction.handleClusterInvalidateReaction) cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForRoles, localCacheStore.role.handleClusterInvalidateRole) @@ -503,6 +520,7 @@ func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterf cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForReadReceipts, localCacheStore.readReceipt.handleClusterInvalidateReadReceipts) cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForTemporaryPosts, localCacheStore.temporaryPost.handleClusterInvalidateTemporaryPosts) cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForSessionAttributes, localCacheStore.sessionAttribute.handleClusterInvalidateSessionAttributes) + cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForPropertyFields, localCacheStore.propertyField.handleClusterInvalidatePropertyField) } return } @@ -571,6 +589,10 @@ func (s LocalCacheStore) SessionAttribute() store.SessionAttributeStore { return &s.sessionAttribute } +func (s LocalCacheStore) PropertyField() store.PropertyFieldStore { + return s.propertyField +} + func (s LocalCacheStore) DropAllTables() { s.Invalidate() s.Store.DropAllTables() @@ -710,6 +732,7 @@ func (s *LocalCacheStore) Invalidate() { s.doClearCacheCluster(s.readReceiptPostReadersCache) s.doClearCacheCluster(s.temporaryPostCache) s.doClearCacheCluster(s.sessionAttributeCache) + s.doClearCacheCluster(s.propertyFieldCache) } // allocateCacheTargets is used to fill target value types diff --git a/server/channels/store/localcachelayer/main_test.go b/server/channels/store/localcachelayer/main_test.go index e8f5cacaf52..cd9bcaa45dc 100644 --- a/server/channels/store/localcachelayer/main_test.go +++ b/server/channels/store/localcachelayer/main_test.go @@ -207,6 +207,14 @@ func getMockStore(t *testing.T) *mocks.Store { mockSessionAttributeStore := mocks.SessionAttributeStore{} mockStore.On("SessionAttribute").Return(&mockSessionAttributeStore) + fakeField := model.PropertyField{ID: "field-id", GroupID: "group-id", Name: "field-name"} + mockPropertyFieldStore := mocks.PropertyFieldStore{} + mockPropertyFieldStore.On("GetForGroup", context.Background(), "group-id").Return([]*model.PropertyField{&fakeField}, nil) + mockPropertyFieldStore.On("Create", &fakeField).Return(&fakeField, nil) + mockPropertyFieldStore.On("Update", "group-id", []*model.PropertyField{&fakeField}, map[string]int64(nil)).Return([]*model.PropertyField{&fakeField}, nil) + mockPropertyFieldStore.On("Delete", "group-id", "field-id").Return(nil) + mockStore.On("PropertyField").Return(&mockPropertyFieldStore) + mockReadReceiptStore := &mocks.ReadReceiptStore{} mockStore.On("ReadReceipt").Return(mockReadReceiptStore) diff --git a/server/channels/store/localcachelayer/property_field_layer.go b/server/channels/store/localcachelayer/property_field_layer.go new file mode 100644 index 00000000000..3305eb67e4d --- /dev/null +++ b/server/channels/store/localcachelayer/property_field_layer.go @@ -0,0 +1,96 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package localcachelayer + +import ( + "bytes" + "context" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/shared/mlog" + "github.com/mattermost/mattermost/server/v8/channels/store" +) + +func (s LocalCachePropertyFieldStore) Create(field *model.PropertyField) (*model.PropertyField, error) { + created, err := s.PropertyFieldStore.Create(field) + if err != nil { + return nil, err + } + + s.InvalidateFieldsForGroup(created.GroupID) + return created, nil +} + +func (s LocalCachePropertyFieldStore) Update(groupID string, fields []*model.PropertyField, expectedUpdateAts map[string]int64) ([]*model.PropertyField, error) { + updated, err := s.PropertyFieldStore.Update(groupID, fields, expectedUpdateAts) + if err != nil { + return nil, err + } + + // The returned slice includes both the requested fields and any linked + // fields the store propagated to. Invalidate each distinct group so all + // affected GetForGroup caches are cleared. + invalidated := make(map[string]bool, len(updated)) + for _, field := range updated { + if invalidated[field.GroupID] { + continue + } + invalidated[field.GroupID] = true + s.InvalidateFieldsForGroup(field.GroupID) + } + return updated, nil +} + +func (s LocalCachePropertyFieldStore) Delete(groupID string, id string) error { + if err := s.PropertyFieldStore.Delete(groupID, id); err != nil { + return err + } + + s.InvalidateFieldsForGroup(groupID) + return nil +} + +type LocalCachePropertyFieldStore struct { + store.PropertyFieldStore + rootStore *LocalCacheStore +} + +func (s *LocalCachePropertyFieldStore) handleClusterInvalidatePropertyField(msg *model.ClusterMessage) { + if bytes.Equal(msg.Data, clearCacheMessageData) { + if err := s.rootStore.propertyFieldCache.Purge(); err != nil { + s.rootStore.logger.Warn("failed to purge property field cache", mlog.Err(err)) + } + } else if err := s.rootStore.propertyFieldCache.Remove(string(msg.Data)); err != nil { + s.rootStore.logger.Warn("failed to remove property field cache entry", mlog.Err(err)) + } +} + +func (s LocalCachePropertyFieldStore) InvalidateFieldsForGroup(groupID string) { + s.rootStore.doInvalidateCacheCluster(s.rootStore.propertyFieldCache, groupID, nil) + if s.rootStore.metrics != nil { + s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.propertyFieldCache.Name()) + } +} + +func (s *LocalCachePropertyFieldStore) getFieldsForGroupFromCache(groupID string) ([]*model.PropertyField, bool) { + var fields []*model.PropertyField + if err := s.rootStore.doStandardReadCache(s.rootStore.propertyFieldCache, groupID, &fields); err == nil { + return fields, true + } + return nil, false +} + +func (s LocalCachePropertyFieldStore) GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) { + if fields, ok := s.getFieldsForGroupFromCache(groupID); ok { + return fields, nil + } + + fields, err := s.PropertyFieldStore.GetForGroup(ctx, groupID) + if err != nil { + return nil, err + } + + s.rootStore.doStandardAddToCache(s.rootStore.propertyFieldCache, groupID, fields) + return fields, nil +} diff --git a/server/channels/store/localcachelayer/property_field_layer_test.go b/server/channels/store/localcachelayer/property_field_layer_test.go new file mode 100644 index 00000000000..e17dbf12254 --- /dev/null +++ b/server/channels/store/localcachelayer/property_field_layer_test.go @@ -0,0 +1,94 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package localcachelayer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/shared/mlog" + "github.com/mattermost/mattermost/server/v8/channels/store/storetest/mocks" +) + +func TestPropertyFieldStoreCache(t *testing.T) { + groupID := "group-id" + fakeField := model.PropertyField{ID: "field-id", GroupID: groupID, Name: "field-name"} + fakeFields := []*model.PropertyField{&fakeField} + logger := mlog.CreateConsoleTestLogger(t) + + t.Run("GetForGroup cached on second call", func(t *testing.T) { + mockStore := getMockStore(t) + mockCacheProvider := getMockCacheProvider() + cachedStore, err := NewLocalCacheLayer(mockStore, nil, nil, mockCacheProvider, logger) + require.NoError(t, err) + + fields, err := cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + assert.Equal(t, fakeFields, fields) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 1) + + fields, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + assert.Equal(t, fakeFields, fields) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 1) + }) + + t.Run("Create invalidates the group cache", func(t *testing.T) { + mockStore := getMockStore(t) + mockCacheProvider := getMockCacheProvider() + cachedStore, err := NewLocalCacheLayer(mockStore, nil, nil, mockCacheProvider, logger) + require.NoError(t, err) + + _, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 1) + + _, err = cachedStore.PropertyField().Create(&fakeField) + require.NoError(t, err) + + _, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 2) + }) + + t.Run("Update invalidates the group cache", func(t *testing.T) { + mockStore := getMockStore(t) + mockCacheProvider := getMockCacheProvider() + cachedStore, err := NewLocalCacheLayer(mockStore, nil, nil, mockCacheProvider, logger) + require.NoError(t, err) + + _, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 1) + + _, err = cachedStore.PropertyField().Update(groupID, fakeFields, nil) + require.NoError(t, err) + + _, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 2) + }) + + t.Run("Delete invalidates the group cache", func(t *testing.T) { + mockStore := getMockStore(t) + mockCacheProvider := getMockCacheProvider() + cachedStore, err := NewLocalCacheLayer(mockStore, nil, nil, mockCacheProvider, logger) + require.NoError(t, err) + + _, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 1) + + err = cachedStore.PropertyField().Delete(groupID, fakeField.ID) + require.NoError(t, err) + + _, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 2) + }) +} diff --git a/server/channels/store/retrylayer/retrylayer.go b/server/channels/store/retrylayer/retrylayer.go index 78c43d7e765..aa566b29604 100644 --- a/server/channels/store/retrylayer/retrylayer.go +++ b/server/channels/store/retrylayer/retrylayer.go @@ -10488,6 +10488,27 @@ func (s *RetryLayerPropertyFieldStore) GetFieldByName(ctx context.Context, group } +func (s *RetryLayerPropertyFieldStore) GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) { + + tries := 0 + for { + result, err := s.PropertyFieldStore.GetForGroup(ctx, groupID) + if err == nil { + return result, nil + } + if !isRepeatableError(err) { + return result, err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return result, err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + func (s *RetryLayerPropertyFieldStore) GetMany(ctx context.Context, groupID string, ids []string) ([]*model.PropertyField, error) { tries := 0 @@ -17895,32 +17916,32 @@ func (s *RetryLayerUserAccessTokenStore) Delete(tokenID string) error { } -func (s *RetryLayerUserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, error) { +func (s *RetryLayerUserAccessTokenStore) DeleteAllForUser(userID string) error { tries := 0 for { - result, err := s.UserAccessTokenStore.DeleteByIds(tokenIDs) + err := s.UserAccessTokenStore.DeleteAllForUser(userID) if err == nil { - return result, nil + return nil } if !isRepeatableError(err) { - return result, err + return err } tries++ if tries >= 3 { err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") - return result, err + return err } timepkg.Sleep(100 * timepkg.Millisecond) } } -func (s *RetryLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { +func (s *RetryLayerUserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, error) { tries := 0 for { - result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit) + result, err := s.UserAccessTokenStore.DeleteByIds(tokenIDs) if err == nil { return result, nil } @@ -17937,32 +17958,32 @@ func (s *RetryLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit in } -func (s *RetryLayerUserAccessTokenStore) DeleteAllForUser(userID string) error { +func (s *RetryLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) { tries := 0 for { - err := s.UserAccessTokenStore.DeleteAllForUser(userID) + result, err := s.UserAccessTokenStore.Get(tokenID) if err == nil { - return nil + return result, nil } if !isRepeatableError(err) { - return err + return result, err } tries++ if tries >= 3 { err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") - return err + return result, err } timepkg.Sleep(100 * timepkg.Millisecond) } } -func (s *RetryLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) { +func (s *RetryLayerUserAccessTokenStore) GetAll(offset int, limit int) ([]*model.UserAccessToken, error) { tries := 0 for { - result, err := s.UserAccessTokenStore.Get(tokenID) + result, err := s.UserAccessTokenStore.GetAll(offset, limit) if err == nil { return result, nil } @@ -17979,11 +18000,11 @@ func (s *RetryLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessT } -func (s *RetryLayerUserAccessTokenStore) GetAll(offset int, limit int) ([]*model.UserAccessToken, error) { +func (s *RetryLayerUserAccessTokenStore) GetByToken(tokenString string) (*model.UserAccessToken, error) { tries := 0 for { - result, err := s.UserAccessTokenStore.GetAll(offset, limit) + result, err := s.UserAccessTokenStore.GetByToken(tokenString) if err == nil { return result, nil } @@ -18000,11 +18021,11 @@ func (s *RetryLayerUserAccessTokenStore) GetAll(offset int, limit int) ([]*model } -func (s *RetryLayerUserAccessTokenStore) GetByToken(tokenString string) (*model.UserAccessToken, error) { +func (s *RetryLayerUserAccessTokenStore) GetByUser(userID string, page int, perPage int) ([]*model.UserAccessToken, error) { tries := 0 for { - result, err := s.UserAccessTokenStore.GetByToken(tokenString) + result, err := s.UserAccessTokenStore.GetByUser(userID, page, perPage) if err == nil { return result, nil } @@ -18021,11 +18042,11 @@ func (s *RetryLayerUserAccessTokenStore) GetByToken(tokenString string) (*model. } -func (s *RetryLayerUserAccessTokenStore) GetByUser(userID string, page int, perPage int) ([]*model.UserAccessToken, error) { +func (s *RetryLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { tries := 0 for { - result, err := s.UserAccessTokenStore.GetByUser(userID, page, perPage) + result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit) if err == nil { return result, nil } @@ -18852,6 +18873,27 @@ func (s *RetryLayerWebhookStore) UpdateIncoming(webhook *model.IncomingWebhook) } +func (s *RetryLayerWebhookStore) UpdateIncomingLastUsed(webhookID string, lastUsed int64) error { + + tries := 0 + for { + err := s.WebhookStore.UpdateIncomingLastUsed(webhookID, lastUsed) + if err == nil { + return nil + } + if !isRepeatableError(err) { + return err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + func (s *RetryLayerWebhookStore) UpdateOutgoing(hook *model.OutgoingWebhook) (*model.OutgoingWebhook, error) { tries := 0 @@ -18901,10 +18943,6 @@ func (s *RetryLayer) TotalSearchDbConnections() int { return s.Store.TotalSearchDbConnections() } -func (s *RetryLayer) GetDiagnostics(ctx context.Context) (*store.DatabaseDiagnostics, error) { - return s.Store.GetDiagnostics(ctx) -} - func (s *RetryLayer) UnlockFromMaster() { s.Store.UnlockFromMaster() } diff --git a/server/channels/store/sqlstore/property_field_store.go b/server/channels/store/sqlstore/property_field_store.go index db8d6ad4210..30f038bed46 100644 --- a/server/channels/store/sqlstore/property_field_store.go +++ b/server/channels/store/sqlstore/property_field_store.go @@ -167,6 +167,19 @@ func (s *SqlPropertyFieldStore) CountForTarget(groupID, targetType, targetID str return count, nil } +func (s *SqlPropertyFieldStore) GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) { + builder := s.tableSelectQuery. + Where(sq.Eq{"GroupID": groupID}). + Where(sq.Eq{"DeleteAt": 0}) + + fields := []*model.PropertyField{} + if err := s.DBXFromContext(ctx).SelectBuilder(&fields, builder); err != nil { + return nil, errors.Wrap(err, "property_field_get_for_group_query") + } + + return fields, nil +} + func (s *SqlPropertyFieldStore) SearchPropertyFields(opts model.PropertyFieldSearchOpts) ([]*model.PropertyField, error) { if err := opts.Cursor.IsValid(); err != nil { return nil, fmt.Errorf("cursor is invalid: %w", err) diff --git a/server/channels/store/store.go b/server/channels/store/store.go index 6b48690dfc6..70d6771b9ef 100644 --- a/server/channels/store/store.go +++ b/server/channels/store/store.go @@ -1173,6 +1173,7 @@ type PropertyFieldStore interface { Get(ctx context.Context, groupID, id string) (*model.PropertyField, error) GetMany(ctx context.Context, groupID string, ids []string) ([]*model.PropertyField, error) GetFieldByName(ctx context.Context, groupID, targetID, name string) (*model.PropertyField, error) + GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) CountForGroup(groupID string, includeDeleted bool) (int64, error) CountForGroupObjectType(groupID, objectType string, includeDeleted bool) (int64, error) CountForTarget(groupID, targetType, targetID string, includeDeleted bool) (int64, error) diff --git a/server/channels/store/storetest/mocks/PropertyFieldStore.go b/server/channels/store/storetest/mocks/PropertyFieldStore.go index e8f8ca7568f..25320b92852 100644 --- a/server/channels/store/storetest/mocks/PropertyFieldStore.go +++ b/server/channels/store/storetest/mocks/PropertyFieldStore.go @@ -264,6 +264,36 @@ func (_m *PropertyFieldStore) GetFieldByName(ctx context.Context, groupID string return r0, r1 } +// GetForGroup provides a mock function with given fields: ctx, groupID +func (_m *PropertyFieldStore) GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) { + ret := _m.Called(ctx, groupID) + + if len(ret) == 0 { + panic("no return value specified for GetForGroup") + } + + var r0 []*model.PropertyField + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]*model.PropertyField, error)); ok { + return rf(ctx, groupID) + } + if rf, ok := ret.Get(0).(func(context.Context, string) []*model.PropertyField); ok { + r0 = rf(ctx, groupID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*model.PropertyField) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, groupID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetMany provides a mock function with given fields: ctx, groupID, ids func (_m *PropertyFieldStore) GetMany(ctx context.Context, groupID string, ids []string) ([]*model.PropertyField, error) { ret := _m.Called(ctx, groupID, ids) diff --git a/server/channels/store/timerlayer/timerlayer.go b/server/channels/store/timerlayer/timerlayer.go index b3e8da36bfe..4cb08ea6983 100644 --- a/server/channels/store/timerlayer/timerlayer.go +++ b/server/channels/store/timerlayer/timerlayer.go @@ -8362,6 +8362,22 @@ func (s *TimerLayerPropertyFieldStore) GetFieldByName(ctx context.Context, group return result, err } +func (s *TimerLayerPropertyFieldStore) GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) { + start := time.Now() + + result, err := s.PropertyFieldStore.GetForGroup(ctx, groupID) + + elapsed := float64(time.Since(start)) / float64(time.Second) + if s.Root.Metrics != nil { + success := "false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("PropertyFieldStore.GetForGroup", success, elapsed) + } + return result, err +} + func (s *TimerLayerPropertyFieldStore) GetMany(ctx context.Context, groupID string, ids []string) ([]*model.PropertyField, error) { start := time.Now() @@ -14177,10 +14193,10 @@ func (s *TimerLayerUserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, return result, err } -func (s *TimerLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { +func (s *TimerLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) { start := time.Now() - result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit) + result, err := s.UserAccessTokenStore.Get(tokenID) elapsed := float64(time.Since(start)) / float64(time.Second) if s.Root.Metrics != nil { @@ -14188,15 +14204,15 @@ func (s *TimerLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit in if err == nil { success = "true" } - s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetExpiredBefore", success, elapsed) + s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.Get", success, elapsed) } return result, err } -func (s *TimerLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) { +func (s *TimerLayerUserAccessTokenStore) GetAll(offset int, limit int) ([]*model.UserAccessToken, error) { start := time.Now() - result, err := s.UserAccessTokenStore.Get(tokenID) + result, err := s.UserAccessTokenStore.GetAll(offset, limit) elapsed := float64(time.Since(start)) / float64(time.Second) if s.Root.Metrics != nil { @@ -14204,15 +14220,15 @@ func (s *TimerLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessT if err == nil { success = "true" } - s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.Get", success, elapsed) + s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetAll", success, elapsed) } return result, err } -func (s *TimerLayerUserAccessTokenStore) GetAll(offset int, limit int) ([]*model.UserAccessToken, error) { +func (s *TimerLayerUserAccessTokenStore) GetByToken(tokenString string) (*model.UserAccessToken, error) { start := time.Now() - result, err := s.UserAccessTokenStore.GetAll(offset, limit) + result, err := s.UserAccessTokenStore.GetByToken(tokenString) elapsed := float64(time.Since(start)) / float64(time.Second) if s.Root.Metrics != nil { @@ -14220,15 +14236,15 @@ func (s *TimerLayerUserAccessTokenStore) GetAll(offset int, limit int) ([]*model if err == nil { success = "true" } - s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetAll", success, elapsed) + s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetByToken", success, elapsed) } return result, err } -func (s *TimerLayerUserAccessTokenStore) GetByToken(tokenString string) (*model.UserAccessToken, error) { +func (s *TimerLayerUserAccessTokenStore) GetByUser(userID string, page int, perPage int) ([]*model.UserAccessToken, error) { start := time.Now() - result, err := s.UserAccessTokenStore.GetByToken(tokenString) + result, err := s.UserAccessTokenStore.GetByUser(userID, page, perPage) elapsed := float64(time.Since(start)) / float64(time.Second) if s.Root.Metrics != nil { @@ -14236,15 +14252,15 @@ func (s *TimerLayerUserAccessTokenStore) GetByToken(tokenString string) (*model. if err == nil { success = "true" } - s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetByToken", success, elapsed) + s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetByUser", success, elapsed) } return result, err } -func (s *TimerLayerUserAccessTokenStore) GetByUser(userID string, page int, perPage int) ([]*model.UserAccessToken, error) { +func (s *TimerLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { start := time.Now() - result, err := s.UserAccessTokenStore.GetByUser(userID, page, perPage) + result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit) elapsed := float64(time.Since(start)) / float64(time.Second) if s.Root.Metrics != nil { @@ -14252,7 +14268,7 @@ func (s *TimerLayerUserAccessTokenStore) GetByUser(userID string, page int, perP if err == nil { success = "true" } - s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetByUser", success, elapsed) + s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetExpiredBefore", success, elapsed) } return result, err } @@ -14895,6 +14911,22 @@ func (s *TimerLayerWebhookStore) UpdateIncoming(webhook *model.IncomingWebhook) return result, err } +func (s *TimerLayerWebhookStore) UpdateIncomingLastUsed(webhookID string, lastUsed int64) error { + start := time.Now() + + err := s.WebhookStore.UpdateIncomingLastUsed(webhookID, lastUsed) + + elapsed := float64(time.Since(start)) / float64(time.Second) + if s.Root.Metrics != nil { + success := "false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("WebhookStore.UpdateIncomingLastUsed", success, elapsed) + } + return err +} + func (s *TimerLayerWebhookStore) UpdateOutgoing(hook *model.OutgoingWebhook) (*model.OutgoingWebhook, error) { start := time.Now() @@ -14939,10 +14971,6 @@ func (s *TimerLayer) TotalSearchDbConnections() int { return s.Store.TotalSearchDbConnections() } -func (s *TimerLayer) GetDiagnostics(ctx context.Context) (*store.DatabaseDiagnostics, error) { - return s.Store.GetDiagnostics(ctx) -} - func (s *TimerLayer) UnlockFromMaster() { s.Store.UnlockFromMaster() } diff --git a/server/enterprise/metrics/metrics.go b/server/enterprise/metrics/metrics.go index d5be5ea08c6..a4e7e1ade34 100644 --- a/server/enterprise/metrics/metrics.go +++ b/server/enterprise/metrics/metrics.go @@ -537,6 +537,7 @@ func New(ps *platform.PlatformService, driver, dataSource string) *MetricsInterf model.ClusterEventInvalidateCacheForTeams, model.ClusterEventInvalidateCacheForContentFlagging, model.ClusterEventInvalidateCacheForSessionAttributes, + model.ClusterEventInvalidateCacheForPropertyFields, model.ClusterEventClearSessionCacheForAllUsers, model.ClusterEventInstallPlugin, model.ClusterEventRemovePlugin, diff --git a/server/i18n/en.json b/server/i18n/en.json index 647d0666555..32d54ef9d7b 100644 --- a/server/i18n/en.json +++ b/server/i18n/en.json @@ -8364,6 +8364,10 @@ "id": "app.property_field.get_by_name.app_error", "translation": "Unable to get property field by name." }, + { + "id": "app.property_field.get_for_group.app_error", + "translation": "Unable to get property fields for group." + }, { "id": "app.property_field.get_many.app_error", "translation": "Unable to get property fields." diff --git a/server/public/model/cluster_message.go b/server/public/model/cluster_message.go index 16439303fce..48253d62be6 100644 --- a/server/public/model/cluster_message.go +++ b/server/public/model/cluster_message.go @@ -44,6 +44,7 @@ const ( ClusterEventInvalidateCacheForTeams ClusterEvent = "inv_teams" ClusterEventInvalidateCacheForContentFlagging ClusterEvent = "inv_content_flagging" ClusterEventInvalidateCacheForSessionAttributes ClusterEvent = "inv_session_attributes" + ClusterEventInvalidateCacheForPropertyFields ClusterEvent = "inv_property_fields" ClusterEventInvalidateCacheForAutoTranslation ClusterEvent = "inv_autotranslation" ClusterEventInvalidateCacheForReadReceipts ClusterEvent = "inv_read_receipts" ClusterEventInvalidateCacheForTemporaryPosts ClusterEvent = "inv_temporary_posts" From fb8cfbaef7617f8a4dd427a601cb34f2e36d7a4c Mon Sep 17 00:00:00 2001 From: "cursor[bot]" <206951365+cursor[bot]@users.noreply.github.com> Date: Wed, 3 Jun 2026 12:15:45 -0400 Subject: [PATCH 2/2] Fix flaky TestSharedChannelPostMetadataSync (#36862) * Fix flaky TestSharedChannelPostMetadataSync STEP 6 incorrectly required Cluster A to receive sync traffic after a resync trigger. When echo prevention suppresses unchanged acknowledgement payloads, no message arrives and the Eventually timeout fails. Wait for pending sync tasks, assert the DB still has exactly one acknowledgement, and only validate sync payload duplicates when traffic is received. Tests-only change. Verified with `go test -run '^TestSharedChannelPostMetadataSync$' -race -count=50` locally. Co-authored-by: mattermost-code * Assert sync payload ack count outside muA lock Copy the matching post under muA before calling require.Len so a failed assertion cannot leave the mutex locked and hang teardown. Co-authored-by: mattermost-code * Retrigger CI after Playwright infra flake Co-authored-by: mattermost-code --------- Co-authored-by: Cursor Agent Co-authored-by: mattermost-code --- .../api4/shared_channel_metadata_test.go | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/server/channels/api4/shared_channel_metadata_test.go b/server/channels/api4/shared_channel_metadata_test.go index 7d3d36f7e34..d5447dd0084 100644 --- a/server/channels/api4/shared_channel_metadata_test.go +++ b/server/channels/api4/shared_channel_metadata_test.go @@ -683,17 +683,28 @@ func TestSharedChannelPostMetadataSync(t *testing.T) { // Trigger another sync to ensure no duplicates are created service.NotifyChannelChanged(testChannel.Id) - // Verify acknowledgement count remains 1 (no duplicates) + // Echo prevention may suppress resending unchanged acknowledgements, so wait + // for sync work to finish instead of requiring Cluster A to receive traffic. require.Eventually(t, func() bool { - muA.Lock() - defer muA.Unlock() - for _, post := range syncedPostsServerA { - if post.Id == postIdToTrack && post.Metadata != nil && post.Metadata.Acknowledgements != nil { - return len(post.Metadata.Acknowledgements) == 1 - } + return !service.HasPendingTasksForTesting() + }, 10*time.Second, 100*time.Millisecond, "Sync tasks should complete after resync trigger") + + finalAcks, appErr := th.App.GetAcknowledgementsForPost(postIdToTrack) + require.Nil(t, appErr) + require.Len(t, finalAcks, 1, "Should maintain single acknowledgement after resync") + + muA.Lock() + var serverAResyncPost *model.Post + for _, post := range syncedPostsServerA { + if post.Id == postIdToTrack && post.Metadata != nil && post.Metadata.Acknowledgements != nil { + serverAResyncPost = post + break } - return len(syncedPostsServerA) > 0 - }, 3*time.Second, 100*time.Millisecond, "Should maintain single acknowledgement after resync") + } + muA.Unlock() + if serverAResyncPost != nil { + require.Len(t, serverAResyncPost.Metadata.Acknowledgements, 1, "Sync payload should not contain duplicate acknowledgements") + } t.Logf("✅ Cross-cluster acknowledgement flow completed successfully:") t.Logf(" 1. Server A created post with ack request: %s", postIdToTrack)