Skip to content

Commit e0f8feb

Browse files
committed
feat: add an ability to pause specific partitions
1 parent 0a6862a commit e0f8feb

19 files changed

+238
-41
lines changed

src/KafkaFlow.Abstractions/IConsumerContext.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Threading;
34
using System.Threading.Tasks;
45

@@ -108,4 +109,14 @@ public interface IConsumerContext
108109
/// Resume Kafka's message fetch
109110
/// </summary>
110111
void Resume();
112+
113+
/// <summary>
114+
/// Pause Kafka's message fetch, from a specific partitions, buffered messages will still be processed
115+
/// </summary>
116+
void Pause(IReadOnlyList<TopicPartition> topicPartitions);
117+
118+
/// <summary>
119+
/// Resume Kafka's message fetch for specific partitions
120+
/// </summary>
121+
void Resume(IReadOnlyList<TopicPartition> topicPartitions);
111122
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
namespace KafkaFlow;
2+
3+
/// <summary>
4+
/// Represents a Kafka topic and partition pair.
5+
/// </summary>
6+
public readonly struct TopicPartition
7+
{
8+
/// <summary>
9+
/// Initializes a new instance of the <see cref="TopicPartition"/> struct with the specified topic name, partition number, and offset value.
10+
/// </summary>
11+
/// <param name="topic">The name of the topic.</param>
12+
/// <param name="partition">The id of the partition.</param>
13+
public TopicPartition(string topic, int partition)
14+
{
15+
Topic = topic;
16+
Partition = partition;
17+
}
18+
19+
/// <summary>
20+
/// Gets the name of the topic.
21+
/// </summary>
22+
public string Topic { get; }
23+
24+
/// <summary>
25+
/// Gets the id of the partition.
26+
/// </summary>
27+
public int Partition { get; }
28+
}

src/KafkaFlow.Admin/Extensions/MessageConsumerExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace KafkaFlow.Admin.Extensions;
77

88
internal static class MessageConsumerExtensions
99
{
10-
public static IReadOnlyList<TopicPartition> FilterAssigment(this IMessageConsumer consumer, IList<string> topics)
10+
public static IReadOnlyList<Confluent.Kafka.TopicPartition> FilterAssigment(this IMessageConsumer consumer, IList<string> topics)
1111
{
1212
return topics.Any() ? consumer.Assignment.Where(a => topics.Contains(a.Topic)).ToList() : consumer.Assignment;
1313
}

src/KafkaFlow/Clusters/ClusterManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public async Task<IEnumerable<TopicPartitionOffset>> GetConsumerGroupOffsetsAsyn
9292
topicsMetadata
9393
.SelectMany(
9494
topic => topic.Metadata.Partitions.Select(
95-
partition => new TopicPartition(
95+
partition => new Confluent.Kafka.TopicPartition(
9696
topic.Name,
9797
new Partition(partition.Id))))
9898
.ToList();

src/KafkaFlow/Consumers/Consumer.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ internal class Consumer : IConsumer
1717
private readonly ILogHandler _logHandler;
1818
private readonly bool _stopTheWorldStrategy;
1919

20-
private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>, List<TopicPartition>>>
20+
private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>, List<Confluent.Kafka.TopicPartition>>>
2121
_partitionsAssignedHandlers = new();
2222

2323
private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>,
@@ -26,7 +26,7 @@ private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>, Lis
2626

2727
private readonly List<Action<IConsumer<byte[], byte[]>, Error>> _errorsHandlers = new();
2828
private readonly List<Action<IConsumer<byte[], byte[]>, string>> _statisticsHandlers = new();
29-
private readonly ConcurrentDictionary<TopicPartition, long> _currentPartitionsOffsets = new();
29+
private readonly ConcurrentDictionary<Confluent.Kafka.TopicPartition, long> _currentPartitionsOffsets = new();
3030
private readonly ConsumerFlowManager _flowManager;
3131
private readonly Event _maxPollIntervalExceeded;
3232

@@ -71,7 +71,7 @@ public Consumer(
7171

7272
public IReadOnlyList<string> Subscription { get; private set; } = new List<string>();
7373

74-
public IReadOnlyList<TopicPartition> Assignment { get; private set; } = new List<TopicPartition>();
74+
public IReadOnlyList<Confluent.Kafka.TopicPartition> Assignment { get; private set; } = new List<Confluent.Kafka.TopicPartition>();
7575

7676
public IConsumerFlowManager FlowManager => _flowManager;
7777

@@ -101,7 +101,7 @@ public ConsumerStatus Status
101101
}
102102
}
103103

104-
public void OnPartitionsAssigned(Action<IDependencyResolver, IConsumer<byte[], byte[]>, List<TopicPartition>> handler) =>
104+
public void OnPartitionsAssigned(Action<IDependencyResolver, IConsumer<byte[], byte[]>, List<Confluent.Kafka.TopicPartition>> handler) =>
105105
_partitionsAssignedHandlers.Add(handler);
106106

107107
public void OnPartitionsRevoked(
@@ -114,13 +114,13 @@ public void OnError(Action<IConsumer<byte[], byte[]>, Error> handler) =>
114114
public void OnStatistics(Action<IConsumer<byte[], byte[]>, string> handler) =>
115115
_statisticsHandlers.Add(handler);
116116

117-
public Offset GetPosition(TopicPartition topicPartition) =>
117+
public Offset GetPosition(Confluent.Kafka.TopicPartition topicPartition) =>
118118
_consumer.Position(topicPartition);
119119

120-
public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition) =>
120+
public WatermarkOffsets GetWatermarkOffsets(Confluent.Kafka.TopicPartition topicPartition) =>
121121
_consumer.GetWatermarkOffsets(topicPartition);
122122

123-
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout) =>
123+
public WatermarkOffsets QueryWatermarkOffsets(Confluent.Kafka.TopicPartition topicPartition, TimeSpan timeout) =>
124124
_consumer.QueryWatermarkOffsets(topicPartition, timeout);
125125

