Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions server/channels/store/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.

package store

import (
"context"

"github.com/mattermost/mattermost/server/public/shared/request"
)

// storeContextKey is the base type for all context keys for the store.
type storeContextKey string

// contextValue is a type to hold some pre-determined context values.
type contextValue string

// Different possible values of contextValue.
const (
useMaster contextValue = "useMaster"
)

// WithMaster adds the context value that master DB should be selected for this request.
//
// Deprecated: This method is deprecated and there's ongoing change to use `request.CTX` across
// instead of `context.Context`. Please use `RequestContextWithMaster` instead.
func WithMaster(ctx context.Context) context.Context {
return context.WithValue(ctx, storeContextKey(useMaster), true)
}

// RequestContextWithMaster adds the context value that master DB should be selected for this request.
func RequestContextWithMaster(c request.CTX) request.CTX {
ctx := WithMaster(c.Context())
c = c.WithContext(ctx)
return c
}

// HasMaster is a helper function to check whether master DB should be selected or not.
func HasMaster(ctx context.Context) bool {
if v := ctx.Value(storeContextKey(useMaster)); v != nil {
if res, ok := v.(bool); ok && res {
return true
}
}
return false
}
55 changes: 55 additions & 0 deletions server/channels/store/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.

package store

import (
"context"
"testing"

"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/stretchr/testify/assert"
)

func TestContextMaster(t *testing.T) {
ctx := context.Background()

m := WithMaster(ctx)
assert.True(t, HasMaster(m))
}

func TestRequestContextWithMaster(t *testing.T) {
t.Run("set and get", func(t *testing.T) {
var rctx request.CTX = request.TestContext(t)

rctx = RequestContextWithMaster(rctx)
assert.True(t, HasMaster(rctx.Context()))
})

t.Run("values get copied from original context", func(t *testing.T) {
var rctx request.CTX = request.TestContext(t)
rctx = RequestContextWithMaster(rctx)
rctxCopy := rctx

assert.True(t, HasMaster(rctx.Context()))
assert.True(t, HasMaster(rctxCopy.Context()))
})

t.Run("directly assigning does not cause the copy to alter the original context", func(t *testing.T) {
var rctx request.CTX = request.TestContext(t)
rctxCopy := rctx
rctxCopy = RequestContextWithMaster(rctxCopy)

assert.False(t, HasMaster(rctx.Context()))
assert.True(t, HasMaster(rctxCopy.Context()))
})

t.Run("directly assigning does not cause the original context to alter the copy", func(t *testing.T) {
var rctx request.CTX = request.TestContext(t)
rctxCopy := rctx
rctx = RequestContextWithMaster(rctx)

assert.True(t, HasMaster(rctx.Context()))
assert.False(t, HasMaster(rctxCopy.Context()))
})
}
4 changes: 2 additions & 2 deletions server/channels/store/retrylayer/retrylayer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 25 additions & 1 deletion server/channels/store/searchlayer/channel_layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *SearchChannelStore) indexChannel(rctx request.CTX, channel *model.Chann
}
}

teamMemberIDs, err = c.GetTeamMembersForChannel(channel.Id)
teamMemberIDs, err = c.GetTeamMembersForChannel(rctx, channel.Id)
if err != nil {
rctx.Logger().Warn("Encountered error while indexing channel", mlog.String("channel_id", channel.Id), mlog.Err(err))
return
Expand All @@ -66,6 +66,30 @@ func (c *SearchChannelStore) indexChannel(rctx request.CTX, channel *model.Chann
}
}

func (c *SearchChannelStore) bulkIndexChannels(rctx request.CTX, channels []*model.Channel, teamMemberIDs []string) {
// Util function to get userIDs, only for private channels
getUserIDsForPrivateChannel := func(channel *model.Channel) ([]string, error) {
if channel.Type != model.ChannelTypePrivate {
return []string{}, nil
}
return c.GetAllChannelMemberIdsByChannelId(channel.Id)
}

for _, engine := range c.rootStore.searchEngine.GetActiveEngines() {
if !engine.IsIndexingEnabled() {
continue
}

runIndexFn(rctx, engine, func(engineCopy searchengine.SearchEngineInterface) {
appErr := engineCopy.SyncBulkIndexChannels(rctx, channels, getUserIDsForPrivateChannel, teamMemberIDs)
if appErr != nil {
rctx.Logger().Error("Failed to synchronously bulk-index channels.", mlog.String("search_engine", engineCopy.GetName()), mlog.Err(appErr))
return
}
})
}
}

