consumer);
}
+ @Bean
+ public WordcountRebalanceListener wordcountRebalanceListener(
+ WordcountRecordHandler wordcountRecordHandler)
+ {
+ return new WordcountRebalanceListener(wordcountRecordHandler);
+ }
+
@Bean
public EndlessConsumer<String, String> endlessConsumer(
KafkaConsumer<String, String> kafkaConsumer,
ExecutorService executor,
+ WordcountRebalanceListener wordcountRebalanceListener,
WordcountRecordHandler wordcountRecordHandler,
ApplicationProperties properties)
{
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
+ wordcountRebalanceListener,
wordcountRecordHandler);
}
@Slf4j
@RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
+public class EndlessConsumer<K, V> implements Runnable
{
private final ExecutorService executor;
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
+ private final ConsumerRebalanceListener rebalanceListener;
private final RecordHandler<K, V> handler;
private final Lock lock = new ReentrantLock();
private long consumed = 0;
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp -> handler.onPartitionRevoked(tp));
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp -> handler.onPartitionAssigned(tp));
- }
-
@Override
public void run()
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic), this);
+ consumer.subscribe(Arrays.asList(topic), rebalanceListener);
while (true)
{
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+
+
+@RequiredArgsConstructor
+public class WordcountRebalanceListener implements ConsumerRebalanceListener
+{
+ private final RecordHandler<String, String> handler;
+
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp -> handler.onPartitionAssigned(tp));
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp -> handler.onPartitionRevoked(tp));
+ }
+}
ExecutorService executor;
@Autowired
PartitionStatisticsRepository repository;
- @Autowired
+ @Autowired
+ WordcountRebalanceListener wordcountRebalanceListener;
+ @Autowired
WordcountRecordHandler wordcountRecordHandler;
EndlessConsumer<String, String> endlessConsumer;
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
+ wordcountRebalanceListener,
captureOffsetAndExecuteTestHandler);
endlessConsumer.start();