Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 124 additions & 20 deletions golang/consumer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ import (
"time"
)

var (
	// messageGroupExtractor yields the FIFO message group of a message, or ""
	// when the message carries no group. Passed to groupMessageBy by
	// fifoConsumeService to key parallel consumption.
	messageGroupExtractor = func(mv *MessageView) string {
		messageGroup := mv.GetMessageGroup()
		if messageGroup != nil {
			return *messageGroup
		}
		return ""
	}

	// liteTopicExtractor yields the lite topic of a message. Passed to
	// groupMessageBy by liteFifoConsumeService to key parallel consumption.
	liteTopicExtractor = func(mv *MessageView) string {
		return mv.GetLiteTopic()
	}
)

type ConsumeService interface {
consume(ProcessQueue, []*MessageView)
consumeWithDuration(*MessageView, time.Duration, func(ConsumerResult, error))
Expand Down Expand Up @@ -93,14 +107,16 @@ func (bcs *baseConsumeService) newConsumeTask(clientId string, messageListener M
}()
duration := time.Since(startTime)
status := MessageHookPointsStatus_ERROR
if consumeResult == SUCCESS {
// Check if result is SUCCESS or SUSPEND (considered as successful)
if consumeResult.Type == ConsumerResultTypeSuccess || consumeResult.Type == ConsumerResultTypeSuspend {
status = MessageHookPointsStatus_OK
}
messageInterceptor.doAfter(MessageHookPoints_CONSUME, []*MessageCommon{messageView.GetMessageCommon()}, duration, status)
}
}

// Compile-time checks that both concrete services satisfy ConsumeService.
// The typed-nil form performs the same interface-satisfaction check without
// allocating throwaway values at package init.
var (
	_ ConsumeService = (*standardConsumeService)(nil)
	_ ConsumeService = (*liteFifoConsumeService)(nil)
)

type standardConsumeService struct {
baseConsumeService
Expand Down Expand Up @@ -133,37 +149,50 @@ func NewStandardConsumeService(clientId string, messageListener MessageListener,
}
}