func (c *SearchChannelStore) Save(rctx request.CTX, channel *model.Channel, maxChannels int64, channelOptions ...model.ChannelOption) (*model.Channel, error) {
newChannel, err := c.ChannelStore.Save(rctx, channel, maxChannels, channelOptions...)
if err == nil {
Expand Down
30 changes: 30 additions & 0 deletions server/channels/store/searchlayer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/public/utils"
"github.com/mattermost/mattermost/server/v8/channels/store"
"github.com/mattermost/mattermost/server/v8/platform/services/searchengine"
)
Expand Down Expand Up @@ -111,6 +112,35 @@ func (s *SearchStore) indexUser(rctx request.CTX, user *model.User) {
}
}

func (s *SearchStore) indexChannelsForTeam(rctx request.CTX, teamID string) {
const perPage = 100
var (
channels []*model.Channel
)

channels, err := utils.Pager(func(page int) ([]*model.Channel, error) {
return s.channel.GetPublicChannelsForTeam(teamID, page*perPage, perPage)
}, perPage)
if err != nil {
rctx.Logger().Warn("Encountered error while retrieving public channels for indexing", mlog.String("team_id", teamID), mlog.Err(err))
return
}

if len(channels) == 0 {
return
}

// Use master context to avoid replica lag issues when reading team members
masterRctx := store.RequestContextWithMaster(rctx)
teamMemberIDs, err := s.channel.GetTeamMembersForChannel(masterRctx, channels[0].Id)
if err != nil {
rctx.Logger().Warn("Encountered error while retrieving team members for channel", mlog.String("channel_id", channels[0].Id), mlog.Err(err))
return
}

s.channel.bulkIndexChannels(rctx, channels, teamMemberIDs)
}

