diff --git a/pkg/tnservice/store.go b/pkg/tnservice/store.go index 9fcf2896f1106..4c17f5b46cb76 100644 --- a/pkg/tnservice/store.go +++ b/pkg/tnservice/store.go @@ -17,6 +17,7 @@ package tnservice import ( "context" "errors" + "runtime/debug" "sync" "time" @@ -27,10 +28,12 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/morpc" "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/common/stopper" + "github.com/matrixorigin/matrixone/pkg/common/system" "github.com/matrixorigin/matrixone/pkg/defines" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/lockservice" "github.com/matrixorigin/matrixone/pkg/logservice" + "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" logservicepb "github.com/matrixorigin/matrixone/pkg/pb/logservice" "github.com/matrixorigin/matrixone/pkg/pb/metadata" @@ -479,6 +482,12 @@ func (s *store) initQueryService(inStandalone bool) { func (s *store) initQueryCommandHandler() { s.queryService.AddHandleFunc(query.CmdMethod_GetCacheInfo, s.handleGetCacheInfo, false) s.queryService.AddHandleFunc(query.CmdMethod_GetLatestBind, s.handleGetLatestBind, false) + s.queryService.AddHandleFunc(query.CmdMethod_GOMAXPROCS, s.handleGoMaxProcs, false) + s.queryService.AddHandleFunc(query.CmdMethod_GOMEMLIMIT, s.handleGoMemLimit, false) + s.queryService.AddHandleFunc(query.CmdMethod_GOGCPercent, s.handleGoGCPercent, false) + s.queryService.AddHandleFunc(query.CmdMethod_FileServiceCache, s.handleFileServiceCacheRequest, false) + s.queryService.AddHandleFunc(query.CmdMethod_FileServiceCacheEvict, s.handleFileServiceCacheEvictRequest, false) + s.queryService.AddHandleFunc(query.CmdMethod_MetadataCache, s.handleMetadataCacheRequest, false) } func (s *store) handleGetCacheInfo(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error { @@ -515,3 +524,99 @@ func (s *store) setupStatusServer() { func (s *store) GetLockTableAllocator() lockservice.LockTableAllocator { return s.lockTableAllocator } + +func (s *store) handleGoMaxProcs( + ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer, +) error { + resp.GoMaxProcsResponse.MaxProcs = int32(system.SetGoMaxProcs(int(req.GoMaxProcsRequest.MaxProcs))) + s.rt.Logger().Info("QueryService::GoMaxProcs", + zap.String("op", "set"), + zap.Int32("in", req.GoMaxProcsRequest.MaxProcs), + zap.Int32("out", resp.GoMaxProcsResponse.MaxProcs), + ) + return nil +} + +func (s *store) handleGoMemLimit( + ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer, +) error { + resp.GoMemLimitResponse.MemLimitBytes = int64(debug.SetMemoryLimit(req.GoMemLimitRequest.MemLimitBytes)) + s.rt.Logger().Info("QueryService::GoMemLimit", + zap.String("op", "set"), + zap.Int64("in", req.GoMemLimitRequest.MemLimitBytes), + zap.Int64("out", resp.GoMemLimitResponse.MemLimitBytes), + ) + return nil +} + +func (s *store) handleGoGCPercent( + ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer, +) error { + resp.GoGCPercentResponse.Percent = int32(debug.SetGCPercent(int(req.GoGCPercentRequest.Percent))) + s.rt.Logger().Info("QueryService::GOGCPercent", + zap.Int32("in", req.GoGCPercentRequest.Percent), + zap.Int32("out", resp.GoGCPercentResponse.Percent), + ) + return nil +} + +func (s *store) handleFileServiceCacheRequest( + ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer, +) error { + + if n := req.FileServiceCacheRequest.CacheSize; n > 0 { + switch req.FileServiceCacheRequest.Type { + case query.FileServiceCacheType_Disk: + fileservice.GlobalDiskCacheSizeHint.Store(n) + case query.FileServiceCacheType_Memory: + fileservice.GlobalMemoryCacheSizeHint.Store(n) + } + s.rt.Logger().Info("cache size adjusted", + zap.Any("type", req.FileServiceCacheRequest.Type), + zap.Any("size", n), + ) + } + return nil +} + +func (s *store) handleFileServiceCacheEvictRequest( + ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer, +) error { + + s.rt.Logger().Info("file service cache evict", + zap.String("type", req.FileServiceCacheEvictRequest.Type.String())) + + var ret map[string]int64 + switch req.FileServiceCacheEvictRequest.Type { + case query.FileServiceCacheType_Disk: + ret = fileservice.EvictDiskCaches(ctx) + case query.FileServiceCacheType_Memory: + ret = fileservice.EvictMemoryCaches(ctx) + } + + for name, target := range ret { + s.rt.Logger().Info("file service cache evict", + zap.String("type", req.FileServiceCacheEvictRequest.Type.String()), + zap.Int64("size", target), + zap.String("name", name), + ) + resp.FileServiceCacheEvictResponse.CacheSize = target + resp.FileServiceCacheEvictResponse.CacheCapacity = target + // usually one instance + break + } + return nil +} + +func (s *store) handleMetadataCacheRequest( + ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer, +) error { + s.rt.Logger().Info("metadata cache", zap.Int64("size", req.MetadataCacheRequest.CacheSize)) + // set capacity hint + objectio.GlobalCacheCapacityHint.Store(req.MetadataCacheRequest.CacheSize) + // evict + target := objectio.EvictCache(ctx) + // response + resp.MetadataCacheResponse.CacheCapacity = target + return nil +}