func (fcs *fifoConsumeService) consume(pq ProcessQueue, messageViews []*MessageView) {
if !fcs.enableFifoConsumeAccelerator || len(messageViews) <= 1 {
fcs.consumeIteratively(pq, &messageViews, 0)
return
}
// Group messages by messageGroup
messageViewsGroupByMessageGroup := make(map[string][]*MessageView)
messageViewsWithoutMessageGroup := make([]*MessageView, 0)
// groupMessageBy groups messages by applying the provided groupKeyExtractor function
// It returns two maps: grouped messages and messages without a group key
func groupMessageBy(messageViews []*MessageView, groupKeyExtractor func(*MessageView) string) (map[string][]*MessageView, []*MessageView) {
messageViewsGroupByGroupKey := make(map[string][]*MessageView)
messageViewsWithoutGroupKey := make([]*MessageView, 0)

for _, messageView := range messageViews {
messageGroup := messageView.GetMessageGroup()
if messageGroup != nil && *messageGroup != "" {
messageViewsGroupByMessageGroup[*messageGroup] = append(messageViewsGroupByMessageGroup[*messageGroup], messageView)
groupKey := groupKeyExtractor(messageView)
if groupKey != "" {
messageViewsGroupByGroupKey[groupKey] = append(messageViewsGroupByGroupKey[groupKey], messageView)
} else {
messageViewsWithoutMessageGroup = append(messageViewsWithoutMessageGroup, messageView)
messageViewsWithoutGroupKey = append(messageViewsWithoutGroupKey, messageView)
}
}

groupNum := len(messageViewsGroupByMessageGroup)
if len(messageViewsWithoutMessageGroup) > 0 {
groupNum := len(messageViewsGroupByGroupKey)
if len(messageViewsWithoutGroupKey) > 0 {
groupNum++
}
sugarBaseLogger.Debugf("FifoConsumeService parallel consume, messageViewsNum=%d, groupNum=%d", len(messageViews), groupNum)

return messageViewsGroupByGroupKey, messageViewsWithoutGroupKey
}

func (fcs *fifoConsumeService) consume(pq ProcessQueue, messageViews []*MessageView) {
if !fcs.enableFifoConsumeAccelerator || len(messageViews) <= 1 {
fcs.consumeIteratively(pq, &messageViews, 0)
return
}

messageViewsGroupByGroupKey, messageViewsWithoutGroupKey := groupMessageBy(
messageViews,
messageGroupExtractor,
)

// Consume messages in parallel by group
for _, group := range messageViewsGroupByMessageGroup {
for _, group := range messageViewsGroupByGroupKey {
fcs.consumeIteratively(pq, &group, 0)
}
if len(messageViewsWithoutMessageGroup) > 0 {
fcs.consumeIteratively(pq, &messageViewsWithoutMessageGroup, 0)
if len(messageViewsWithoutGroupKey) > 0 {
fcs.consumeIteratively(pq, &messageViewsWithoutGroupKey, 0)
}
}

func (fcs *fifoConsumeService) consumeIteratively(pq ProcessQueue, messageViewsPtr *[]*MessageView, ptr int) {
if messageViewsPtr == nil {
sugarBaseLogger.Errorf("[Bug] messageViews is nil when consumeIteratively")
Expand Down Expand Up @@ -192,7 +221,82 @@ func (fcs *fifoConsumeService) consumeIteratively(pq ProcessQueue, messageViewsP

// NewFiFoConsumeService builds a fifoConsumeService on top of the shared
// baseConsumeService. enableFifoConsumeAccelerator turns on per-message-group
// parallel consumption in consume.
func NewFiFoConsumeService(clientId string, messageListener MessageListener, consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor, enableFifoConsumeAccelerator bool) *fifoConsumeService {
	return &fifoConsumeService{
		baseConsumeService:           *NewBaseConsumeService(clientId, messageListener, consumptionExecutor, messageInterceptor),
		enableFifoConsumeAccelerator: enableFifoConsumeAccelerator,
	}
}

// liteFifoConsumeService is the FIFO consume service used by the lite push
// consumer; unlike fifoConsumeService it groups batches by lite topic rather
// than by message group.
type liteFifoConsumeService struct {
	baseConsumeService
	// enableFifoConsumeAccelerator lets different lite topics within one
	// batch be consumed independently instead of strictly one by one.
	enableFifoConsumeAccelerator bool
}

// consume dispatches a batch of messages for lite FIFO consumption.
//
// When the accelerator is disabled, or the batch holds at most one message,
// the whole batch is consumed strictly in order. Otherwise messages are
// grouped by lite topic and each group is consumed independently, preserving
// order only within a lite topic.
func (lcs *liteFifoConsumeService) consume(pq ProcessQueue, messageViews []*MessageView) {
	if !lcs.enableFifoConsumeAccelerator || len(messageViews) <= 1 {
		lcs.consumeIteratively(pq, &messageViews, 0)
		return
	}

	messageViewsGroupByGroupKey, messageViewsWithoutGroupKey := groupMessageBy(
		messageViews,
		liteTopicExtractor,
	)

	for _, group := range messageViewsGroupByGroupKey {
		// Shadow the loop variable: consumeIteratively retains &group inside
		// an async consumption callback, and on pre-Go-1.22 toolchains all
		// iterations would otherwise alias one variable. No-op on 1.22+.
		group := group
		lcs.consumeIteratively(pq, &group, 0)
	}
	if len(messageViewsWithoutGroupKey) > 0 {
		lcs.consumeIteratively(pq, &messageViewsWithoutGroupKey, 0)
	}
}

// consumeIteratively consumes messageViews one at a time starting at index
// ptr, recursing through the asynchronous consumption callback so that FIFO
// order is preserved within the slice.
//
// Corrupted messages are discarded via the process queue and skipped. On a
// SUSPEND result, every later message in the batch that shares the suspended
// message's lite topic is erased with the same result, and only the remaining
// messages (other lite topics) continue to be consumed.
func (lcs *liteFifoConsumeService) consumeIteratively(pq ProcessQueue, messageViewsPtr *[]*MessageView, ptr int) {
	if messageViewsPtr == nil {
		sugarBaseLogger.Errorf("[Bug] messageViews is nil when consumeIteratively")
		return
	}
	messageViews := *messageViewsPtr
	// Recursion terminates once every message in the batch has been handled.
	if ptr >= len(messageViews) {
		return
	}
	mv := messageViews[ptr]
	if mv.isCorrupted() {
		sugarBaseLogger.Errorf("Message is corrupted for FIFO consumption, prepare to discard it, mq=%s, messageId=%s, clientId=%s", pq.getMessageQueue().String(), mv.GetMessageId(), lcs.clientId)
		pq.discardFifoMessage(mv)
		// Skip the corrupted message and continue with the next one.
		lcs.consumeIteratively(pq, messageViewsPtr, ptr+1)
		return
	}
	lcs.consumeImmediately(mv, func(result ConsumerResult, err error) {
		if err != nil {
			sugarBaseLogger.Errorf("[Bug] Exception raised in consumption callback, clientId=%s", lcs.clientId)
			return
		}
		// Settle the just-consumed message with whatever result was produced.
		pq.eraseFifoMessage(mv, result)
		if result.Type == ConsumerResultTypeSuspend {
			// Suspend all messages with the same liteTopic in this batch.
			newMsgList := make([]*MessageView, 0)
			for i := ptr + 1; i < len(messageViews); i++ {
				msgView := messageViews[i]
				if msgView.GetLiteTopic() == mv.GetLiteTopic() {
					// Same lite topic: propagate the suspend result rather
					// than consuming the message.
					pq.eraseFifoMessage(msgView, result)
				} else {
					newMsgList = append(newMsgList, msgView)
				}
			}
			// Continue processing remaining messages with different liteTopic.
			if len(newMsgList) > 0 {
				lcs.consumeIteratively(pq, &newMsgList, 0)
			}
		} else {
			lcs.consumeIteratively(pq, messageViewsPtr, ptr+1)
		}
	})
}

// NewLiteFifoConsumeService builds a liteFifoConsumeService on top of the
// shared baseConsumeService. enableFifoConsumeAccelerator turns on
// per-lite-topic parallel consumption in consume.
func NewLiteFifoConsumeService(clientId string, messageListener MessageListener, consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor, enableFifoConsumeAccelerator bool) *liteFifoConsumeService {
	return &liteFifoConsumeService{
		baseConsumeService:           *NewBaseConsumeService(clientId, messageListener, consumptionExecutor, messageInterceptor),
		enableFifoConsumeAccelerator: enableFifoConsumeAccelerator,
	}
}
23 changes: 12 additions & 11 deletions golang/lite_push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,27 @@ func (lpc *defaultLitePushConsumer) notifyUnsubscribeLite(command *v2.NotifyUnsu
lpc.litePushConsumerSettings.liteTopicSet.Delete(liteTopic)
}

// SubscribeLite subscribes this consumer to an additional lite topic. The
// change is first synced to the server (PARTIAL_ADD); only on success is the
// topic recorded in the local lite-topic set.
func (lpc *defaultLitePushConsumer) SubscribeLite(liteTopic string) error {
	if err := lpc.checkRunning(); err != nil {
		return err
	}
	if err := lpc.syncLiteSubscription(context.TODO(), v2.LiteSubscriptionAction_PARTIAL_ADD, []string{liteTopic}); err != nil {
		sugarBaseLogger.Errorf("LitePushConsumer SubscribeLite liteTopic:%s err:%v", liteTopic, err)
		return err
	}
	lpc.litePushConsumerSettings.liteTopicSet.Store(liteTopic, struct{}{})
	return nil
}

// UnSubscribeLite removes a lite-topic subscription from this consumer. The
// change is first synced to the server (PARTIAL_REMOVE); only on success is
// the topic removed from the local lite-topic set.
func (lpc *defaultLitePushConsumer) UnSubscribeLite(liteTopic string) error {
	if err := lpc.checkRunning(); err != nil {
		return err
	}
	if err := lpc.syncLiteSubscription(context.TODO(), v2.LiteSubscriptionAction_PARTIAL_REMOVE, []string{liteTopic}); err != nil {
		sugarBaseLogger.Errorf("LitePushConsumer UnSubscribeLite liteTopic:%s err:%v", liteTopic, err)
		return err
	}
	lpc.litePushConsumerSettings.liteTopicSet.Delete(liteTopic)
	return nil
}

Expand All @@ -147,9 +147,10 @@ func (lpc *defaultLitePushConsumer) syncAllLiteSubscription() {
}
return true
})
if len(liteTopicSet) == 0 {
return
}
// Sync subscription even when liteTopicSet is empty to keep server state consistent
//if len(liteTopicSet) == 0 {
// return
//}
if err := lpc.syncLiteSubscription(context.TODO(), v2.LiteSubscriptionAction_COMPLETE_ADD, liteTopicSet); err != nil {
sugarBaseLogger.Errorf("LitePushConsumer syncAllLiteSubscription:%v, err:%v", liteTopicSet, err)
}
Expand Down
45 changes: 34 additions & 11 deletions golang/process_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,17 @@ func (dpq *defaultProcessQueue) discardFifoMessage(mv *MessageView) {
}

func (dpq *defaultProcessQueue) eraseFifoMessage(mv *MessageView, result ConsumerResult) {
result = dpq.convertSuspendResultIfNeeded(result)

retryPolicy := dpq.consumer.pcSettings.GetRetryPolicy()
maxAttempts := retryPolicy.MaxAttempts
attempt := mv.GetMessageCommon().deliveryAttempt
messageId := mv.GetMessageId()
service := dpq.consumer.consumerService
clientId := dpq.consumer.cli.clientID

if result == FAILURE && attempt < maxAttempts {
// Handle FAILURE result with retry
if result.Type == ConsumerResultTypeFailure && attempt < maxAttempts {
nextAttemptDelay := utils.GetNextAttemptDelay(retryPolicy, int(attempt))
mv.deliveryAttempt += 1
attempt = mv.deliveryAttempt
Expand All @@ -91,16 +94,36 @@ func (dpq *defaultProcessQueue) eraseFifoMessage(mv *MessageView, result Consume
return
}

if result != SUCCESS {
dpq.consumer.cli.log.Infof("Failed to consume fifo message finally, run out of attempt times, maxAttempts=%d, "+
"attempt=%d, mq=%s, messageId=%s, clientId=%s", maxAttempts, attempt, dpq.mqstr, messageId, clientId)
}
// Ack message or forward it to DLQ depends on consumption result.
if result == SUCCESS {
// Handle SUCCESS result
if result.Type == ConsumerResultTypeSuccess {
dpq.ackMessage(mv, func(error) { dpq.evictCacheMessage(mv) })
} else {
dpq.forwardToDeadLetterQueue(mv, func(error) { dpq.evictCacheMessage(mv) })
return
}

// Handle SUSPEND result
if result.Type == ConsumerResultTypeSuspend {
dpq.consumer.cli.log.Infof("Suspend consumption, consumerGroup=%s, topic=%s, liteTopic=%s, messageId=%s, suspendTime=%v",
dpq.consumer.groupName, mv.topic, mv.GetLiteTopic(), messageId, result.suspendTime)
dpq.changeInvisibleDuration(mv, result.suspendTime, 1, func(error) { dpq.evictCacheMessage(mv) })
return
}

// Handle FAILURE result without retry (final failure)
dpq.consumer.cli.log.Infof("Failed to consume fifo message finally, run out of attempt times, maxAttempts=%d, "+
"attempt=%d, mq=%s, messageId=%s, clientId=%s", maxAttempts, attempt, dpq.mqstr, messageId, clientId)
dpq.forwardToDeadLetterQueue(mv, func(error) { dpq.evictCacheMessage(mv) })
}

// convertSuspendResultIfNeeded downgrades a SUSPEND result to FAILURE for any
// consumer type other than the lite push consumer — the only type that
// supports suspension — logging a warning when it does so. Every other result
// passes through unchanged.
func (dpq *defaultProcessQueue) convertSuspendResultIfNeeded(result ConsumerResult) ConsumerResult {
	if result.Type != ConsumerResultTypeSuspend {
		return result
	}
	if dpq.consumer.pcSettings.clientType == v2.ClientType_LITE_PUSH_CONSUMER {
		return result
	}
	dpq.consumer.cli.log.Warnf("Only LitePushConsumer supports ConsumeResultSuspend! "+
		"Convert to FAILURE, consumerGroup=%s, consumerType=%v",
		dpq.consumer.groupName, dpq.consumer.pcSettings.clientType)
	return FAILURE
}

func (dpq *defaultProcessQueue) forwardToDeadLetterQueue(mv *MessageView, callback func(error)) {
Expand Down Expand Up @@ -162,14 +185,14 @@ func (dpq *defaultProcessQueue) forwardToDeadLetterQueueLater(mv *MessageView, a
}

// eraseMessage settles a non-FIFO message after consumption: SUCCESS and
// SUSPEND results are counted as OK and acked, anything else is counted as an
// error and nacked. SUSPEND is first downgraded to FAILURE for consumer types
// that do not support it.
//
// NOTE(review): on this non-FIFO path a surviving SUSPEND result (lite push
// consumer) is acked outright rather than having the message's invisible
// duration extended — confirm this asymmetry with the FIFO path is intended.
func (dpq *defaultProcessQueue) eraseMessage(mv *MessageView, consumeResult ConsumerResult) {
	consumeResult = dpq.convertSuspendResultIfNeeded(consumeResult)
	if consumeResult.Type == ConsumerResultTypeSuccess || consumeResult.Type == ConsumerResultTypeSuspend {
		dpq.consumer.consumptionOkQuantity.Inc()
		dpq.ackMessage(mv, func(error) { dpq.evictCacheMessage(mv) })
	} else {
		dpq.consumer.consumptionErrorQuantity.Inc()
		dpq.nackMessage(mv, func(error) { dpq.evictCacheMessage(mv) })
	}
}

func (dpq *defaultProcessQueue) discardMessage(mv *MessageView) {
Expand Down
9 changes: 5 additions & 4 deletions golang/protocol/v2/admin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions golang/protocol/v2/admin_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading