diff --git a/server/channels/api4/shared_channel_metadata_test.go b/server/channels/api4/shared_channel_metadata_test.go index 7d3d36f7e34..d5447dd0084 100644 --- a/server/channels/api4/shared_channel_metadata_test.go +++ b/server/channels/api4/shared_channel_metadata_test.go @@ -683,17 +683,28 @@ func TestSharedChannelPostMetadataSync(t *testing.T) { // Trigger another sync to ensure no duplicates are created service.NotifyChannelChanged(testChannel.Id) - // Verify acknowledgement count remains 1 (no duplicates) + // Echo prevention may suppress resending unchanged acknowledgements, so wait + // for sync work to finish instead of requiring Cluster A to receive traffic. require.Eventually(t, func() bool { - muA.Lock() - defer muA.Unlock() - for _, post := range syncedPostsServerA { - if post.Id == postIdToTrack && post.Metadata != nil && post.Metadata.Acknowledgements != nil { - return len(post.Metadata.Acknowledgements) == 1 - } + return !service.HasPendingTasksForTesting() + }, 10*time.Second, 100*time.Millisecond, "Sync tasks should complete after resync trigger") + + finalAcks, appErr := th.App.GetAcknowledgementsForPost(postIdToTrack) + require.Nil(t, appErr) + require.Len(t, finalAcks, 1, "Should maintain single acknowledgement after resync") + + muA.Lock() + var serverAResyncPost *model.Post + for _, post := range syncedPostsServerA { + if post.Id == postIdToTrack && post.Metadata != nil && post.Metadata.Acknowledgements != nil { + serverAResyncPost = post + break } - return len(syncedPostsServerA) > 0 - }, 3*time.Second, 100*time.Millisecond, "Should maintain single acknowledgement after resync") + } + muA.Unlock() + if serverAResyncPost != nil { + require.Len(t, serverAResyncPost.Metadata.Acknowledgements, 1, "Sync payload should not contain duplicate acknowledgements") + } t.Logf("✅ Cross-cluster acknowledgement flow completed successfully:") t.Logf(" 1. Server A created post with ack request: %s", postIdToTrack) diff --git a/server/channels/app/properties/property_field.go b/server/channels/app/properties/property_field.go index f4649600cfe..1b614c66350 100644 --- a/server/channels/app/properties/property_field.go +++ b/server/channels/app/properties/property_field.go @@ -487,6 +487,15 @@ func (ps *PropertyService) GetPropertyFields(rctx request.CTX, groupID string, i return ps.runPostGetPropertyFields(rctx, fields) } +func (ps *PropertyService) GetPropertyFieldsForGroup(rctx request.CTX, groupID string) ([]*model.PropertyField, error) { + fields, err := ps.fieldStore.GetForGroup(context.Background(), groupID) + if err != nil { + return nil, fmt.Errorf("GetPropertyFieldsForGroup: %w", err) + } + + return ps.runPostGetPropertyFields(rctx, fields) +} + func (ps *PropertyService) GetPropertyFieldByName(rctx request.CTX, groupID, targetID, name string) (*model.PropertyField, error) { field, err := ps.getPropertyFieldByName(groupID, targetID, name) if err != nil { diff --git a/server/channels/app/property_field.go b/server/channels/app/property_field.go index 2d749194857..dbf27dbecc1 100644 --- a/server/channels/app/property_field.go +++ b/server/channels/app/property_field.go @@ -144,6 +144,18 @@ func (a *App) SearchPropertyFields(rctx request.CTX, groupID string, opts model. return fields, nil } +// GetPropertyFieldsForGroup retrieves all active property fields for a group. +func (a *App) GetPropertyFieldsForGroup(rctx request.CTX, groupID string) ([]*model.PropertyField, *model.AppError) { + fields, err := a.Srv().propertyService.GetPropertyFieldsForGroup(rctx, groupID) + if err != nil { + if appErr := mapPropertyServiceError("GetPropertyFieldsForGroup", err); appErr != nil { + return nil, appErr + } + return nil, model.NewAppError("GetPropertyFieldsForGroup", "app.property_field.get_for_group.app_error", nil, "", http.StatusInternalServerError).Wrap(err) + } + return fields, nil +} + // CountPropertyFieldsForGroup counts property fields for a group. func (a *App) CountPropertyFieldsForGroup(rctx request.CTX, groupID string, includeDeleted bool) (int64, *model.AppError) { var count int64 diff --git a/server/channels/app/property_field_test.go b/server/channels/app/property_field_test.go index c4e3fe23391..61df0e98c5e 100644 --- a/server/channels/app/property_field_test.go +++ b/server/channels/app/property_field_test.go @@ -628,6 +628,82 @@ func TestDeletePropertyField(t *testing.T) { }) } +func TestPropertyFieldCacheInvalidation(t *testing.T) { + mainHelper.Parallel(t) + th := Setup(t).InitBasic(t) + + newField := func(groupID, name string) *model.PropertyField { + return &model.PropertyField{ + GroupID: groupID, + Name: name, + Type: model.PropertyFieldTypeText, + ObjectType: model.PropertyFieldObjectTypeUser, + TargetType: string(model.PropertyFieldTargetLevelSystem), + } + } + + t.Run("create invalidates the cached group listing", func(t *testing.T) { + groupID := registerTestPropertyGroup(t, th) + + _, appErr := th.App.CreatePropertyField(th.Context, newField(groupID, "First Field"), false, "") + require.Nil(t, appErr) + + // Populate the cache for this group. + fields, appErr := th.App.GetPropertyFieldsForGroup(th.Context, groupID) + require.Nil(t, appErr) + require.Len(t, fields, 1) + + _, appErr = th.App.CreatePropertyField(th.Context, newField(groupID, "Second Field"), false, "") + require.Nil(t, appErr) + + // A stale cache would still return a single field here. + fields, appErr = th.App.GetPropertyFieldsForGroup(th.Context, groupID) + require.Nil(t, appErr) + assert.Len(t, fields, 2) + }) + + t.Run("update invalidates the cached group listing", func(t *testing.T) { + groupID := registerTestPropertyGroup(t, th) + + created, appErr := th.App.CreatePropertyField(th.Context, newField(groupID, "Original Name"), false, "") + require.Nil(t, appErr) + + fields, appErr := th.App.GetPropertyFieldsForGroup(th.Context, groupID) + require.Nil(t, appErr) + require.Len(t, fields, 1) + require.Equal(t, "Original Name", fields[0].Name) + + created.Name = "Updated Name" + _, _, appErr = th.App.UpdatePropertyField(th.Context, groupID, created, false, "") + require.Nil(t, appErr) + + // A stale cache would still return the original name here. + fields, appErr = th.App.GetPropertyFieldsForGroup(th.Context, groupID) + require.Nil(t, appErr) + require.Len(t, fields, 1) + assert.Equal(t, "Updated Name", fields[0].Name) + }) + + t.Run("delete invalidates the cached group listing", func(t *testing.T) { + groupID := registerTestPropertyGroup(t, th) + + created, appErr := th.App.CreatePropertyField(th.Context, newField(groupID, "Field to Delete"), false, "") + require.Nil(t, appErr) + + fields, appErr := th.App.GetPropertyFieldsForGroup(th.Context, groupID) + require.Nil(t, appErr) + require.Len(t, fields, 1) + + appErr = th.App.DeletePropertyField(th.Context, groupID, created.ID, false, "") + require.Nil(t, appErr) + + // A stale cache would still return the deleted field here. + fields, appErr = th.App.GetPropertyFieldsForGroup(th.Context, groupID) + require.Nil(t, appErr) + assert.Empty(t, fields) + }) +} + func TestPropertyFieldBroadcastParams(t *testing.T) { rctx := request.TestContext(t) diff --git a/server/channels/store/localcachelayer/layer.go b/server/channels/store/localcachelayer/layer.go index 5798b85b3fa..305caa08a01 100644 --- a/server/channels/store/localcachelayer/layer.go +++ b/server/channels/store/localcachelayer/layer.go @@ -85,6 +85,9 @@ const ( SessionAttributeCacheSize = model.SessionCacheSize SessionAttributeCacheSec = 30 + + PropertyFieldCacheSize = 100 + PropertyFieldCacheSec = 30 * 60 ) var clearCacheMessageData = []byte("") @@ -158,6 +161,9 @@ type LocalCacheStore struct { sessionAttribute LocalCacheSessionAttributeStore sessionAttributeCache cache.Cache + + propertyField LocalCachePropertyFieldStore + propertyFieldCache cache.Cache } func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterface, cluster einterfaces.ClusterInterface, cacheProvider cache.Provider, logger mlog.LoggerIFace) (localCacheStore LocalCacheStore, err error) { @@ -473,6 +479,17 @@ func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterf } localCacheStore.sessionAttribute = LocalCacheSessionAttributeStore{SessionAttributeStore: baseStore.SessionAttribute(), rootStore: &localCacheStore} + // Property Fields + if localCacheStore.propertyFieldCache, err = cacheProvider.NewCache(&cache.CacheOptions{ + Size: PropertyFieldCacheSize, + Name: "PropertyField", + DefaultExpiry: PropertyFieldCacheSec * time.Second, + InvalidateClusterEvent: model.ClusterEventInvalidateCacheForPropertyFields, + }); err != nil { + return + } + localCacheStore.propertyField = LocalCachePropertyFieldStore{PropertyFieldStore: baseStore.PropertyField(), rootStore: &localCacheStore} + if cluster != nil { cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForReactions, localCacheStore.reaction.handleClusterInvalidateReaction) cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForRoles, localCacheStore.role.handleClusterInvalidateRole) @@ -503,6 +520,7 @@ func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterf cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForReadReceipts, localCacheStore.readReceipt.handleClusterInvalidateReadReceipts) cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForTemporaryPosts, localCacheStore.temporaryPost.handleClusterInvalidateTemporaryPosts) cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForSessionAttributes, localCacheStore.sessionAttribute.handleClusterInvalidateSessionAttributes) + cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForPropertyFields, localCacheStore.propertyField.handleClusterInvalidatePropertyField) } return } @@ -571,6 +589,10 @@ func (s LocalCacheStore) SessionAttribute() store.SessionAttributeStore { return &s.sessionAttribute } +func (s LocalCacheStore) PropertyField() store.PropertyFieldStore { + return s.propertyField +} + func (s LocalCacheStore) DropAllTables() { s.Invalidate() s.Store.DropAllTables() @@ -710,6 +732,7 @@ func (s *LocalCacheStore) Invalidate() { s.doClearCacheCluster(s.readReceiptPostReadersCache) s.doClearCacheCluster(s.temporaryPostCache) s.doClearCacheCluster(s.sessionAttributeCache) + s.doClearCacheCluster(s.propertyFieldCache) } // allocateCacheTargets is used to fill target value types diff --git a/server/channels/store/localcachelayer/main_test.go b/server/channels/store/localcachelayer/main_test.go index e8f5cacaf52..cd9bcaa45dc 100644 --- a/server/channels/store/localcachelayer/main_test.go +++ b/server/channels/store/localcachelayer/main_test.go @@ -207,6 +207,14 @@ func getMockStore(t *testing.T) *mocks.Store { mockSessionAttributeStore := mocks.SessionAttributeStore{} mockStore.On("SessionAttribute").Return(&mockSessionAttributeStore) + fakeField := model.PropertyField{ID: "field-id", GroupID: "group-id", Name: "field-name"} + mockPropertyFieldStore := mocks.PropertyFieldStore{} + mockPropertyFieldStore.On("GetForGroup", context.Background(), "group-id").Return([]*model.PropertyField{&fakeField}, nil) + mockPropertyFieldStore.On("Create", &fakeField).Return(&fakeField, nil) + mockPropertyFieldStore.On("Update", "group-id", []*model.PropertyField{&fakeField}, map[string]int64(nil)).Return([]*model.PropertyField{&fakeField}, nil) + mockPropertyFieldStore.On("Delete", "group-id", "field-id").Return(nil) + mockStore.On("PropertyField").Return(&mockPropertyFieldStore) + mockReadReceiptStore := &mocks.ReadReceiptStore{} mockStore.On("ReadReceipt").Return(mockReadReceiptStore) diff --git a/server/channels/store/localcachelayer/property_field_layer.go b/server/channels/store/localcachelayer/property_field_layer.go new file mode 100644 index 00000000000..3305eb67e4d --- /dev/null +++ b/server/channels/store/localcachelayer/property_field_layer.go @@ -0,0 +1,96 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package localcachelayer + +import ( + "bytes" + "context" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/shared/mlog" + "github.com/mattermost/mattermost/server/v8/channels/store" +) + +func (s LocalCachePropertyFieldStore) Create(field *model.PropertyField) (*model.PropertyField, error) { + created, err := s.PropertyFieldStore.Create(field) + if err != nil { + return nil, err + } + + s.InvalidateFieldsForGroup(created.GroupID) + return created, nil +} + +func (s LocalCachePropertyFieldStore) Update(groupID string, fields []*model.PropertyField, expectedUpdateAts map[string]int64) ([]*model.PropertyField, error) { + updated, err := s.PropertyFieldStore.Update(groupID, fields, expectedUpdateAts) + if err != nil { + return nil, err + } + + // The returned slice includes both the requested fields and any linked + // fields the store propagated to. Invalidate each distinct group so all + // affected GetForGroup caches are cleared. + invalidated := make(map[string]bool, len(updated)) + for _, field := range updated { + if invalidated[field.GroupID] { + continue + } + invalidated[field.GroupID] = true + s.InvalidateFieldsForGroup(field.GroupID) + } + return updated, nil +} + +func (s LocalCachePropertyFieldStore) Delete(groupID string, id string) error { + if err := s.PropertyFieldStore.Delete(groupID, id); err != nil { + return err + } + + s.InvalidateFieldsForGroup(groupID) + return nil +} + +type LocalCachePropertyFieldStore struct { + store.PropertyFieldStore + rootStore *LocalCacheStore +} + +func (s *LocalCachePropertyFieldStore) handleClusterInvalidatePropertyField(msg *model.ClusterMessage) { + if bytes.Equal(msg.Data, clearCacheMessageData) { + if err := s.rootStore.propertyFieldCache.Purge(); err != nil { + s.rootStore.logger.Warn("failed to purge property field cache", mlog.Err(err)) + } + } else if err := s.rootStore.propertyFieldCache.Remove(string(msg.Data)); err != nil { + s.rootStore.logger.Warn("failed to remove property field cache entry", mlog.Err(err)) + } +} + +func (s LocalCachePropertyFieldStore) InvalidateFieldsForGroup(groupID string) { + s.rootStore.doInvalidateCacheCluster(s.rootStore.propertyFieldCache, groupID, nil) + if s.rootStore.metrics != nil { + s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.propertyFieldCache.Name()) + } +} + +func (s *LocalCachePropertyFieldStore) getFieldsForGroupFromCache(groupID string) ([]*model.PropertyField, bool) { + var fields []*model.PropertyField + if err := s.rootStore.doStandardReadCache(s.rootStore.propertyFieldCache, groupID, &fields); err == nil { + return fields, true + } + return nil, false +} + +func (s LocalCachePropertyFieldStore) GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) { + if fields, ok := s.getFieldsForGroupFromCache(groupID); ok { + return fields, nil + } + + fields, err := s.PropertyFieldStore.GetForGroup(ctx, groupID) + if err != nil { + return nil, err + } + + s.rootStore.doStandardAddToCache(s.rootStore.propertyFieldCache, groupID, fields) + return fields, nil +} diff --git a/server/channels/store/localcachelayer/property_field_layer_test.go b/server/channels/store/localcachelayer/property_field_layer_test.go new file mode 100644 index 00000000000..e17dbf12254 --- /dev/null +++ b/server/channels/store/localcachelayer/property_field_layer_test.go @@ -0,0 +1,94 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package localcachelayer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/shared/mlog" + "github.com/mattermost/mattermost/server/v8/channels/store/storetest/mocks" +) + +func TestPropertyFieldStoreCache(t *testing.T) { + groupID := "group-id" + fakeField := model.PropertyField{ID: "field-id", GroupID: groupID, Name: "field-name"} + fakeFields := []*model.PropertyField{&fakeField} + logger := mlog.CreateConsoleTestLogger(t) + + t.Run("GetForGroup cached on second call", func(t *testing.T) { + mockStore := getMockStore(t) + mockCacheProvider := getMockCacheProvider() + cachedStore, err := NewLocalCacheLayer(mockStore, nil, nil, mockCacheProvider, logger) + require.NoError(t, err) + + fields, err := cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + assert.Equal(t, fakeFields, fields) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 1) + + fields, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + assert.Equal(t, fakeFields, fields) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 1) + }) + + t.Run("Create invalidates the group cache", func(t *testing.T) { + mockStore := getMockStore(t) + mockCacheProvider := getMockCacheProvider() + cachedStore, err := NewLocalCacheLayer(mockStore, nil, nil, mockCacheProvider, logger) + require.NoError(t, err) + + _, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 1) + + _, err = cachedStore.PropertyField().Create(&fakeField) + require.NoError(t, err) + + _, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 2) + }) + + t.Run("Update invalidates the group cache", func(t *testing.T) { + mockStore := getMockStore(t) + mockCacheProvider := getMockCacheProvider() + cachedStore, err := NewLocalCacheLayer(mockStore, nil, nil, mockCacheProvider, logger) + require.NoError(t, err) + + _, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 1) + + _, err = cachedStore.PropertyField().Update(groupID, fakeFields, nil) + require.NoError(t, err) + + _, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 2) + }) + + t.Run("Delete invalidates the group cache", func(t *testing.T) { + mockStore := getMockStore(t) + mockCacheProvider := getMockCacheProvider() + cachedStore, err := NewLocalCacheLayer(mockStore, nil, nil, mockCacheProvider, logger) + require.NoError(t, err) + + _, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 1) + + err = cachedStore.PropertyField().Delete(groupID, fakeField.ID) + require.NoError(t, err) + + _, err = cachedStore.PropertyField().GetForGroup(context.Background(), groupID) + require.NoError(t, err) + mockStore.PropertyField().(*mocks.PropertyFieldStore).AssertNumberOfCalls(t, "GetForGroup", 2) + }) +} diff --git a/server/channels/store/retrylayer/retrylayer.go b/server/channels/store/retrylayer/retrylayer.go index 78c43d7e765..aa566b29604 100644 --- a/server/channels/store/retrylayer/retrylayer.go +++ b/server/channels/store/retrylayer/retrylayer.go @@ -10488,6 +10488,27 @@ func (s *RetryLayerPropertyFieldStore) GetFieldByName(ctx context.Context, group } +func (s *RetryLayerPropertyFieldStore) GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) { + + tries := 0 + for { + result, err := s.PropertyFieldStore.GetForGroup(ctx, groupID) + if err == nil { + return result, nil + } + if !isRepeatableError(err) { + return result, err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return result, err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + func (s *RetryLayerPropertyFieldStore) GetMany(ctx context.Context, groupID string, ids []string) ([]*model.PropertyField, error) { tries := 0 @@ -17895,32 +17916,32 @@ func (s *RetryLayerUserAccessTokenStore) Delete(tokenID string) error { } -func (s *RetryLayerUserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, error) { +func (s *RetryLayerUserAccessTokenStore) DeleteAllForUser(userID string) error { tries := 0 for { - result, err := s.UserAccessTokenStore.DeleteByIds(tokenIDs) + err := s.UserAccessTokenStore.DeleteAllForUser(userID) if err == nil { - return result, nil + return nil } if !isRepeatableError(err) { - return result, err + return err } tries++ if tries >= 3 { err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") - return result, err + return err } timepkg.Sleep(100 * timepkg.Millisecond) } } -func (s *RetryLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { +func (s *RetryLayerUserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, error) { tries := 0 for { - result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit) + result, err := s.UserAccessTokenStore.DeleteByIds(tokenIDs) if err == nil { return result, nil } @@ -17937,32 +17958,32 @@ func (s *RetryLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit in } -func (s *RetryLayerUserAccessTokenStore) DeleteAllForUser(userID string) error { +func (s *RetryLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) { tries := 0 for { - err := s.UserAccessTokenStore.DeleteAllForUser(userID) + result, err := s.UserAccessTokenStore.Get(tokenID) if err == nil { - return nil + return result, nil } if !isRepeatableError(err) { - return err + return result, err } tries++ if tries >= 3 { err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") - return err + return result, err } timepkg.Sleep(100 * timepkg.Millisecond) } } -func (s *RetryLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) { +func (s *RetryLayerUserAccessTokenStore) GetAll(offset int, limit int) ([]*model.UserAccessToken, error) { tries := 0 for { - result, err := s.UserAccessTokenStore.Get(tokenID) + result, err := s.UserAccessTokenStore.GetAll(offset, limit) if err == nil { return result, nil } @@ -17979,11 +18000,11 @@ func (s *RetryLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessT } -func (s *RetryLayerUserAccessTokenStore) GetAll(offset int, limit int) ([]*model.UserAccessToken, error) { +func (s *RetryLayerUserAccessTokenStore) GetByToken(tokenString string) (*model.UserAccessToken, error) { tries := 0 for { - result, err := s.UserAccessTokenStore.GetAll(offset, limit) + result, err := s.UserAccessTokenStore.GetByToken(tokenString) if err == nil { return result, nil } @@ -18000,11 +18021,11 @@ func (s *RetryLayerUserAccessTokenStore) GetAll(offset int, limit int) ([]*model } -func (s *RetryLayerUserAccessTokenStore) GetByToken(tokenString string) (*model.UserAccessToken, error) { +func (s *RetryLayerUserAccessTokenStore) GetByUser(userID string, page int, perPage int) ([]*model.UserAccessToken, error) { tries := 0 for { - result, err := s.UserAccessTokenStore.GetByToken(tokenString) + result, err := s.UserAccessTokenStore.GetByUser(userID, page, perPage) if err == nil { return result, nil } @@ -18021,11 +18042,11 @@ func (s *RetryLayerUserAccessTokenStore) GetByToken(tokenString string) (*model. } -func (s *RetryLayerUserAccessTokenStore) GetByUser(userID string, page int, perPage int) ([]*model.UserAccessToken, error) { +func (s *RetryLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { tries := 0 for { - result, err := s.UserAccessTokenStore.GetByUser(userID, page, perPage) + result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit) if err == nil { return result, nil } @@ -18852,6 +18873,27 @@ func (s *RetryLayerWebhookStore) UpdateIncoming(webhook *model.IncomingWebhook) } +func (s *RetryLayerWebhookStore) UpdateIncomingLastUsed(webhookID string, lastUsed int64) error { + + tries := 0 + for { + err := s.WebhookStore.UpdateIncomingLastUsed(webhookID, lastUsed) + if err == nil { + return nil + } + if !isRepeatableError(err) { + return err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + func (s *RetryLayerWebhookStore) UpdateOutgoing(hook *model.OutgoingWebhook) (*model.OutgoingWebhook, error) { tries := 0 @@ -18901,10 +18943,6 @@ func (s *RetryLayer) TotalSearchDbConnections() int { return s.Store.TotalSearchDbConnections() } -func (s *RetryLayer) GetDiagnostics(ctx context.Context) (*store.DatabaseDiagnostics, error) { - return s.Store.GetDiagnostics(ctx) -} - func (s *RetryLayer) UnlockFromMaster() { s.Store.UnlockFromMaster() } diff --git a/server/channels/store/sqlstore/property_field_store.go b/server/channels/store/sqlstore/property_field_store.go index db8d6ad4210..30f038bed46 100644 --- a/server/channels/store/sqlstore/property_field_store.go +++ b/server/channels/store/sqlstore/property_field_store.go @@ -167,6 +167,19 @@ func (s *SqlPropertyFieldStore) CountForTarget(groupID, targetType, targetID str return count, nil } +func (s *SqlPropertyFieldStore) GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) { + builder := s.tableSelectQuery. + Where(sq.Eq{"GroupID": groupID}). + Where(sq.Eq{"DeleteAt": 0}) + + fields := []*model.PropertyField{} + if err := s.DBXFromContext(ctx).SelectBuilder(&fields, builder); err != nil { + return nil, errors.Wrap(err, "property_field_get_for_group_query") + } + + return fields, nil +} + func (s *SqlPropertyFieldStore) SearchPropertyFields(opts model.PropertyFieldSearchOpts) ([]*model.PropertyField, error) { if err := opts.Cursor.IsValid(); err != nil { return nil, fmt.Errorf("cursor is invalid: %w", err) diff --git a/server/channels/store/store.go b/server/channels/store/store.go index 6b48690dfc6..70d6771b9ef 100644 --- a/server/channels/store/store.go +++ b/server/channels/store/store.go @@ -1173,6 +1173,7 @@ type PropertyFieldStore interface { Get(ctx context.Context, groupID, id string) (*model.PropertyField, error) GetMany(ctx context.Context, groupID string, ids []string) ([]*model.PropertyField, error) GetFieldByName(ctx context.Context, groupID, targetID, name string) (*model.PropertyField, error) + GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) CountForGroup(groupID string, includeDeleted bool) (int64, error) CountForGroupObjectType(groupID, objectType string, includeDeleted bool) (int64, error) CountForTarget(groupID, targetType, targetID string, includeDeleted bool) (int64, error) diff --git a/server/channels/store/storetest/mocks/PropertyFieldStore.go b/server/channels/store/storetest/mocks/PropertyFieldStore.go index e8f8ca7568f..25320b92852 100644 --- a/server/channels/store/storetest/mocks/PropertyFieldStore.go +++ b/server/channels/store/storetest/mocks/PropertyFieldStore.go @@ -264,6 +264,36 @@ func (_m *PropertyFieldStore) GetFieldByName(ctx context.Context, groupID string return r0, r1 } +// GetForGroup provides a mock function with given fields: ctx, groupID +func (_m *PropertyFieldStore) GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) { + ret := _m.Called(ctx, groupID) + + if len(ret) == 0 { + panic("no return value specified for GetForGroup") + } + + var r0 []*model.PropertyField + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]*model.PropertyField, error)); ok { + return rf(ctx, groupID) + } + if rf, ok := ret.Get(0).(func(context.Context, string) []*model.PropertyField); ok { + r0 = rf(ctx, groupID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*model.PropertyField) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, groupID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetMany provides a mock function with given fields: ctx, groupID, ids func (_m *PropertyFieldStore) GetMany(ctx context.Context, groupID string, ids []string) ([]*model.PropertyField, error) { ret := _m.Called(ctx, groupID, ids) diff --git a/server/channels/store/timerlayer/timerlayer.go b/server/channels/store/timerlayer/timerlayer.go index b3e8da36bfe..4cb08ea6983 100644 --- a/server/channels/store/timerlayer/timerlayer.go +++ b/server/channels/store/timerlayer/timerlayer.go @@ -8362,6 +8362,22 @@ func (s *TimerLayerPropertyFieldStore) GetFieldByName(ctx context.Context, group return result, err } +func (s *TimerLayerPropertyFieldStore) GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) { + start := time.Now() + + result, err := s.PropertyFieldStore.GetForGroup(ctx, groupID) + + elapsed := float64(time.Since(start)) / float64(time.Second) + if s.Root.Metrics != nil { + success := "false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("PropertyFieldStore.GetForGroup", success, elapsed) + } + return result, err +} + func (s *TimerLayerPropertyFieldStore) GetMany(ctx context.Context, groupID string, ids []string) ([]*model.PropertyField, error) { start := time.Now() @@ -14177,10 +14193,10 @@ func (s *TimerLayerUserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, return result, err } -func (s *TimerLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { +func (s *TimerLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) { start := time.Now() - result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit) + result, err := s.UserAccessTokenStore.Get(tokenID) elapsed := float64(time.Since(start)) / float64(time.Second) if s.Root.Metrics != nil { @@ -14188,15 +14204,15 @@ func (s *TimerLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit in if err == nil { success = "true" } - s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetExpiredBefore", success, elapsed) + s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.Get", success, elapsed) } return result, err } -func (s *TimerLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) { +func (s *TimerLayerUserAccessTokenStore) GetAll(offset int, limit int) ([]*model.UserAccessToken, error) { start := time.Now() - result, err := s.UserAccessTokenStore.Get(tokenID) + result, err := s.UserAccessTokenStore.GetAll(offset, limit) elapsed := float64(time.Since(start)) / float64(time.Second) if s.Root.Metrics != nil { @@ -14204,15 +14220,15 @@ func (s *TimerLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessT if err == nil { success = "true" } - s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.Get", success, elapsed) + s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetAll", success, elapsed) } return result, err } -func (s *TimerLayerUserAccessTokenStore) GetAll(offset int, limit int) ([]*model.UserAccessToken, error) { +func (s *TimerLayerUserAccessTokenStore) GetByToken(tokenString string) (*model.UserAccessToken, error) { start := time.Now() - result, err := s.UserAccessTokenStore.GetAll(offset, limit) + result, err := s.UserAccessTokenStore.GetByToken(tokenString) elapsed := float64(time.Since(start)) / float64(time.Second) if s.Root.Metrics != nil { @@ -14220,15 +14236,15 @@ func (s *TimerLayerUserAccessTokenStore) GetAll(offset int, limit int) ([]*model if err == nil { success = "true" } - s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetAll", success, elapsed) + s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetByToken", success, elapsed) } return result, err } -func (s *TimerLayerUserAccessTokenStore) GetByToken(tokenString string) (*model.UserAccessToken, error) { +func (s *TimerLayerUserAccessTokenStore) GetByUser(userID string, page int, perPage int) ([]*model.UserAccessToken, error) { start := time.Now() - result, err := s.UserAccessTokenStore.GetByToken(tokenString) + result, err := s.UserAccessTokenStore.GetByUser(userID, page, perPage) elapsed := float64(time.Since(start)) / float64(time.Second) if s.Root.Metrics != nil { @@ -14236,15 +14252,15 @@ func (s *TimerLayerUserAccessTokenStore) GetByToken(tokenString string) (*model. if err == nil { success = "true" } - s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetByToken", success, elapsed) + s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetByUser", success, elapsed) } return result, err } -func (s *TimerLayerUserAccessTokenStore) GetByUser(userID string, page int, perPage int) ([]*model.UserAccessToken, error) { +func (s *TimerLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { start := time.Now() - result, err := s.UserAccessTokenStore.GetByUser(userID, page, perPage) + result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit) elapsed := float64(time.Since(start)) / float64(time.Second) if s.Root.Metrics != nil { @@ -14252,7 +14268,7 @@ func (s *TimerLayerUserAccessTokenStore) GetByUser(userID string, page int, perP if err == nil { success = "true" } - s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetByUser", success, elapsed) + s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetExpiredBefore", success, elapsed) } return result, err } @@ -14895,6 +14911,22 @@ func (s *TimerLayerWebhookStore) UpdateIncoming(webhook *model.IncomingWebhook) return result, err } +func (s *TimerLayerWebhookStore) UpdateIncomingLastUsed(webhookID string, lastUsed int64) error { + start := time.Now() + + err := s.WebhookStore.UpdateIncomingLastUsed(webhookID, lastUsed) + + elapsed := float64(time.Since(start)) / float64(time.Second) + if s.Root.Metrics != nil { + success := "false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("WebhookStore.UpdateIncomingLastUsed", success, elapsed) + } + return err +} + func (s *TimerLayerWebhookStore) UpdateOutgoing(hook *model.OutgoingWebhook) (*model.OutgoingWebhook, error) { start := time.Now() @@ -14939,10 +14971,6 @@ func (s *TimerLayer) TotalSearchDbConnections() int { return s.Store.TotalSearchDbConnections() } -func (s *TimerLayer) GetDiagnostics(ctx context.Context) (*store.DatabaseDiagnostics, error) { - return s.Store.GetDiagnostics(ctx) -} - func (s *TimerLayer) UnlockFromMaster() { s.Store.UnlockFromMaster() } diff --git a/server/enterprise/metrics/metrics.go b/server/enterprise/metrics/metrics.go index d5be5ea08c6..a4e7e1ade34 100644 --- a/server/enterprise/metrics/metrics.go +++ b/server/enterprise/metrics/metrics.go @@ -537,6 +537,7 @@ func New(ps *platform.PlatformService, driver, dataSource string) *MetricsInterf model.ClusterEventInvalidateCacheForTeams, model.ClusterEventInvalidateCacheForContentFlagging, model.ClusterEventInvalidateCacheForSessionAttributes, + model.ClusterEventInvalidateCacheForPropertyFields, model.ClusterEventClearSessionCacheForAllUsers, model.ClusterEventInstallPlugin, model.ClusterEventRemovePlugin, diff --git a/server/i18n/en.json b/server/i18n/en.json index 647d0666555..32d54ef9d7b 100644 --- a/server/i18n/en.json +++ b/server/i18n/en.json @@ -8364,6 +8364,10 @@ "id": "app.property_field.get_by_name.app_error", "translation": "Unable to get property field by name." }, + { + "id": "app.property_field.get_for_group.app_error", + "translation": "Unable to get property fields for group." + }, { "id": "app.property_field.get_many.app_error", "translation": "Unable to get property fields." diff --git a/server/public/model/cluster_message.go b/server/public/model/cluster_message.go index 16439303fce..48253d62be6 100644 --- a/server/public/model/cluster_message.go +++ b/server/public/model/cluster_message.go @@ -44,6 +44,7 @@ const ( ClusterEventInvalidateCacheForTeams ClusterEvent = "inv_teams" ClusterEventInvalidateCacheForContentFlagging ClusterEvent = "inv_content_flagging" ClusterEventInvalidateCacheForSessionAttributes ClusterEvent = "inv_session_attributes" + ClusterEventInvalidateCacheForPropertyFields ClusterEvent = "inv_property_fields" ClusterEventInvalidateCacheForAutoTranslation ClusterEvent = "inv_autotranslation" ClusterEventInvalidateCacheForReadReceipts ClusterEvent = "inv_read_receipts" ClusterEventInvalidateCacheForTemporaryPosts ClusterEvent = "inv_temporary_posts"