// Runs an indexing function synchronously or asynchronously depending on the engine
func runIndexFn(rctx request.CTX, engine searchengine.SearchEngineInterface, indexFn func(searchengine.SearchEngineInterface)) {
if engine.IsIndexingSync() {
Expand Down
26 changes: 23 additions & 3 deletions server/channels/store/searchlayer/team_layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ type SearchTeamStore struct {
func (s SearchTeamStore) SaveMember(rctx request.CTX, teamMember *model.TeamMember, maxUsersPerTeam int) (*model.TeamMember, error) {
member, err := s.TeamStore.SaveMember(rctx, teamMember, maxUsersPerTeam)
if err == nil {
s.rootStore.indexUserFromID(rctx, member.UserId)
// Nothing to do if search engine is not active
if s.rootStore.searchEngine.ActiveEngine() != "database" && s.rootStore.searchEngine.ActiveEngine() != "none" {
s.rootStore.indexUserFromID(rctx, member.UserId)
s.rootStore.indexChannelsForTeam(rctx, member.TeamId)
}
}
return member, err
}
Expand All @@ -33,15 +37,31 @@ func (s SearchTeamStore) UpdateMember(rctx request.CTX, teamMember *model.TeamMe
func (s SearchTeamStore) RemoveMember(rctx request.CTX, teamId string, userId string) error {
err := s.TeamStore.RemoveMember(rctx, teamId, userId)
if err == nil {
s.rootStore.indexUserFromID(rctx, userId)
// Nothing to do if search engine is not active
if s.rootStore.searchEngine.ActiveEngine() != "database" && s.rootStore.searchEngine.ActiveEngine() != "none" {
s.rootStore.indexUserFromID(rctx, userId)
s.rootStore.indexChannelsForTeam(rctx, teamId)
}
}
return err
}

func (s SearchTeamStore) RemoveAllMembersByUser(rctx request.CTX, userId string) error {
if s.rootStore.searchEngine.ActiveEngine() != "database" && s.rootStore.searchEngine.ActiveEngine() != "none" {
memberships, err := s.TeamStore.GetTeamsForUser(rctx, userId, "", true)
if err != nil {
return err
}
for _, membership := range memberships {
s.rootStore.indexChannelsForTeam(rctx, membership.TeamId)
}
}

err := s.TeamStore.RemoveAllMembersByUser(rctx, userId)
if err == nil {
s.rootStore.indexUserFromID(rctx, userId)
if s.rootStore.searchEngine.ActiveEngine() != "database" && s.rootStore.searchEngine.ActiveEngine() != "none" {
s.rootStore.indexUserFromID(rctx, userId)
}
}
return err
}
4 changes: 2 additions & 2 deletions server/channels/store/sqlstore/channel_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3046,9 +3046,9 @@ func (s SqlChannelStore) GetMembersForUserWithCursorPagination(userId string, pe
return dbMembers.ToModel(), nil
}

func (s SqlChannelStore) GetTeamMembersForChannel(channelID string) ([]string, error) {
func (s SqlChannelStore) GetTeamMembersForChannel(rctx request.CTX, channelID string) ([]string, error) {
teamMemberIDs := []string{}
if err := s.GetReplica().Select(&teamMemberIDs, `SELECT tm.UserId
if err := s.DBXFromContext(rctx.Context()).Select(&teamMemberIDs, `SELECT tm.UserId
FROM Channels c, Teams t, TeamMembers tm
WHERE
c.TeamId=t.Id
Expand Down
25 changes: 4 additions & 21 deletions server/channels/store/sqlstore/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,25 @@ import (
"context"

"github.com/mattermost/mattermost/server/public/shared/request"
)

// storeContextKey is the base type for all context keys for the store.
type storeContextKey string

// contextValue is a type to hold some pre-determined context values.
type contextValue string

// Different possible values of contextValue.
const (
useMaster contextValue = "useMaster"
"github.com/mattermost/mattermost/server/v8/channels/store"
)

// WithMaster adds the context value that master DB should be selected for this request.
//
// Deprecated: This method is deprecated and there's ongoing change to use `request.CTX` across
// instead of `context.Context`. Please use `RequestContextWithMaster` instead.
func WithMaster(ctx context.Context) context.Context {
return context.WithValue(ctx, storeContextKey(useMaster), true)
return store.WithMaster(ctx)
}

// RequestContextWithMaster adds the context value that master DB should be selected for this request.
func RequestContextWithMaster(c request.CTX) request.CTX {
ctx := WithMaster(c.Context())
c = c.WithContext(ctx)
return c
return store.RequestContextWithMaster(c)
}

// HasMaster is a helper function to check whether master DB should be selected or not.
func HasMaster(ctx context.Context) bool {
if v := ctx.Value(storeContextKey(useMaster)); v != nil {
if res, ok := v.(bool); ok && res {
return true
}
}
return false
return store.HasMaster(ctx)
}

// DBXFromContext is a helper utility that returns the sqlx DB handle from a given context.
Expand Down
2 changes: 1 addition & 1 deletion server/channels/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ type ChannelStore interface {
AnalyticsDeletedTypeCount(teamID string, channelType model.ChannelType) (int64, error)
AnalyticsCountAll(teamID string) (map[model.ChannelType]int64, error)
GetMembersForUser(teamID string, userID string) (model.ChannelMembers, error)
GetTeamMembersForChannel(channelID string) ([]string, error)
GetTeamMembersForChannel(rctx request.CTX, channelID string) ([]string, error)
GetMembersForUserWithPagination(userID string, page, perPage int) (model.ChannelMembersWithTeamData, error)
GetMembersForUserWithCursorPagination(userId string, perPage int, fromChanneID string) (model.ChannelMembersWithTeamData, error)
Autocomplete(rctx request.CTX, userID, term string, includeDeleted, isGuest bool) (model.ChannelListWithTeamData, error)
Expand Down
18 changes: 9 additions & 9 deletions server/channels/store/storetest/mocks/ChannelStore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions server/channels/store/timerlayer/timerlayer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions server/enterprise/elasticsearch/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,17 @@ const (
// At the moment, this number is hardcoded. If needed, we can expose
// this to the config.
BulkFlushInterval = 5 * time.Second

// Size of the largest request to be done, in bytes
BulkFlushBytes = 10 * 1024 * 1024 // 10 MiB
)

type BulkSettings struct {
FlushBytes int
FlushInterval time.Duration
FlushNumReqs int
}

var (
urlRe = regexp.MustCompile(URLRegexpRE)
markdownLinkRe = regexp.MustCompile(URLMarkdownLinkRE)
Expand Down
3 changes: 2 additions & 1 deletion server/enterprise/elasticsearch/common/indexing_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,8 @@ func BulkIndexChannels(config *model.Config,
}
}

teamMemberIDs, err := store.Channel().GetTeamMembersForChannel(channel.Id)
rctx := request.EmptyContext(logger)
teamMemberIDs, err := store.Channel().GetTeamMembersForChannel(rctx, channel.Id)
if err != nil {
return nil, model.NewAppError("IndexerWorker.BulkIndexChannels", "ent.elasticsearch.getAllTeamMembers.error", nil, "", http.StatusInternalServerError).Wrap(err)
}
Expand Down
Loading
Loading