Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions server/channels/api4/shared_channel_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,17 +683,28 @@ func TestSharedChannelPostMetadataSync(t *testing.T) {
// Trigger another sync to ensure no duplicates are created
service.NotifyChannelChanged(testChannel.Id)

// Verify acknowledgement count remains 1 (no duplicates)
// Echo prevention may suppress resending unchanged acknowledgements, so wait
// for sync work to finish instead of requiring Cluster A to receive traffic.
require.Eventually(t, func() bool {
muA.Lock()
defer muA.Unlock()
for _, post := range syncedPostsServerA {
if post.Id == postIdToTrack && post.Metadata != nil && post.Metadata.Acknowledgements != nil {
return len(post.Metadata.Acknowledgements) == 1
}
return !service.HasPendingTasksForTesting()
}, 10*time.Second, 100*time.Millisecond, "Sync tasks should complete after resync trigger")

finalAcks, appErr := th.App.GetAcknowledgementsForPost(postIdToTrack)
require.Nil(t, appErr)
require.Len(t, finalAcks, 1, "Should maintain single acknowledgement after resync")

muA.Lock()
var serverAResyncPost *model.Post
for _, post := range syncedPostsServerA {
if post.Id == postIdToTrack && post.Metadata != nil && post.Metadata.Acknowledgements != nil {
serverAResyncPost = post
break
}
return len(syncedPostsServerA) > 0
}, 3*time.Second, 100*time.Millisecond, "Should maintain single acknowledgement after resync")
}
muA.Unlock()
if serverAResyncPost != nil {
require.Len(t, serverAResyncPost.Metadata.Acknowledgements, 1, "Sync payload should not contain duplicate acknowledgements")
}

t.Logf("✅ Cross-cluster acknowledgement flow completed successfully:")
t.Logf(" 1. Server A created post with ack request: %s", postIdToTrack)
Expand Down
9 changes: 9 additions & 0 deletions server/channels/app/properties/property_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,15 @@ func (ps *PropertyService) GetPropertyFields(rctx request.CTX, groupID string, i
return ps.runPostGetPropertyFields(rctx, fields)
}

func (ps *PropertyService) GetPropertyFieldsForGroup(rctx request.CTX, groupID string) ([]*model.PropertyField, error) {
fields, err := ps.fieldStore.GetForGroup(context.Background(), groupID)
if err != nil {
return nil, fmt.Errorf("GetPropertyFieldsForGroup: %w", err)
}

return ps.runPostGetPropertyFields(rctx, fields)
}

