* Dafür neues Interface `PollIntervalAwareConsumerRebalanceListener` eingeführt.
* `WordcountRebalanceListener` implementiert das neue Interface und
kümmert sich um alle Kafka-Belange.
* `WordcountRecordHandler` kümmert sich nur noch um die Fachlogik.
public class ApplicationConfiguration
{
@Bean
- public WordcountRecordHandler wordcountRecordHandler(
- PartitionStatisticsRepository repository,
- Consumer<String, String> consumer,
- ApplicationProperties properties)
+ public WordcountRecordHandler wordcountRecordHandler()
{
- return new WordcountRecordHandler(
- repository,
- properties.getTopic(),
- Clock.systemDefaultZone(),
- properties.getCommitInterval(),
- consumer);
+ return new WordcountRecordHandler();
}
@Bean
wordcountRecordHandler,
repository,
properties.getClientId(),
+ properties.getTopic(),
+ Clock.systemDefaultZone(),
+ properties.getCommitInterval(),
consumer);
}
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
- private final ConsumerRebalanceListener rebalanceListener;
+ private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
private final RecordHandler<K, V> handler;
private final Lock lock = new ReentrantLock();
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic), rebalanceListener);
+ consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
while (true)
{
consumed++;
}
- handler.beforeNextPoll();
+ pollIntervalAwareRebalanceListener.beforeNextPoll();
}
}
catch(WakeupException e)
--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+
+/**
+ * A {@link ConsumerRebalanceListener} that is additionally notified by the
+ * consumer loop before each upcoming poll, so that implementations can
+ * piggyback periodic work (e.g. persisting state and offsets) onto the
+ * poll cycle without a separate scheduler.
+ */
+public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener
+{
+  /** Invoked before the next poll of the consumer; the default is a no-op. */
+  default void beforeNextPoll() {}
+}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Collection;
import java.util.Map;
@RequiredArgsConstructor
@Slf4j
-public class WordcountRebalanceListener implements ConsumerRebalanceListener
+public class WordcountRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
{
private final WordcountRecordHandler handler;
private final PartitionStatisticsRepository repository;
private final String id;
+ private final String topic;
+ private final Clock clock;
+ private final Duration commitInterval;
private final Consumer<String, String> consumer;
+ private Instant lastCommit = Instant.EPOCH;
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
});
}
+
+
+  @Override
+  public void beforeNextPoll()
+  {
+    // Persist the word-count statistics together with the current consumer
+    // positions, but at most once per commit-interval, to bound the write
+    // load on the repository.
+    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+    {
+      log.debug("Storing data and offsets, last commit: {}", lastCommit);
+      handler.getSeen().forEach((partition, statistics) -> repository.save(
+          new StatisticsDocument(
+              partition,
+              statistics,
+              consumer.position(new TopicPartition(topic, partition)))));
+      lastCommit = clock.instant();
+    }
+  }
}
package de.juplo.kafka;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
-@RequiredArgsConstructor
@Slf4j
public class WordcountRecordHandler implements RecordHandler<String, String>
{
final static Pattern PATTERN = Pattern.compile("\\W+");
- private final PartitionStatisticsRepository repository;
- private final String topic;
- private final Clock clock;
- private final Duration commitInterval;
- private final Consumer<String, String> consumer;
-
private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
- private Instant lastCommit = Instant.EPOCH;
-
@Override
public void accept(ConsumerRecord<String, String> record)
}
}
-
- @Override
- public void beforeNextPoll()
- {
- if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
- {
- log.debug("Storing data and offsets, last commit: {}", lastCommit);
- seen.forEach((partiton, statistics) -> repository.save(
- new StatisticsDocument(
- partiton,
- statistics,
- consumer.position(new TopicPartition(topic, partiton)))));
- lastCommit = clock.instant();
- }
- }
-
public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
{
seen.put(partition, statistics);