126126
public List<Confluent.Kafka.TopicPartitionOffset> OffsetsForTimes(
@@ -286,7 +286,7 @@ private void ManualAssign(IEnumerable<TopicPartitions> topics)
286286
var partitions = topics
287287
.SelectMany(
288288
topic => topic.Partitions.Select(
289-
partition => new TopicPartition(topic.Name, new Partition(partition))))
289+
partition => new Confluent.Kafka.TopicPartition(topic.Name, new Partition(partition))))
290290
.ToList();
291291

292292
_consumer.Assign(partitions);
@@ -295,7 +295,7 @@ private void ManualAssign(IEnumerable<TopicPartitions> topics)
295295

296296
private void FirePartitionsAssignedHandlers(
297297
IConsumer<byte[], byte[]> consumer,
298-
List<TopicPartition> partitions)
298+
List<Confluent.Kafka.TopicPartition> partitions)
299299
{
300300
if (_stopTheWorldStrategy)
301301
{
@@ -323,7 +323,7 @@ private void FirePartitionRevokedHandlers(IConsumer<byte[], byte[]> consumer, Li
323323
if (_stopTheWorldStrategy)
324324
{
325325
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
326-
this.Assignment = new List<TopicPartition>();
326+
this.Assignment = new List<Confluent.Kafka.TopicPartition>();
327327
this.Subscription = new List<string>();
328328
_currentPartitionsOffsets.Clear();
329329
_flowManager.Stop();

src/KafkaFlow/Consumers/ConsumerContext.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
24
using System.Threading;
35
using System.Threading.Tasks;
46
using Confluent.Kafka;
7+
using KafkaFlow.Extensions;
58

69
namespace KafkaFlow.Consumers;
710

@@ -76,11 +79,23 @@ public void Complete()
7679
public IOffsetsWatermark GetOffsetsWatermark() =>
7780
new OffsetsWatermark(
7881
_consumer.GetWatermarkOffsets(
79-
new TopicPartition(
82+
new Confluent.Kafka.TopicPartition(
8083
this.TopicPartitionOffset.Topic,
8184
this.TopicPartitionOffset.Partition)));
8285

8386
public void Pause() => _consumer.FlowManager.Pause(_consumer.Assignment);
8487

8588
public void Resume() => _consumer.FlowManager.Resume(_consumer.Assignment);
89+
90+
public void Pause(IReadOnlyList<TopicPartition> topicPartitions)
91+
{
92+
var affectedPartitions = topicPartitions.Select(x => new Confluent.Kafka.TopicPartition(x.Topic, x.Partition));
93+
_consumer.FlowManager.Pause(_consumer.Assignment.Intersect(affectedPartitions).ToList());
94+
}
95+
96+
public void Resume(IReadOnlyList<TopicPartition> topicPartitions)
97+
{
98+
var affectedPartitions = topicPartitions.Select(x => new Confluent.Kafka.TopicPartition(x.Topic, x.Partition));
99+
_consumer.FlowManager.Resume(_consumer.Assignment.Intersect(affectedPartitions).ToList());
100+
}
86101
}

src/KafkaFlow/Consumers/ConsumerFlowManager.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ internal class ConsumerFlowManager : IConsumerFlowManager
1111
{
1212
private readonly IConsumer _consumer;
1313
private readonly ILogHandler _logHandler;
14-
private readonly List<TopicPartition> _pausedPartitions = new();
14+
private readonly List<Confluent.Kafka.TopicPartition> _pausedPartitions = new();
1515
private readonly SemaphoreSlim _consumerSemaphore = new(1, 1);
1616

1717
private IConsumer<byte[], byte[]> _clientConsumer;
@@ -26,9 +26,9 @@ public ConsumerFlowManager(
2626
_logHandler = logHandler;
2727
}
2828

29-
public IReadOnlyList<TopicPartition> PausedPartitions => _pausedPartitions.AsReadOnly();
29+
public IReadOnlyList<Confluent.Kafka.TopicPartition> PausedPartitions => _pausedPartitions.AsReadOnly();
3030

31-
public void Pause(IReadOnlyCollection<TopicPartition> topicPartitions)
31+
public void Pause(IReadOnlyCollection<Confluent.Kafka.TopicPartition> topicPartitions)
3232
{
3333
lock (_pausedPartitions)
3434
{
@@ -64,7 +64,7 @@ public void ReleaseHeartbeat()
6464
}
6565
}
6666

67-
public void Resume(IReadOnlyCollection<TopicPartition> topicPartitions)
67+
public void Resume(IReadOnlyCollection<Confluent.Kafka.TopicPartition> topicPartitions)
6868
{
6969
lock (_pausedPartitions)
7070
{

src/KafkaFlow/Consumers/ConsumerManager.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System.Linq;
44
using System.Threading;
55
using System.Threading.Tasks;
6-
using Confluent.Kafka;
76
using KafkaFlow.Configuration;
87
using KafkaFlow.Extensions;
98

@@ -108,7 +107,7 @@ private void OnPartitionRevoked(IEnumerable<Confluent.Kafka.TopicPartitionOffset
108107
_logHandler.Warning(
109108
"Partitions revoked",
110109
this.GetConsumerLogInfo(topicPartitions?.Select(x => x.TopicPartition).ToArray()
111-
?? Array.Empty<TopicPartition>()));
110+
?? Array.Empty<Confluent.Kafka.TopicPartition>()));
112111

113112
this.WorkerPool.StopAsync().GetAwaiter().GetResult();
114113

src/KafkaFlow/Consumers/ConsumerWorkerPool.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public ConsumerWorkerPool(
5555

5656
public IEvent WorkerPoolStopped => _workerPoolStoppedSubject;
5757

58-
public async Task StartAsync(IReadOnlyCollection<TopicPartition> partitions, int workersCount)
58+
public async Task StartAsync(IReadOnlyCollection<Confluent.Kafka.TopicPartition> partitions, int workersCount)
5959
{
6060
try
6161
{
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.Collections.Generic;
2-
using Confluent.Kafka;
32

43
namespace KafkaFlow.Consumers;
54

@@ -11,17 +10,17 @@ public interface IConsumerFlowManager
1110
/// <summary>
1211
/// Gets a list of the consumer paused partitions
1312
/// </summary>
14-
IReadOnlyList<TopicPartition> PausedPartitions { get; }
13+
IReadOnlyList<Confluent.Kafka.TopicPartition> PausedPartitions { get; }
1514

1615
/// <summary>
1716
/// Pauses a set of partitions
1817
/// </summary>
1918
/// <param name="topicPartitions">A list of partitions</param>
20-
void Pause(IReadOnlyCollection<TopicPartition> topicPartitions);
19+
void Pause(IReadOnlyCollection<Confluent.Kafka.TopicPartition> topicPartitions);
2120

2221
/// <summary>
2322
/// Resumes a set of partitions
2423
/// </summary>
2524
/// <param name="topicPartitions">A list of partitions</param>
26-
void Resume(IReadOnlyCollection<TopicPartition> topicPartitions);
25+
void Resume(IReadOnlyCollection<Confluent.Kafka.TopicPartition> topicPartitions);
2726
}

0 commit comments

Comments
 (0)