func (ps *PropertyService) GetPropertyFieldByName(rctx request.CTX, groupID, targetID, name string) (*model.PropertyField, error) {
field, err := ps.getPropertyFieldByName(groupID, targetID, name)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions server/channels/app/property_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,18 @@ func (a *App) SearchPropertyFields(rctx request.CTX, groupID string, opts model.
return fields, nil
}

// GetPropertyFieldsForGroup retrieves all active property fields for a group.
func (a *App) GetPropertyFieldsForGroup(rctx request.CTX, groupID string) ([]*model.PropertyField, *model.AppError) {
fields, err := a.Srv().propertyService.GetPropertyFieldsForGroup(rctx, groupID)
if err != nil {
if appErr := mapPropertyServiceError("GetPropertyFieldsForGroup", err); appErr != nil {
return nil, appErr
}
return nil, model.NewAppError("GetPropertyFieldsForGroup", "app.property_field.get_for_group.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
}
return fields, nil
}

// CountPropertyFieldsForGroup counts property fields for a group.
func (a *App) CountPropertyFieldsForGroup(rctx request.CTX, groupID string, includeDeleted bool) (int64, *model.AppError) {
var count int64
Expand Down
76 changes: 76 additions & 0 deletions server/channels/app/property_field_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,82 @@ func TestDeletePropertyField(t *testing.T) {
})
}

func TestPropertyFieldCacheInvalidation(t *testing.T) {
mainHelper.Parallel(t)
th := Setup(t).InitBasic(t)

newField := func(groupID, name string) *model.PropertyField {
return &model.PropertyField{
GroupID: groupID,
Name: name,
Type: model.PropertyFieldTypeText,
ObjectType: model.PropertyFieldObjectTypeUser,
TargetType: string(model.PropertyFieldTargetLevelSystem),
}
}

t.Run("create invalidates the cached group listing", func(t *testing.T) {
groupID := registerTestPropertyGroup(t, th)

_, appErr := th.App.CreatePropertyField(th.Context, newField(groupID, "First Field"), false, "")
require.Nil(t, appErr)

// Populate the cache for this group.
fields, appErr := th.App.GetPropertyFieldsForGroup(th.Context, groupID)
require.Nil(t, appErr)
require.Len(t, fields, 1)

_, appErr = th.App.CreatePropertyField(th.Context, newField(groupID, "Second Field"), false, "")
require.Nil(t, appErr)

// A stale cache would still return a single field here.
fields, appErr = th.App.GetPropertyFieldsForGroup(th.Context, groupID)
require.Nil(t, appErr)
assert.Len(t, fields, 2)
})

t.Run("update invalidates the cached group listing", func(t *testing.T) {
groupID := registerTestPropertyGroup(t, th)

created, appErr := th.App.CreatePropertyField(th.Context, newField(groupID, "Original Name"), false, "")
require.Nil(t, appErr)

fields, appErr := th.App.GetPropertyFieldsForGroup(th.Context, groupID)
require.Nil(t, appErr)
require.Len(t, fields, 1)
require.Equal(t, "Original Name", fields[0].Name)

created.Name = "Updated Name"
_, _, appErr = th.App.UpdatePropertyField(th.Context, groupID, created, false, "")
require.Nil(t, appErr)

// A stale cache would still return the original name here.
fields, appErr = th.App.GetPropertyFieldsForGroup(th.Context, groupID)
require.Nil(t, appErr)
require.Len(t, fields, 1)
assert.Equal(t, "Updated Name", fields[0].Name)
})

t.Run("delete invalidates the cached group listing", func(t *testing.T) {
groupID := registerTestPropertyGroup(t, th)

created, appErr := th.App.CreatePropertyField(th.Context, newField(groupID, "Field to Delete"), false, "")
require.Nil(t, appErr)

fields, appErr := th.App.GetPropertyFieldsForGroup(th.Context, groupID)
require.Nil(t, appErr)
require.Len(t, fields, 1)

appErr = th.App.DeletePropertyField(th.Context, groupID, created.ID, false, "")
require.Nil(t, appErr)

// A stale cache would still return the deleted field here.
fields, appErr = th.App.GetPropertyFieldsForGroup(th.Context, groupID)
require.Nil(t, appErr)
assert.Empty(t, fields)
})
}

func TestPropertyFieldBroadcastParams(t *testing.T) {
rctx := request.TestContext(t)

Expand Down
23 changes: 23 additions & 0 deletions server/channels/store/localcachelayer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ const (

SessionAttributeCacheSize = model.SessionCacheSize
SessionAttributeCacheSec = 30

PropertyFieldCacheSize = 100
PropertyFieldCacheSec = 30 * 60
)

var clearCacheMessageData = []byte("")
Expand Down Expand Up @@ -158,6 +161,9 @@ type LocalCacheStore struct {

sessionAttribute LocalCacheSessionAttributeStore
sessionAttributeCache cache.Cache

propertyField LocalCachePropertyFieldStore
propertyFieldCache cache.Cache
}

func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterface, cluster einterfaces.ClusterInterface, cacheProvider cache.Provider, logger mlog.LoggerIFace) (localCacheStore LocalCacheStore, err error) {
Expand Down Expand Up @@ -473,6 +479,17 @@ func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterf
}
localCacheStore.sessionAttribute = LocalCacheSessionAttributeStore{SessionAttributeStore: baseStore.SessionAttribute(), rootStore: &localCacheStore}

// Property Fields
if localCacheStore.propertyFieldCache, err = cacheProvider.NewCache(&cache.CacheOptions{
Size: PropertyFieldCacheSize,
Name: "PropertyField",
DefaultExpiry: PropertyFieldCacheSec * time.Second,
InvalidateClusterEvent: model.ClusterEventInvalidateCacheForPropertyFields,
}); err != nil {
return
}
localCacheStore.propertyField = LocalCachePropertyFieldStore{PropertyFieldStore: baseStore.PropertyField(), rootStore: &localCacheStore}

if cluster != nil {
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForReactions, localCacheStore.reaction.handleClusterInvalidateReaction)
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForRoles, localCacheStore.role.handleClusterInvalidateRole)
Expand Down Expand Up @@ -503,6 +520,7 @@ func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterf
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForReadReceipts, localCacheStore.readReceipt.handleClusterInvalidateReadReceipts)
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForTemporaryPosts, localCacheStore.temporaryPost.handleClusterInvalidateTemporaryPosts)
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForSessionAttributes, localCacheStore.sessionAttribute.handleClusterInvalidateSessionAttributes)
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForPropertyFields, localCacheStore.propertyField.handleClusterInvalidatePropertyField)
}
return
}
Expand Down Expand Up @@ -571,6 +589,10 @@ func (s LocalCacheStore) SessionAttribute() store.SessionAttributeStore {
return &s.sessionAttribute
}

func (s LocalCacheStore) PropertyField() store.PropertyFieldStore {
return s.propertyField
}

