From 50e1dad2556549063734ba396abccbcdab3929f9 Mon Sep 17 00:00:00 2001
From: Ryan007
Date: Mon, 3 Nov 2025 00:26:59 +0800
Subject: [PATCH 1/3] fix repli panic

---
 .../replication_correlation.go | 307 ++++++++++--------
 1 file changed, 163 insertions(+), 144 deletions(-)

diff --git a/pipeline/replication_correlation/replication_correlation.go b/pipeline/replication_correlation/replication_correlation.go
index 31441854..35af514f 100644
--- a/pipeline/replication_correlation/replication_correlation.go
+++ b/pipeline/replication_correlation/replication_correlation.go
@@ -29,6 +29,11 @@ package replication_correlation
 
 import (
 	"fmt"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
+
 	log "github.com/cihub/seelog"
 	"infini.sh/framework/core/config"
 	"infini.sh/framework/core/global"
@@ -38,10 +43,6 @@ import (
 	"infini.sh/framework/core/stats"
 	"infini.sh/framework/core/util"
 	"infini.sh/framework/lib/fasthttp"
-	"runtime"
-	"strings"
-	"sync"
-	"time"
 )
 
 type Config struct {
@@ -336,184 +337,196 @@ func (processor *ReplicationCorrectionGroup) process(ctx *pipeline.Context, w *s
 	wg.Add(1)
 	finalCommitqConfig, finalCommitConsumerConfig, finalCommitLogConsumer := processor.getConsumer(processor.FinalStageQueueName)
-	defer queue.ReleaseConsumer(finalCommitqConfig, finalCommitConsumerConfig, finalCommitLogConsumer)
-	//check second stage commit
-	go processor.fetchMessages(ctx, "final", finalCommitLogConsumer, func(consumer queue.ConsumerAPI, messages []queue.Message) bool {
-
-		processor.lastTimestampFetchedAnyMessageInFinalStage = time.Now()
-
-		for _, message := range messages {
-
-			//commit log less then wal log more than 60s, then it should be safety to commit
-			if processor.commitable(message) {
-				processor.commitableMessageOffsetInFinalStage = message.NextOffset
-			}
-
-			processor.totalMessageProcessedInFinalStage++
-			timestampMessageFetchedInFinalStage = message.Timestamp
-
-			processor.lastOffsetInFinalStage = message.NextOffset.Position
-
-			v := string(message.Data)
-			id, offset, timestamp := parseIDAndOffset(v)
-			processor.finalStageRecords.Store(id, MessageRecord{MessageOffset: message.NextOffset, RecordOffset: offset, RecordTimestamp: timestamp})
-		}
-		log.Debugf("final stage message count: %v, map:%v", processor.totalMessageProcessedInFinalStage, util.GetSyncMapSize(&processor.finalStageRecords))
-		return true
-	}, &wg)
+	// 🔧 修复:检查consumer是否为nil(空队列情况)
+	if finalCommitLogConsumer == nil {
+		log.Debugf("final commit log queue is empty, skip final stage processing")
+		wg.Done()
+	} else {
+		defer queue.ReleaseConsumer(finalCommitqConfig, finalCommitConsumerConfig, finalCommitLogConsumer)
+		//check second stage commit
+		go processor.fetchMessages(ctx, "final", finalCommitLogConsumer, func(consumer queue.ConsumerAPI, messages []queue.Message) bool {
+
+			processor.lastTimestampFetchedAnyMessageInFinalStage = time.Now()
+
+			for _, message := range messages {
+
+				//commit log less then wal log more than 60s, then it should be safety to commit
+				if processor.commitable(message) {
+					processor.commitableMessageOffsetInFinalStage = message.NextOffset
+				}
+
+				processor.totalMessageProcessedInFinalStage++
+				timestampMessageFetchedInFinalStage = message.Timestamp
+
+				processor.lastOffsetInFinalStage = message.NextOffset.Position
+
+				v := string(message.Data)
+				id, offset, timestamp := parseIDAndOffset(v)
+				processor.finalStageRecords.Store(id, MessageRecord{MessageOffset: message.NextOffset, RecordOffset: offset, RecordTimestamp: timestamp})
+			}
+			log.Debugf("final stage message count: %v, map:%v", processor.totalMessageProcessedInFinalStage, util.GetSyncMapSize(&processor.finalStageRecords))
+			return true
+		}, &wg)
+	} // 🔧 修复:关闭 if finalCommitLogConsumer != nil 的代码块
 
 	time.Sleep(10 * time.Second)
 
 	wg.Add(1)
 	walCommitqConfig, walCommitConsumerConfig, WALConsumer := processor.getConsumer(processor.PreStageQueueName)
-	defer queue.ReleaseConsumer(walCommitqConfig, walCommitConsumerConfig, WALConsumer)
-	//fetch the message from the wal queue
-	go processor.fetchMessages(ctx, "wal", WALConsumer, func(consumer queue.ConsumerAPI, messages []queue.Message) bool {
-		processor.lastMessageFetchedTimeInPrepareStage = time.Now()
-		var lastCommitableMessageOffset queue.Offset
-		defer func() {
-			if !global.Env().IsDebug {
-				if r := recover(); r != nil {
-					var v string
-					switch r.(type) {
-					case error:
-						v = r.(error).Error()
-					case runtime.Error:
-						v = r.(runtime.Error).Error()
-					case string:
-						v = r.(string)
-					}
-					if !util.ContainStr(v, "owning this topic") {
-						log.Errorf("error in processor [%v], [%v]", processor.Name(), v)
-					}
-				}
-			}
-
-			if WALConsumer != nil && lastCommitableMessageOffset.Position > 0 {
-				WALConsumer.CommitOffset(lastCommitableMessageOffset)
-			}
-
-		}()
-
-		for _, message := range messages {
-			processor.totalMessageProcessedInPrepareStage++
-			processor.lastOffsetInPrepareStage = message.NextOffset.Position
-			if global.ShuttingDown() {
-				return false
-			}
-
-			req := defaultHTTPPool.AcquireRequest()
-			err := req.Decode(message.Data)
-			if err != nil {
-				panic(err)
-			}
-			idByte := req.Header.Peek("X-Replicated-ID")
-			if idByte == nil || len(idByte) == 0 {
-				panic("invalid id")
-			}
-			uID := string(idByte)
-			timeByte := req.Header.Peek("X-Replicated-Timestamp")
-			if timeByte == nil || len(timeByte) == 0 {
-				panic("invalid timestamp")
-			}
-
-			tn, err := util.ToInt64(string(timeByte))
-			if err != nil {
-				panic(err)
-			}
-			//timestamp:= util.FromUnixTimestamp(tn)
-			processor.latestRecordTimestampInPrepareStage = tn
-
-			defaultHTTPPool.ReleaseRequest(req)
-
-			//update commit offset
-			lastCommitableMessageOffset = message.NextOffset
-			processor.commitableTimestampInPrepareStage = message.Timestamp - processor.config.SafetyCommitIntervalInSeconds
-
-			retry_times := 0
-
-		RETRY:
-
-			if global.ShuttingDown() {
-				return false
-			}
-
-			if time.Since(processor.lastMessageFetchedTimeInAnyStage) > time.Second*time.Duration(processor.config.SafetyCommitIntervalInSeconds) {
-				return false
-			}
-
-			//check final stage
-			if _, ok := processor.finalStageRecords.Load(uID); ok {
-				//valid request, cleanup
-				processor.cleanup(uID)
-				processor.totalFinishedMessage++
-			} else {
-				hit := false
-				if (processor.lastOffsetInFinalStage - processor.lastOffsetInPrepareStage) > processor.config.SafetyCommitOffsetPaddingSize {
-					stats.Increment("replication_crc", "safe_commit_offset_padding_size")
-					hit = true
-				}
-
-				if retry_times > processor.config.SafetyCommitRetryTimes {
-					stats.Increment("replication_crc", fmt.Sprintf("retry_times_exceed_%v", processor.config.SafetyCommitRetryTimes))
-					hit = true
-				}
-
-				if time.Since(processor.lastTimestampFetchedAnyMessageInFinalStage) > time.Second*time.Duration(processor.config.SafetyCommitIntervalInSeconds) && (processor.lastTimestampFetchedAnyMessageInFinalStage.Unix()-message.Timestamp > processor.config.SafetyCommitIntervalInSeconds) {
-					stats.Increment("replication_crc", "no_message_fetched_in_final_stage_more_than_safety_interval")
-					//log.Error("no message fetched in final stage more than 120s, ", time.Since(processor.lastTimestampFetchedAnyMessageInFinalStage))
-					hit = true
-				}
-
-				if timestampMessageFetchedInFinalStage > 0 && (timestampMessageFetchedInFinalStage-processor.config.SafetyCommitIntervalInSeconds) > message.Timestamp {
-					stats.Increment("replication_crc", "message_fetched_in_final_stage_more_than_safety_interval")
-					hit = true
-				}
-
-				//if last commit time is more than 30 seconds, compare to wal or now, then this message maybe lost
-				if hit { //too long no message returned, maybe finished
-
-					var errLog string
-					//check if in first stage
-					if _, ok := processor.firstStageRecords.Load(uID); ok {
-						errLog = fmt.Sprintf("request %v, offset: %v, %v in first stage but not in final stage", uID, message.NextOffset, message.Timestamp)
-					} else {
-						errLog = fmt.Sprintf("request %v, offset: %v, %v exists in wal but not in any stage", uID, message.NextOffset, message.Timestamp)
-					}
-
-					err := queue.Push(queue.GetOrInitConfig("replicate_failure_log"), []byte(errLog))
-					if err != nil {
-						panic(err)
-					}
-
-					err = queue.Push(queue.GetOrInitConfig("primary-failure"), message.Data)
-					if err != nil {
-						panic(err)
-					}
-
-					processor.totalUnFinishedMessage++
-					processor.cleanup(uID)
-
-					retry_times = 0
-				} else {
-					if global.Env().IsDebug {
-						log.Infof("request %v, offset: %v,"+
-							" retry_times: %v, docs_in_first_stage: %v, docs_in_final_stage: %v, last_final_offset: %v, last_wal_offset: %v",
-							uID, message.NextOffset,
-							retry_times, util.GetSyncMapSize(&processor.firstStageRecords), util.GetSyncMapSize(&processor.finalStageRecords), processor.lastOffsetInFinalStage, processor.lastOffsetInPrepareStage)
-					}
-					time.Sleep(1 * time.Second)
-					retry_times++
-
-					//retry
-					goto RETRY
-				}
-			}
-		}
-		return true
-	}, &wg)
+	// 🔧 修复:检查consumer是否为nil(空队列情况)
+	if WALConsumer == nil {
+		log.Debugf("WAL queue is empty, skip WAL stage processing")
+		wg.Done()
+	} else {
+		defer queue.ReleaseConsumer(walCommitqConfig, walCommitConsumerConfig, WALConsumer)
+		//fetch the message from the wal queue
+		go processor.fetchMessages(ctx, "wal", WALConsumer, func(consumer queue.ConsumerAPI, messages []queue.Message) bool {
+			processor.lastMessageFetchedTimeInPrepareStage = time.Now()
+			var lastCommitableMessageOffset queue.Offset
+			defer func() {
+				if !global.Env().IsDebug {
+					if r := recover(); r != nil {
+						var v string
+						switch r.(type) {
+						case error:
+							v = r.(error).Error()
+						case runtime.Error:
+							v = r.(runtime.Error).Error()
+						case string:
+							v = r.(string)
+						}
+						if !util.ContainStr(v, "owning this topic") {
+							log.Errorf("error in processor [%v], [%v]", processor.Name(), v)
+						}
+					}
+				}
+
+				if WALConsumer != nil && lastCommitableMessageOffset.Position > 0 {
+					WALConsumer.CommitOffset(lastCommitableMessageOffset)
+				}
+
+			}()
+
+			for _, message := range messages {
+				processor.totalMessageProcessedInPrepareStage++
+				processor.lastOffsetInPrepareStage = message.NextOffset.Position
+				if global.ShuttingDown() {
+					return false
+				}
+
+				req := defaultHTTPPool.AcquireRequest()
+				err := req.Decode(message.Data)
+				if err != nil {
+					panic(err)
+				}
+				idByte := req.Header.Peek("X-Replicated-ID")
+				if idByte == nil || len(idByte) == 0 {
+					panic("invalid id")
+				}
+				uID := string(idByte)
+				timeByte := req.Header.Peek("X-Replicated-Timestamp")
+				if timeByte == nil || len(timeByte) == 0 {
+					panic("invalid timestamp")
+				}
+
+				tn, err := util.ToInt64(string(timeByte))
+				if err != nil {
+					panic(err)
+				}
+				//timestamp:= util.FromUnixTimestamp(tn)
+				processor.latestRecordTimestampInPrepareStage = tn
+
+				defaultHTTPPool.ReleaseRequest(req)
+
+				//update commit offset
+				lastCommitableMessageOffset = message.NextOffset
+				processor.commitableTimestampInPrepareStage = message.Timestamp - processor.config.SafetyCommitIntervalInSeconds
+
+				retry_times := 0
+
+			RETRY:
+
+				if global.ShuttingDown() {
+					return false
+				}
+
+				if time.Since(processor.lastMessageFetchedTimeInAnyStage) > time.Second*time.Duration(processor.config.SafetyCommitIntervalInSeconds) {
+					return false
+				}
+
+				//check final stage
+				if _, ok := processor.finalStageRecords.Load(uID); ok {
+					//valid request, cleanup
+					processor.cleanup(uID)
+					processor.totalFinishedMessage++
+				} else {
+					hit := false
+					if (processor.lastOffsetInFinalStage - processor.lastOffsetInPrepareStage) > processor.config.SafetyCommitOffsetPaddingSize {
+						stats.Increment("replication_crc", "safe_commit_offset_padding_size")
+						hit = true
+					}
+
+					if retry_times > processor.config.SafetyCommitRetryTimes {
+						stats.Increment("replication_crc", fmt.Sprintf("retry_times_exceed_%v", processor.config.SafetyCommitRetryTimes))
+						hit = true
+					}
+
+					if time.Since(processor.lastTimestampFetchedAnyMessageInFinalStage) > time.Second*time.Duration(processor.config.SafetyCommitIntervalInSeconds) && (processor.lastTimestampFetchedAnyMessageInFinalStage.Unix()-message.Timestamp > processor.config.SafetyCommitIntervalInSeconds) {
+						stats.Increment("replication_crc", "no_message_fetched_in_final_stage_more_than_safety_interval")
+						//log.Error("no message fetched in final stage more than 120s, ", time.Since(processor.lastTimestampFetchedAnyMessageInFinalStage))
+						hit = true
+					}
+
+					if timestampMessageFetchedInFinalStage > 0 && (timestampMessageFetchedInFinalStage-processor.config.SafetyCommitIntervalInSeconds) > message.Timestamp {
+						stats.Increment("replication_crc", "message_fetched_in_final_stage_more_than_safety_interval")
+						hit = true
+					}
+
+					//if last commit time is more than 30 seconds, compare to wal or now, then this message maybe lost
+					if hit { //too long no message returned, maybe finished
+
+						var errLog string
+						//check if in first stage
+						if _, ok := processor.firstStageRecords.Load(uID); ok {
+							errLog = fmt.Sprintf("request %v, offset: %v, %v in first stage but not in final stage", uID, message.NextOffset, message.Timestamp)
+						} else {
+							errLog = fmt.Sprintf("request %v, offset: %v, %v exists in wal but not in any stage", uID, message.NextOffset, message.Timestamp)
+						}
+
+						err := queue.Push(queue.GetOrInitConfig("replicate_failure_log"), []byte(errLog))
+						if err != nil {
+							panic(err)
+						}
+
+						err = queue.Push(queue.GetOrInitConfig("primary-failure"), message.Data)
+						if err != nil {
+							panic(err)
+						}
+
+						processor.totalUnFinishedMessage++
+						processor.cleanup(uID)
+
+						retry_times = 0
+					} else {
+						if global.Env().IsDebug {
+							log.Infof("request %v, offset: %v,"+
+								" retry_times: %v, docs_in_first_stage: %v, docs_in_final_stage: %v, last_final_offset: %v, last_wal_offset: %v",
+								uID, message.NextOffset,
+								retry_times, util.GetSyncMapSize(&processor.firstStageRecords), util.GetSyncMapSize(&processor.finalStageRecords), processor.lastOffsetInFinalStage, processor.lastOffsetInPrepareStage)
+						}
+						time.Sleep(1 * time.Second)
+						retry_times++
+
+						//retry
+						goto RETRY
+					}
+				}
+			}
+			return true
+		}, &wg)
+	} // 🔧 修复:关闭 if WALConsumer != nil 的代码块
 
 	//commit first and final stage in side way
 	wg.Add(1)
@@ -677,7 +690,8 @@ func (processor *ReplicationCorrectionGroup) process(ctx *pipeline.Context, w *s
 	//	firstCommitLogConsumer.CommitOffset(processor.commitableMessageOffsetInFirstStage)
 	//}
 
-	if processor.commitableMessageOffsetInFinalStage.Position > 0 {
+	// 🔧 修复:只在consumer不为nil时才commit
+	if finalCommitLogConsumer != nil && processor.commitableMessageOffsetInFinalStage.Position > 0 {
 		finalCommitLogConsumer.CommitOffset(processor.commitableMessageOffsetInFinalStage)
 	}
 }
@@ -700,6 +714,11 @@ func (processor *ReplicationCorrectionGroup) getConsumer(queueName string) (*que
 
 	consumer, err := queue.AcquireConsumer(qConfig, cConfig, "worker_id")
 	if err != nil {
+		// 🔧 修复:优雅处理空队列情况,而不是panic
+		if strings.Contains(err.Error(), "empty queue") {
+			log.Debugf("queue [%v] is empty, return nil consumer", queueName)
+			return qConfig, cConfig, nil
+		}
 		panic(err)
 	}
 	return qConfig, cConfig, consumer

From d86034eb19e2825fd6acdf81181497003c8707f8 Mon Sep 17 00:00:00 2001
From: zhanghuaxun
Date: Wed, 5 Nov 2025 21:52:19 +0800
Subject: [PATCH 2/3] fix dup

---
 .../replication_correlation.go | 25 ++++++++++++-------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/pipeline/replication_correlation/replication_correlation.go b/pipeline/replication_correlation/replication_correlation.go
index 35af514f..044e2a77 100644
--- a/pipeline/replication_correlation/replication_correlation.go
+++ b/pipeline/replication_correlation/replication_correlation.go
@@ -216,9 +216,10 @@ Fetch:
 }
 
 func (processor *ReplicationCorrectionGroup) cleanup(uID interface{}) {
-	//processor.prepareStageBitmap.Delete(uID)
-	//processor.firstStageRecords.Delete(uID)
-	//processor.finalStageRecords.Delete(uID)
+	// 🔧 修复:启用 cleanup 逻辑,防止内存泄漏和重复处理
+	// 删除已完成的消息记录,避免 sync.Map 无限增长
+	processor.firstStageRecords.Delete(uID)
+	processor.finalStageRecords.Delete(uID)
 }
 
 var defaultHTTPPool = fasthttp.NewRequestResponsePool("replication_crc")
@@ -664,13 +665,19 @@ func (processor *ReplicationCorrectionGroup) process(ctx *pipeline.Context, w *s
 		}
 
 		if needCommitFinalStage {
-			if time.Since(lastFinalCommit) > time.Second*10 {
+			// 🔧 修复:减少 offset 提交延迟从 10 秒到 3 秒,降低崩溃时数据重复的窗口期
+			// 同时确保 commit 不会过于频繁影响性能
+			if time.Since(lastFinalCommit) > time.Second*3 {
 				log.Debug("committing final offset:", processor.commitableMessageOffsetInFinalStage)
-				finalCommitLogConsumer.CommitOffset(processor.commitableMessageOffsetInFinalStage)
-				lastFinalCommit = time.Now()
-				needCommitFinalStage = false
-				timegap1 := msgTime - lastFinalCommitAbleMessageRecord.recordTime
-				log.Trace(x.RecordTimestamp, ",", x.RecordOffset, ",time_gap: ", timegap, "s, ", timegap1, "s, record:", msgTime, " vs latest:", processor.latestRecordTimestampInPrepareStage, ", updating to commit:", x.MessageOffset, lastFinalCommitAbleMessageRecord, ",", processor.config.SafetyCommitIntervalInSeconds)
+				err := finalCommitLogConsumer.CommitOffset(processor.commitableMessageOffsetInFinalStage)
+				if err != nil {
+					log.Errorf("failed to commit final offset: %v, offset: %v", err, processor.commitableMessageOffsetInFinalStage)
+				} else {
+					lastFinalCommit = time.Now()
+					needCommitFinalStage = false
+					timegap1 := msgTime - lastFinalCommitAbleMessageRecord.recordTime
+					log.Trace(x.RecordTimestamp, ",", x.RecordOffset, ",time_gap: ", timegap, "s, ", timegap1, "s, record:", msgTime, " vs latest:", processor.latestRecordTimestampInPrepareStage, ", updating to commit:", x.MessageOffset, lastFinalCommitAbleMessageRecord, ",", processor.config.SafetyCommitIntervalInSeconds)
+				}
 			}
 		}
 		return true

From 0705d64cde93c42cade655b03824970616d9ebd7 Mon Sep 17 00:00:00 2001
From: zhanghuaxun
Date: Sat, 8 Nov 2025 23:04:57 +0800
Subject: [PATCH 3/3] change cn to en

---
 .../replication_correlation.go | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/pipeline/replication_correlation/replication_correlation.go b/pipeline/replication_correlation/replication_correlation.go
index 044e2a77..33e99a42 100644
--- a/pipeline/replication_correlation/replication_correlation.go
+++ b/pipeline/replication_correlation/replication_correlation.go
@@ -216,8 +216,8 @@ Fetch:
 }
 
 func (processor *ReplicationCorrectionGroup) cleanup(uID interface{}) {
-	// 🔧 修复:启用 cleanup 逻辑,防止内存泄漏和重复处理
-	// 删除已完成的消息记录,避免 sync.Map 无限增长
+	// fix: enable cleanup logic, to prevent memory leaks and duplicate processing
+	// fix: delete completed message records, to avoid sync.Map growing infinitely
 	processor.firstStageRecords.Delete(uID)
 	processor.finalStageRecords.Delete(uID)
 }
@@ -338,7 +338,7 @@ func (processor *ReplicationCorrectionGroup) process(ctx *pipeline.Context, w *s
 	wg.Add(1)
 	finalCommitqConfig, finalCommitConsumerConfig, finalCommitLogConsumer := processor.getConsumer(processor.FinalStageQueueName)
-	// 🔧 修复:检查consumer是否为nil(空队列情况)
+	// fix: check if consumer is nil (empty queue case)
 	if finalCommitLogConsumer == nil {
 		log.Debugf("final commit log queue is empty, skip final stage processing")
 		wg.Done()
@@ -665,9 +665,9 @@ func (processor *ReplicationCorrectionGroup) process(ctx *pipeline.Context, w *s
 		}
 
 		if needCommitFinalStage {
-			// 🔧 修复:减少 offset 提交延迟从 10 秒到 3 秒,降低崩溃时数据重复的窗口期
-			// 同时确保 commit 不会过于频繁影响性能
-			if time.Since(lastFinalCommit) > time.Second*3 {
+			// fix: reduce offset commit delay from 10 seconds to 5 seconds, to reduce the window period of data duplication when crashing
+			// also ensure commit will not affect performance too much
+			if time.Since(lastFinalCommit) > time.Second*5 {
 				log.Debug("committing final offset:", processor.commitableMessageOffsetInFinalStage)
 				err := finalCommitLogConsumer.CommitOffset(processor.commitableMessageOffsetInFinalStage)
 				if err != nil {
@@ -697,7 +697,7 @@ func (processor *ReplicationCorrectionGroup) process(ctx *pipeline.Context, w *s
 	//	firstCommitLogConsumer.CommitOffset(processor.commitableMessageOffsetInFirstStage)
 	//}
 
-	// 🔧 修复:只在consumer不为nil时才commit
+	// fix: only commit when consumer is not nil
 	if finalCommitLogConsumer != nil && processor.commitableMessageOffsetInFinalStage.Position > 0 {
 		finalCommitLogConsumer.CommitOffset(processor.commitableMessageOffsetInFinalStage)
 	}
@@ -721,7 +721,7 @@ func (processor *ReplicationCorrectionGroup) getConsumer(queueName string) (*que
 
 	consumer, err := queue.AcquireConsumer(qConfig, cConfig, "worker_id")
 	if err != nil {
-		// 🔧 修复:优雅处理空队列情况,而不是panic
+		// fix: error can be "empty queue" or "queue not exists" or other errors
 		if strings.Contains(err.Error(), "empty queue") {
 			log.Debugf("queue [%v] is empty, return nil consumer", queueName)
 			return qConfig, cConfig, nil