Skip to content

Commit 0dd205a

Browse files
committed
chat/v2: remove tip hooks, sketch out latest rpcs
1 parent f751569 commit 0dd205a

File tree

8 files changed

+226
-282
lines changed

8 files changed

+226
-282
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ require (
4242
golang.org/x/crypto v0.21.0
4343
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
4444
golang.org/x/net v0.22.0
45+
golang.org/x/sync v0.7.0
4546
golang.org/x/text v0.14.0
4647
golang.org/x/time v0.5.0
4748
google.golang.org/api v0.170.0
@@ -120,7 +121,6 @@ require (
120121
go.uber.org/atomic v1.7.0 // indirect
121122
go.uber.org/multierr v1.6.0 // indirect
122123
golang.org/x/oauth2 v0.18.0 // indirect
123-
golang.org/x/sync v0.7.0 // indirect
124124
golang.org/x/sys v0.18.0 // indirect
125125
google.golang.org/appengine v1.6.8 // indirect
126126
google.golang.org/appengine/v2 v2.0.1 // indirect

pkg/code/chat/message_tips.go

Lines changed: 1 addition & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,12 @@ package chat
22

33
import (
44
"context"
5-
"fmt"
6-
7-
"github.com/mr-tron/base58"
8-
"github.com/pkg/errors"
9-
"github.com/sirupsen/logrus"
10-
"google.golang.org/protobuf/types/known/timestamppb"
11-
125
chatpb "github.com/code-payments/code-protobuf-api/generated/go/chat/v1"
13-
chatv2pb "github.com/code-payments/code-protobuf-api/generated/go/chat/v2"
14-
commonpb "github.com/code-payments/code-protobuf-api/generated/go/common/v1"
15-
166
"github.com/code-payments/code-server/pkg/code/common"
177
code_data "github.com/code-payments/code-server/pkg/code/data"
188
chat_v1 "github.com/code-payments/code-server/pkg/code/data/chat/v1"
19-
chat_v2 "github.com/code-payments/code-server/pkg/code/data/chat/v2"
209
"github.com/code-payments/code-server/pkg/code/data/intent"
10+
"github.com/pkg/errors"
2111
)
2212

2313
// SendTipsExchangeMessage sends a message to the Tips chat with exchange data
@@ -26,11 +16,6 @@ import (
2616
//
2717
// Note: Tests covered in SubmitIntent history tests
2818
func SendTipsExchangeMessage(ctx context.Context, data code_data.Provider, notifier Notifier, intentRecord *intent.Record) ([]*MessageWithOwner, error) {
29-
intentIdRaw, err := base58.Decode(intentRecord.IntentId)
30-
if err != nil {
31-
return nil, fmt.Errorf("invalid intent id: %w", err)
32-
}
33-
3419
messageId := intentRecord.IntentId
3520

3621
exchangeData, ok := getExchangeDataFromIntent(intentRecord)
@@ -78,26 +63,6 @@ func SendTipsExchangeMessage(ctx context.Context, data code_data.Provider, notif
7863
return nil, errors.Wrap(err, "error creating proto chat message")
7964
}
8065

81-
v2Message := &chatv2pb.ChatMessage{
82-
MessageId: chat_v2.GenerateMessageId().ToProto(),
83-
Content: []*chatv2pb.Content{
84-
{
85-
Type: &chatv2pb.Content_ExchangeData{
86-
ExchangeData: &chatv2pb.ExchangeDataContent{
87-
Verb: chatv2pb.ExchangeDataContent_Verb(verb),
88-
ExchangeData: &chatv2pb.ExchangeDataContent_Exact{
89-
Exact: exchangeData,
90-
},
91-
Reference: &chatv2pb.ExchangeDataContent_Intent{
92-
Intent: &commonpb.IntentId{Value: intentIdRaw},
93-
},
94-
},
95-
},
96-
},
97-
},
98-
Ts: timestamppb.New(intentRecord.CreatedAt),
99-
}
100-
10166
canPush, err := SendNotificationChatMessageV1(
10267
ctx,
10368
data,
@@ -112,23 +77,6 @@ func SendTipsExchangeMessage(ctx context.Context, data code_data.Provider, notif
11277
return nil, errors.Wrap(err, "error persisting v1 chat message")
11378
}
11479

115-
_, err = SendNotificationChatMessageV2(
116-
ctx,
117-
data,
118-
notifier,
119-
TipsName,
120-
true,
121-
receiver,
122-
v2Message,
123-
intentRecord.IntentId,
124-
verb != chatpb.ExchangeDataContent_RECEIVED_TIP,
125-
)
126-
if err != nil {
127-
// TODO: Eventually we'll want to return an error, but for now we'll log
128-
// since we're not in 'prod' yet.
129-
logrus.StandardLogger().WithError(err).Warn("Failed to send notification message (v2)")
130-
}
131-
13280
if canPush {
13381
messagesToPush = append(messagesToPush, &MessageWithOwner{
13482
Owner: receiver,

pkg/code/data/chat/v2/memory/store.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,13 @@ func (s *store) GetAllMembersByPlatformIds(_ context.Context, idByPlatform map[c
100100
}
101101

102102
// GetUnreadCount implements chat.store.GetUnreadCount
103-
func (s *store) GetUnreadCount(_ context.Context, chatId chat.ChatId, readPointer chat.MessageId) (uint32, error) {
103+
func (s *store) GetUnreadCount(_ context.Context, chatId chat.ChatId, memberId chat.MemberId, readPointer chat.MessageId) (uint32, error) {
104104
s.mu.Lock()
105105
defer s.mu.Unlock()
106106

107107
items := s.findMessagesByChatId(chatId)
108108
items = s.filterMessagesAfter(items, readPointer)
109+
items = s.filterMessagesNotSentBy(items, memberId)
109110
items = s.filterNotifiedMessages(items)
110111
return uint32(len(items)), nil
111112
}
@@ -446,6 +447,16 @@ func (s *store) filterMessagesAfter(items []*chat.MessageRecord, pointer chat.Me
446447
return res
447448
}
448449

450+
func (s *store) filterMessagesNotSentBy(items []*chat.MessageRecord, sender chat.MemberId) []*chat.MessageRecord {
451+
var res []*chat.MessageRecord
452+
for _, item := range items {
453+
if item.Sender == nil || !bytes.Equal(item.Sender[:], sender[:]) {
454+
res = append(res, item)
455+
}
456+
}
457+
return res
458+
}
459+
449460
func (s *store) filterNotifiedMessages(items []*chat.MessageRecord) []*chat.MessageRecord {
450461
var res []*chat.MessageRecord
451462
for _, item := range items {

pkg/code/data/chat/v2/store.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"errors"
66

7+
"github.com/code-payments/code-protobuf-api/generated/go/common/v1"
8+
79
"github.com/code-payments/code-server/pkg/database/query"
810
)
911

@@ -43,7 +45,7 @@ type Store interface {
4345
GetAllMessagesByChatId(ctx context.Context, chatId ChatId, cursor query.Cursor, direction query.Ordering, limit uint64) ([]*MessageRecord, error)
4446

4547
// GetUnreadCount gets the unread message count for a chat ID at a read pointer
46-
GetUnreadCount(ctx context.Context, chatId ChatId, readPointer MessageId) (uint32, error)
48+
GetUnreadCount(ctx context.Context, chatId ChatId, memberId MemberId, readPointer MessageId) (uint32, error)
4749

4850
// PutChat creates a new chat
4951
PutChat(ctx context.Context, record *ChatRecord) error
@@ -66,3 +68,23 @@ type Store interface {
6668
// SetSubscriptionState updates the subscription state for a chat member
6769
SetSubscriptionState(ctx context.Context, chatId ChatId, memberId MemberId, isSubscribed bool) error
6870
}
71+
72+
type PaymentStore interface {
73+
// MarkFriendshipPaid marks a friendship as paid.
74+
//
75+
// The intentId is the intent that paid for the friendship.
76+
MarkFriendshipPaid(ctx context.Context, payer, other *common.SolanaAccountId, intentId *common.IntentId) error
77+
78+
// IsFriendshipPaid returns whether a payment has been made for a friendship.
79+
//
80+
// IsFriendshipPaid is reflexive, with only a single payment being required.
81+
IsFriendshipPaid(ctx context.Context, user, other *common.SolanaAccountId) (bool, error)
82+
83+
// MarkChatPaid marks a chat as paid.
84+
MarkChatPaid(ctx context.Context, payer *common.SolanaAccountId, chat ChatId) error
85+
86+
// IsChatPaid returns whether a member paid to be part of a chat.
87+
//
88+
// This is only valid for non-two way chats.
89+
IsChatPaid(ctx context.Context, chatId ChatId, member *common.SolanaAccountId) (bool, error)
90+
}

pkg/code/data/internal.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ import (
7171
intent_memory_client "github.com/code-payments/code-server/pkg/code/data/intent/memory"
7272
login_memory_client "github.com/code-payments/code-server/pkg/code/data/login/memory"
7373
merkletree_memory_client "github.com/code-payments/code-server/pkg/code/data/merkletree/memory"
74-
messaging "github.com/code-payments/code-server/pkg/code/data/messaging"
74+
"github.com/code-payments/code-server/pkg/code/data/messaging"
7575
messaging_memory_client "github.com/code-payments/code-server/pkg/code/data/messaging/memory"
7676
nonce_memory_client "github.com/code-payments/code-server/pkg/code/data/nonce/memory"
7777
onramp_memory_client "github.com/code-payments/code-server/pkg/code/data/onramp/memory"
@@ -402,7 +402,7 @@ type DatabaseData interface {
402402
GetAllChatMembersV2(ctx context.Context, chatId chat_v2.ChatId) ([]*chat_v2.MemberRecord, error)
403403
GetPlatformUserChatMembershipV2(ctx context.Context, idByPlatform map[chat_v2.Platform]string, opts ...query.Option) ([]*chat_v2.MemberRecord, error)
404404
GetAllChatMessagesV2(ctx context.Context, chatId chat_v2.ChatId, opts ...query.Option) ([]*chat_v2.MessageRecord, error)
405-
GetChatUnreadCountV2(ctx context.Context, chatId chat_v2.ChatId, readPointer chat_v2.MessageId) (uint32, error)
405+
GetChatUnreadCountV2(ctx context.Context, chatId chat_v2.ChatId, memberId chat_v2.MemberId, readPointer chat_v2.MessageId) (uint32, error)
406406
PutChatV2(ctx context.Context, record *chat_v2.ChatRecord) error
407407
PutChatMemberV2(ctx context.Context, record *chat_v2.MemberRecord) error
408408
PutChatMessageV2(ctx context.Context, record *chat_v2.MessageRecord) error
@@ -1493,8 +1493,8 @@ func (dp *DatabaseProvider) GetAllChatMessagesV2(ctx context.Context, chatId cha
14931493
}
14941494
return dp.chatv2.GetAllMessagesByChatId(ctx, chatId, req.Cursor, req.SortBy, req.Limit)
14951495
}
1496-
func (dp *DatabaseProvider) GetChatUnreadCountV2(ctx context.Context, chatId chat_v2.ChatId, readPointer chat_v2.MessageId) (uint32, error) {
1497-
return dp.chatv2.GetUnreadCount(ctx, chatId, readPointer)
1496+
func (dp *DatabaseProvider) GetChatUnreadCountV2(ctx context.Context, chatId chat_v2.ChatId, memberId chat_v2.MemberId, readPointer chat_v2.MessageId) (uint32, error) {
1497+
return dp.chatv2.GetUnreadCount(ctx, chatId, memberId, readPointer)
14981498
}
14991499
func (dp *DatabaseProvider) PutChatV2(ctx context.Context, record *chat_v2.ChatRecord) error {
15001500
return dp.chatv2.PutChat(ctx, record)

pkg/code/server/grpc/chat/v1/server.go

Lines changed: 72 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,18 @@ type server struct {
5454
chatLocks *sync_util.StripedLock
5555
chatEventChans *sync_util.StripedChannel
5656

57-
streamsMu sync.RWMutex
58-
streams map[string]*chatEventStream
59-
6057
chatpb.UnimplementedChatServer
6158
}
6259

63-
func NewChatServer(data code_data.Provider, auth *auth_util.RPCSignatureVerifier) chatpb.ChatServer {
64-
return &server{
65-
log: logrus.StandardLogger().WithField("type", "chat/server"),
66-
data: data,
67-
auth: auth,
68-
streams: make(map[string]*chatEventStream),
60+
func NewChatServer(data code_data.Provider, auth *auth_util.RPCSignatureVerifier, pusher push_lib.Provider) chatpb.ChatServer {
61+
s := &server{
62+
log: logrus.StandardLogger().WithField("type", "chat/v1/server"),
63+
data: data,
64+
auth: auth,
65+
pusher: pusher,
66+
streams: make(map[string]*chatEventStream),
67+
chatLocks: sync_util.NewStripedLock(64), // todo: configurable parameters
68+
chatEventChans: sync_util.NewStripedChannel(64, 100_000), // todo: configurable parameters
6969
}
7070

7171
for i, channel := range s.chatEventChans.GetChannels() {
@@ -403,21 +403,9 @@ func (s *server) AdvancePointer(ctx context.Context, req *chatpb.AdvancePointerR
403403
Pointers: []*chatpb.Pointer{req.Pointer},
404404
}
405405

406-
s.streamsMu.RLock()
407-
for key, stream := range s.streams {
408-
if !strings.HasPrefix(key, chatId.String()) {
409-
continue
410-
}
411-
412-
if strings.HasSuffix(key, owner.PublicKey().ToBase58()) {
413-
continue
414-
}
415-
416-
if err := stream.notify(event, streamNotifyTimeout); err != nil {
417-
log.WithError(err).Warnf("failed to notify session stream, closing streamer (stream=%p)", stream)
418-
}
406+
if err := s.asyncNotifyAll(chatId, owner, event); err != nil {
407+
log.WithError(err).Warn("failure notifying chat event")
419408
}
420-
s.streamsMu.RUnlock()
421409

422410
return &chatpb.AdvancePointerResponse{
423411
Result: chatpb.AdvancePointerResponse_OK,
@@ -428,7 +416,7 @@ func (s *server) AdvancePointer(ctx context.Context, req *chatpb.AdvancePointerR
428416
return nil, status.Error(codes.InvalidArgument, "Pointer.Kind must be READ")
429417
}
430418

431-
chatRecord, err := s.data.GetChatById(ctx, chatId)
419+
chatRecord, err := s.data.GetChatByIdV1(ctx, chatId)
432420
if err == chat.ErrChatNotFound {
433421
return &chatpb.AdvancePointerResponse{
434422
Result: chatpb.AdvancePointerResponse_CHAT_NOT_FOUND,
@@ -656,6 +644,22 @@ func (s *server) StreamChatEvents(streamer chatpb.Chat_StreamChatEventsServer) e
656644

657645
s.streamsMu.Unlock()
658646

647+
defer func() {
648+
s.streamsMu.Lock()
649+
650+
log.Tracef("closing streamer (stream=%s)", streamRef)
651+
652+
// We check to see if the current active stream is the one that we created.
653+
// If it is, we can just remove it since it's closed. Otherwise, we leave it
654+
// be, as another OpenMessageStream() call is handling it.
655+
liveStream, exists := s.streams[streamKey]
656+
if exists && liveStream == stream {
657+
delete(s.streams, streamKey)
658+
}
659+
660+
s.streamsMu.Unlock()
661+
}()
662+
659663
sendPingCh := time.After(0)
660664
streamHealthCh := monitorChatEventStreamHealth(ctx, log, streamRef, streamer)
661665

@@ -734,6 +738,10 @@ func (s *server) SendMessage(ctx context.Context, req *chatpb.SendMessageRequest
734738
return nil, status.Error(codes.InvalidArgument, "content[0] must be Text or ThankYou")
735739
}
736740

741+
chatLock := s.chatLocks.Get(chatId[:])
742+
chatLock.Lock()
743+
defer chatLock.Unlock()
744+
737745
// todo: Revisit message IDs
738746
messageId, err := common.NewRandomAccount()
739747
if err != nil {
@@ -749,28 +757,54 @@ func (s *server) SendMessage(ctx context.Context, req *chatpb.SendMessageRequest
749757
Cursor: nil, // todo: Don't have cursor until we save it to the DB
750758
}
751759

760+
// todo: Save the message to the DB
761+
752762
event := &chatpb.ChatStreamEvent{
753763
Messages: []*chatpb.ChatMessage{chatMessage},
754764
}
755765

756-
s.streamsMu.RLock()
757-
for key, stream := range s.streams {
758-
if !strings.HasPrefix(key, chatId.String()) {
759-
continue
760-
}
761-
762-
if strings.HasSuffix(key, owner.PublicKey().ToBase58()) {
763-
continue
764-
}
765-
766-
if err := stream.notify(event, streamNotifyTimeout); err != nil {
767-
log.WithError(err).Warnf("failed to notify session stream, closing streamer (stream=%p)", stream)
768-
}
766+
if err := s.asyncNotifyAll(chatId, owner, event); err != nil {
767+
log.WithError(err).Warn("failure notifying chat event")
769768
}
770-
s.streamsMu.RUnlock()
769+
770+
s.asyncPushChatMessage(owner, chatId, chatMessage)
771771

772772
return &chatpb.SendMessageResponse{
773773
Result: chatpb.SendMessageResponse_OK,
774774
Message: chatMessage,
775775
}, nil
776776
}
777+
778+
// todo: doesn't respect mute/unsubscribe rules
779+
// todo: only sends pushes to active stream listeners instead of all message recipients
780+
func (s *server) asyncPushChatMessage(sender *common.Account, chatId chat.ChatId, chatMessage *chatpb.ChatMessage) {
781+
ctx := context.TODO()
782+
783+
go func() {
784+
s.streamsMu.RLock()
785+
for key := range s.streams {
786+
if !strings.HasPrefix(key, chatId.String()) {
787+
continue
788+
}
789+
790+
receiver, err := common.NewAccountFromPublicKeyString(strings.Split(key, ":")[1])
791+
if err != nil {
792+
continue
793+
}
794+
795+
if bytes.Equal(sender.PublicKey().ToBytes(), receiver.PublicKey().ToBytes()) {
796+
continue
797+
}
798+
799+
go push_util.SendChatMessagePushNotification(
800+
ctx,
801+
s.data,
802+
s.pusher,
803+
"TontonTwitch",
804+
receiver,
805+
chatMessage,
806+
)
807+
}
808+
s.streamsMu.RUnlock()
809+
}()
810+
}

0 commit comments

Comments
 (0)