func (s LocalCacheStore) DropAllTables() {
s.Invalidate()
s.Store.DropAllTables()
Expand Down Expand Up @@ -710,6 +732,7 @@ func (s *LocalCacheStore) Invalidate() {
s.doClearCacheCluster(s.readReceiptPostReadersCache)
s.doClearCacheCluster(s.temporaryPostCache)
s.doClearCacheCluster(s.sessionAttributeCache)
s.doClearCacheCluster(s.propertyFieldCache)
}

// allocateCacheTargets is used to fill target value types
Expand Down
8 changes: 8 additions & 0 deletions server/channels/store/localcachelayer/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ func getMockStore(t *testing.T) *mocks.Store {
mockSessionAttributeStore := mocks.SessionAttributeStore{}
mockStore.On("SessionAttribute").Return(&mockSessionAttributeStore)

fakeField := model.PropertyField{ID: "field-id", GroupID: "group-id", Name: "field-name"}
mockPropertyFieldStore := mocks.PropertyFieldStore{}
mockPropertyFieldStore.On("GetForGroup", context.Background(), "group-id").Return([]*model.PropertyField{&fakeField}, nil)
mockPropertyFieldStore.On("Create", &fakeField).Return(&fakeField, nil)
mockPropertyFieldStore.On("Update", "group-id", []*model.PropertyField{&fakeField}, map[string]int64(nil)).Return([]*model.PropertyField{&fakeField}, nil)
mockPropertyFieldStore.On("Delete", "group-id", "field-id").Return(nil)
mockStore.On("PropertyField").Return(&mockPropertyFieldStore)

mockReadReceiptStore := &mocks.ReadReceiptStore{}
mockStore.On("ReadReceipt").Return(mockReadReceiptStore)

Expand Down
96 changes: 96 additions & 0 deletions server/channels/store/localcachelayer/property_field_layer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.

package localcachelayer

import (
"bytes"
"context"

"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/v8/channels/store"
)

func (s LocalCachePropertyFieldStore) Create(field *model.PropertyField) (*model.PropertyField, error) {
created, err := s.PropertyFieldStore.Create(field)
if err != nil {
return nil, err
}

s.InvalidateFieldsForGroup(created.GroupID)
return created, nil
}

func (s LocalCachePropertyFieldStore) Update(groupID string, fields []*model.PropertyField, expectedUpdateAts map[string]int64) ([]*model.PropertyField, error) {
updated, err := s.PropertyFieldStore.Update(groupID, fields, expectedUpdateAts)
if err != nil {
return nil, err
}

// The returned slice includes both the requested fields and any linked
// fields the store propagated to. Invalidate each distinct group so all
// affected GetForGroup caches are cleared.
invalidated := make(map[string]bool, len(updated))
for _, field := range updated {
if invalidated[field.GroupID] {
continue
}
invalidated[field.GroupID] = true
s.InvalidateFieldsForGroup(field.GroupID)
}
return updated, nil
}

func (s LocalCachePropertyFieldStore) Delete(groupID string, id string) error {
if err := s.PropertyFieldStore.Delete(groupID, id); err != nil {
return err
}

s.InvalidateFieldsForGroup(groupID)
return nil
}

type LocalCachePropertyFieldStore struct {
store.PropertyFieldStore
rootStore *LocalCacheStore
}

func (s *LocalCachePropertyFieldStore) handleClusterInvalidatePropertyField(msg *model.ClusterMessage) {
if bytes.Equal(msg.Data, clearCacheMessageData) {
if err := s.rootStore.propertyFieldCache.Purge(); err != nil {
s.rootStore.logger.Warn("failed to purge property field cache", mlog.Err(err))
}
} else if err := s.rootStore.propertyFieldCache.Remove(string(msg.Data)); err != nil {
s.rootStore.logger.Warn("failed to remove property field cache entry", mlog.Err(err))
}
}

func (s LocalCachePropertyFieldStore) InvalidateFieldsForGroup(groupID string) {
s.rootStore.doInvalidateCacheCluster(s.rootStore.propertyFieldCache, groupID, nil)
if s.rootStore.metrics != nil {
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.propertyFieldCache.Name())
}
}

func (s *LocalCachePropertyFieldStore) getFieldsForGroupFromCache(groupID string) ([]*model.PropertyField, bool) {
var fields []*model.PropertyField
if err := s.rootStore.doStandardReadCache(s.rootStore.propertyFieldCache, groupID, &fields); err == nil {
return fields, true
}
return nil, false
}

func (s LocalCachePropertyFieldStore) GetForGroup(ctx context.Context, groupID string) ([]*model.PropertyField, error) {
if fields, ok := s.getFieldsForGroupFromCache(groupID); ok {
return fields, nil
}

fields, err := s.PropertyFieldStore.GetForGroup(ctx, groupID)
if err != nil {
return nil, err
}

s.rootStore.doStandardAddToCache(s.rootStore.propertyFieldCache, groupID, fields)
return fields, nil
}
Loading
Loading