Skip to content

Commit 7cacfa1

Browse files
committed
chatv/2: expose server for use as a notifier
1 parent 9256fdc commit 7cacfa1

File tree

2 files changed

+30
-30
lines changed

2 files changed

+30
-30
lines changed

pkg/code/server/grpc/chat/v2/server.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ const (
4545
flushMessageCount = 100
4646
)
4747

48-
type server struct {
48+
type Server struct {
4949
log *logrus.Entry
5050

5151
data code_data.Provider
@@ -60,9 +60,9 @@ type server struct {
6060
chatpb.UnimplementedChatServer
6161
}
6262

63-
func NewChatServer(data code_data.Provider, auth *auth_util.RPCSignatureVerifier) chatpb.ChatServer {
64-
s := &server{
65-
log: logrus.StandardLogger().WithField("type", "chat/v2/server"),
63+
func NewChatServer(data code_data.Provider, auth *auth_util.RPCSignatureVerifier) *Server {
64+
s := &Server{
65+
log: logrus.StandardLogger().WithField("type", "chat/v2/Server"),
6666

6767
data: data,
6868
auth: auth,
@@ -81,7 +81,7 @@ func NewChatServer(data code_data.Provider, auth *auth_util.RPCSignatureVerifier
8181
}
8282

8383
// todo: This will require a lot of optimizations since we iterate and make several DB calls for each chat membership
84-
func (s *server) GetChats(ctx context.Context, req *chatpb.GetChatsRequest) (*chatpb.GetChatsResponse, error) {
84+
func (s *Server) GetChats(ctx context.Context, req *chatpb.GetChatsRequest) (*chatpb.GetChatsResponse, error) {
8585
log := s.log.WithField("method", "GetChats")
8686
log = client.InjectLoggingMetadata(ctx, log)
8787

@@ -181,7 +181,7 @@ func (s *server) GetChats(ctx context.Context, req *chatpb.GetChatsRequest) (*ch
181181
}, nil
182182
}
183183

184-
func (s *server) GetMessages(ctx context.Context, req *chatpb.GetMessagesRequest) (*chatpb.GetMessagesResponse, error) {
184+
func (s *Server) GetMessages(ctx context.Context, req *chatpb.GetMessagesRequest) (*chatpb.GetMessagesResponse, error) {
185185
log := s.log.WithField("method", "GetMessages")
186186
log = client.InjectLoggingMetadata(ctx, log)
187187

@@ -284,7 +284,7 @@ func (s *server) GetMessages(ctx context.Context, req *chatpb.GetMessagesRequest
284284
}, nil
285285
}
286286

287-
func (s *server) StreamChatEvents(streamer chatpb.Chat_StreamChatEventsServer) error {
287+
func (s *Server) StreamChatEvents(streamer chatpb.Chat_StreamChatEventsServer) error {
288288
ctx := streamer.Context()
289289

290290
log := s.log.WithField("method", "StreamChatEvents")
@@ -359,9 +359,9 @@ func (s *server) StreamChatEvents(streamer chatpb.Chat_StreamChatEventsServer) e
359359
stream, exists := s.streams[streamKey]
360360
if exists {
361361
s.streamsMu.Unlock()
362-
// There's an existing stream on this server that must be terminated first.
362+
// There's an existing stream on this Server that must be terminated first.
363363
// Warn to see how often this happens in practice
364-
log.Warnf("existing stream detected on this server (stream=%p) ; aborting", stream)
364+
log.Warnf("existing stream detected on this Server (stream=%p) ; aborting", stream)
365365
return status.Error(codes.Aborted, "stream already exists")
366366
}
367367

@@ -442,7 +442,7 @@ func (s *server) StreamChatEvents(streamer chatpb.Chat_StreamChatEventsServer) e
442442
}
443443
}
444444

445-
func (s *server) flushMessages(ctx context.Context, chatId chat.ChatId, owner *common.Account, stream *chatEventStream) {
445+
func (s *Server) flushMessages(ctx context.Context, chatId chat.ChatId, owner *common.Account, stream *chatEventStream) {
446446
log := s.log.WithFields(logrus.Fields{
447447
"method": "flushMessages",
448448
"chat_id": chatId.String(),
@@ -477,7 +477,7 @@ func (s *server) flushMessages(ctx context.Context, chatId chat.ChatId, owner *c
477477
}
478478
}
479479

480-
func (s *server) flushPointers(ctx context.Context, chatId chat.ChatId, stream *chatEventStream) {
480+
func (s *Server) flushPointers(ctx context.Context, chatId chat.ChatId, stream *chatEventStream) {
481481
log := s.log.WithFields(logrus.Fields{
482482
"method": "flushPointers",
483483
"chat_id": chatId.String(),
@@ -520,7 +520,7 @@ func (s *server) flushPointers(ctx context.Context, chatId chat.ChatId, stream *
520520
}
521521
}
522522

523-
func (s *server) StartChat(ctx context.Context, req *chatpb.StartChatRequest) (*chatpb.StartChatResponse, error) {
523+
func (s *Server) StartChat(ctx context.Context, req *chatpb.StartChatRequest) (*chatpb.StartChatResponse, error) {
524524
log := s.log.WithField("method", "StartChat")
525525
log = client.InjectLoggingMetadata(ctx, log)
526526

@@ -695,7 +695,7 @@ func (s *server) StartChat(ctx context.Context, req *chatpb.StartChatRequest) (*
695695
}
696696
}
697697

698-
func (s *server) SendMessage(ctx context.Context, req *chatpb.SendMessageRequest) (*chatpb.SendMessageResponse, error) {
698+
func (s *Server) SendMessage(ctx context.Context, req *chatpb.SendMessageRequest) (*chatpb.SendMessageResponse, error) {
699699
log := s.log.WithField("method", "SendMessage")
700700
log = client.InjectLoggingMetadata(ctx, log)
701701

@@ -784,8 +784,8 @@ func (s *server) SendMessage(ctx context.Context, req *chatpb.SendMessageRequest
784784
}, nil
785785
}
786786

787-
// TODO(api): This likely needs an RPC that can be called from any other server.
788-
func (s *server) NotifyNewMessage(ctx context.Context, chatID chat.ChatId, message *chatpb.ChatMessage) error {
787+
// TODO(api): This likely needs an RPC that can be called from any other Server.
788+
func (s *Server) NotifyNewMessage(ctx context.Context, chatID chat.ChatId, message *chatpb.ChatMessage) error {
789789
members, err := s.data.GetAllChatMembersV2(ctx, chatID)
790790
if errors.Is(err, chat.ErrMemberNotFound) {
791791
return nil
@@ -827,7 +827,7 @@ func (s *server) NotifyNewMessage(ctx context.Context, chatID chat.ChatId, messa
827827
}
828828

829829
// todo: This belongs in the common chat utility, which currently only operates on v1 chats
830-
func (s *server) persistChatMessage(ctx context.Context, chatId chat.ChatId, protoChatMessage *chatpb.ChatMessage) error {
830+
func (s *Server) persistChatMessage(ctx context.Context, chatId chat.ChatId, protoChatMessage *chatpb.ChatMessage) error {
831831
if err := protoChatMessage.Validate(); err != nil {
832832
return errors.Wrap(err, "proto chat message failed validation")
833833
}
@@ -877,7 +877,7 @@ func (s *server) persistChatMessage(ctx context.Context, chatId chat.ChatId, pro
877877
return nil
878878
}
879879

880-
func (s *server) AdvancePointer(ctx context.Context, req *chatpb.AdvancePointerRequest) (*chatpb.AdvancePointerResponse, error) {
880+
func (s *Server) AdvancePointer(ctx context.Context, req *chatpb.AdvancePointerRequest) (*chatpb.AdvancePointerResponse, error) {
881881
log := s.log.WithField("method", "AdvancePointer")
882882
log = client.InjectLoggingMetadata(ctx, log)
883883

@@ -981,7 +981,7 @@ func (s *server) AdvancePointer(ctx context.Context, req *chatpb.AdvancePointerR
981981
}, nil
982982
}
983983

984-
func (s *server) RevealIdentity(ctx context.Context, req *chatpb.RevealIdentityRequest) (*chatpb.RevealIdentityResponse, error) {
984+
func (s *Server) RevealIdentity(ctx context.Context, req *chatpb.RevealIdentityRequest) (*chatpb.RevealIdentityResponse, error) {
985985
log := s.log.WithField("method", "RevealIdentity")
986986
log = client.InjectLoggingMetadata(ctx, log)
987987

@@ -1135,7 +1135,7 @@ func (s *server) RevealIdentity(ctx context.Context, req *chatpb.RevealIdentityR
11351135
}
11361136
}
11371137

1138-
func (s *server) SetMuteState(ctx context.Context, req *chatpb.SetMuteStateRequest) (*chatpb.SetMuteStateResponse, error) {
1138+
func (s *Server) SetMuteState(ctx context.Context, req *chatpb.SetMuteStateRequest) (*chatpb.SetMuteStateResponse, error) {
11391139
log := s.log.WithField("method", "SetMuteState")
11401140
log = client.InjectLoggingMetadata(ctx, log)
11411141

@@ -1200,7 +1200,7 @@ func (s *server) SetMuteState(ctx context.Context, req *chatpb.SetMuteStateReque
12001200
}, nil
12011201
}
12021202

1203-
func (s *server) SetSubscriptionState(ctx context.Context, req *chatpb.SetSubscriptionStateRequest) (*chatpb.SetSubscriptionStateResponse, error) {
1203+
func (s *Server) SetSubscriptionState(ctx context.Context, req *chatpb.SetSubscriptionStateRequest) (*chatpb.SetSubscriptionStateResponse, error) {
12041204
log := s.log.WithField("method", "SetSubscriptionState")
12051205
log = client.InjectLoggingMetadata(ctx, log)
12061206

@@ -1265,7 +1265,7 @@ func (s *server) SetSubscriptionState(ctx context.Context, req *chatpb.SetSubscr
12651265
}, nil
12661266
}
12671267

1268-
func (s *server) toProtoChat(ctx context.Context, chatRecord *chat.ChatRecord, memberRecords []*chat.MemberRecord, myIdentitiesByPlatform map[chat.Platform]string) (*chatpb.ChatMetadata, error) {
1268+
func (s *Server) toProtoChat(ctx context.Context, chatRecord *chat.ChatRecord, memberRecords []*chat.MemberRecord, myIdentitiesByPlatform map[chat.Platform]string) (*chatpb.ChatMetadata, error) {
12691269
protoChat := &chatpb.ChatMetadata{
12701270
ChatId: chatRecord.ChatId.ToProto(),
12711271
Type: chatRecord.ChatType.ToProto(),
@@ -1349,7 +1349,7 @@ func (s *server) toProtoChat(ctx context.Context, chatRecord *chat.ChatRecord, m
13491349
return protoChat, nil
13501350
}
13511351

1352-
func (s *server) getProtoChatMessages(ctx context.Context, chatId chat.ChatId, owner *common.Account, queryOptions ...query.Option) ([]*chatpb.ChatMessage, error) {
1352+
func (s *Server) getProtoChatMessages(ctx context.Context, chatId chat.ChatId, owner *common.Account, queryOptions ...query.Option) ([]*chatpb.ChatMessage, error) {
13531353
messageRecords, err := s.data.GetAllChatMessagesV2(
13541354
ctx,
13551355
chatId,
@@ -1405,7 +1405,7 @@ func (s *server) getProtoChatMessages(ctx context.Context, chatId chat.ChatId, o
14051405
return res, nil
14061406
}
14071407

1408-
func (s *server) onPersistChatMessage(log *logrus.Entry, chatId chat.ChatId, chatMessage *chatpb.ChatMessage) {
1408+
func (s *Server) onPersistChatMessage(log *logrus.Entry, chatId chat.ChatId, chatMessage *chatpb.ChatMessage) {
14091409
event := &chatpb.ChatStreamEvent{
14101410
Type: &chatpb.ChatStreamEvent_Message{
14111411
Message: chatMessage,
@@ -1418,7 +1418,7 @@ func (s *server) onPersistChatMessage(log *logrus.Entry, chatId chat.ChatId, cha
14181418
// todo: send the push
14191419
}
14201420

1421-
func (s *server) getAllIdentities(ctx context.Context, owner *common.Account) (map[chat.Platform]string, error) {
1421+
func (s *Server) getAllIdentities(ctx context.Context, owner *common.Account) (map[chat.Platform]string, error) {
14221422
identities := map[chat.Platform]string{
14231423
chat.PlatformCode: owner.PublicKey().ToBase58(),
14241424
}
@@ -1434,7 +1434,7 @@ func (s *server) getAllIdentities(ctx context.Context, owner *common.Account) (m
14341434
return identities, nil
14351435
}
14361436

1437-
func (s *server) ownsChatMemberWithoutRecord(ctx context.Context, chatId chat.ChatId, memberId chat.MemberId, owner *common.Account) (bool, error) {
1437+
func (s *Server) ownsChatMemberWithoutRecord(ctx context.Context, chatId chat.ChatId, memberId chat.MemberId, owner *common.Account) (bool, error) {
14381438
memberRecord, err := s.data.GetChatMemberByIdV2(ctx, chatId, memberId)
14391439
switch err {
14401440
case nil:
@@ -1447,7 +1447,7 @@ func (s *server) ownsChatMemberWithoutRecord(ctx context.Context, chatId chat.Ch
14471447
return s.ownsChatMemberWithRecord(ctx, chatId, memberRecord, owner)
14481448
}
14491449

1450-
func (s *server) ownsChatMemberWithRecord(ctx context.Context, chatId chat.ChatId, memberRecord *chat.MemberRecord, owner *common.Account) (bool, error) {
1450+
func (s *Server) ownsChatMemberWithRecord(ctx context.Context, chatId chat.ChatId, memberRecord *chat.MemberRecord, owner *common.Account) (bool, error) {
14511451
switch memberRecord.Platform {
14521452
case chat.PlatformCode:
14531453
return memberRecord.PlatformId == owner.PublicKey().ToBase58(), nil
@@ -1459,7 +1459,7 @@ func (s *server) ownsChatMemberWithRecord(ctx context.Context, chatId chat.ChatI
14591459
}
14601460

14611461
// todo: This logic should live elsewhere in somewhere more common
1462-
func (s *server) ownsTwitterUsername(ctx context.Context, owner *common.Account, username string) (bool, error) {
1462+
func (s *Server) ownsTwitterUsername(ctx context.Context, owner *common.Account, username string) (bool, error) {
14631463
ownerTipAccount, err := owner.ToTimelockVault(timelock_token.DataVersion1, common.KinMintAccount)
14641464
if err != nil {
14651465
return false, errors.Wrap(err, "error deriving twitter tip address")
@@ -1478,7 +1478,7 @@ func (s *server) ownsTwitterUsername(ctx context.Context, owner *common.Account,
14781478
}
14791479

14801480
// todo: This logic should live elsewhere in somewhere more common
1481-
func (s *server) getOwnedTwitterUsername(ctx context.Context, owner *common.Account) (string, bool, error) {
1481+
func (s *Server) getOwnedTwitterUsername(ctx context.Context, owner *common.Account) (string, bool, error) {
14821482
ownerTipAccount, err := owner.ToTimelockVault(timelock_token.DataVersion1, common.KinMintAccount)
14831483
if err != nil {
14841484
return "", false, errors.Wrap(err, "error deriving twitter tip address")

pkg/code/server/grpc/chat/v2/stream.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ type chatEventNotification struct {
9898
ts time.Time
9999
}
100100

101-
func (s *server) asyncNotifyAll(chatId chat.ChatId, event *chatpb.ChatStreamEvent) error {
101+
func (s *Server) asyncNotifyAll(chatId chat.ChatId, event *chatpb.ChatStreamEvent) error {
102102
m := proto.Clone(event).(*chatpb.ChatStreamEvent)
103103
ok := s.chatEventChans.Send(chatId[:], &chatEventNotification{chatId, m, time.Now()})
104104
if !ok {
@@ -107,7 +107,7 @@ func (s *server) asyncNotifyAll(chatId chat.ChatId, event *chatpb.ChatStreamEven
107107
return nil
108108
}
109109

110-
func (s *server) asyncChatEventStreamNotifier(workerId int, channel <-chan interface{}) {
110+
func (s *Server) asyncChatEventStreamNotifier(workerId int, channel <-chan interface{}) {
111111
log := s.log.WithFields(logrus.Fields{
112112
"method": "asyncChatEventStreamNotifier",
113113
"worker": workerId,

0 commit comments

Comments
 (0)