+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
-{
- private final AdderRecordHandler handler;
- private final PartitionStatisticsRepository repository;
- private final String id;
- private final String topic;
- private final Clock clock;
- private final Duration commitInterval;
- private final Consumer<String, String> consumer;
-
- private Instant lastCommit = Instant.EPOCH;
- private boolean commitsEnabled = true;
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- StateDocument document =
- repository
- .findById(Integer.toString(partition))
- .orElse(new StateDocument(partition));
- log.info("{} - adding partition: {}, offset={}", id, partition, document.offset);
- if (document.offset >= 0)
- {
- // Only seek, if a stored offset was found
- // Otherwise: Use initial offset, generated by Kafka
- consumer.seek(tp, document.offset);
- }
- handler.addPartition(partition, document.state);
- });
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- Long offset = consumer.position(tp);
- log.info(
- "{} - removing partition: {}, offset of next message {})",
- id,
- partition,
- offset);
- if (commitsEnabled)
- {
- repository.save(new StateDocument(partition, handler.removePartition(partition), offset));
- }
- else
- {
- log.info("Offset commits are disabled! Last commit: {}", lastCommit);
- }
- });
- }
-
-
- @Override
- public void beforeNextPoll()
- {
- if (!commitsEnabled)
- {
- log.info("Offset commits are disabled! Last commit: {}", lastCommit);
- return;
- }
-
- if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
- {
- log.debug("Storing data and offsets, last commit: {}", lastCommit);
- handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save(
- new StateDocument(
- partiton,
- sumBusinessLogic.getState(),
- consumer.position(new TopicPartition(topic, partiton)))));
- lastCommit = clock.instant();
- }
- }
-
- @Override
- public void enableCommits()
- {
- commitsEnabled = true;
- }
-
- @Override
- public void disableCommits()
- {
- commitsEnabled = false;
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Slf4j
-public class AdderRecordHandler implements RecordHandler<String, String>
-{
- private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
-
-
- @Override
- public void accept(ConsumerRecord<String, String> record)
- {
- Integer partition = record.partition();
- String user = record.key();
- String message = record.value();
- switch (message)
- {
- case "START":
- state.get(partition).startSum(user);
- break;
-
- case "END":
- Long result = state.get(partition).endSum(user);
- log.info("New result for {}: {}", user, result);
- break;
-
- default:
- state.get(partition).addToSum(user, Integer.parseInt(message));
- break;
- }
- }
-
- protected void addPartition(Integer partition, Map<String, Long> state)
- {
- this.state.put(partition, new AdderBusinessLogic(state));
- }
-
- protected Map<String, Long> removePartition(Integer partition)
- {
- return this.state.remove(partition).getState();
- }
-
-
- public Map<Integer, AdderBusinessLogic> getState()
- {
- return state;
- }
-}
public class ApplicationConfiguration
{
@Bean
- public AdderRecordHandler sumRecordHandler()
+ public ApplicationRecordHandler recordHandler()
{
- return new AdderRecordHandler();
+ return new ApplicationRecordHandler();
}
@Bean
- public AdderRebalanceListener sumRebalanceListener(
- AdderRecordHandler adderRecordHandler,
- PartitionStatisticsRepository repository,
+ public ApplicationRebalanceListener rebalanceListener(
+ ApplicationRecordHandler recordHandler,
+ StateRepository stateRepository,
Consumer<String, String> consumer,
ApplicationProperties properties)
{
- return new AdderRebalanceListener(
- adderRecordHandler,
- repository,
+ return new ApplicationRebalanceListener(
+ recordHandler,
+ stateRepository,
properties.getClientId(),
properties.getTopic(),
Clock.systemDefaultZone(),
public EndlessConsumer<String, String> endlessConsumer(
KafkaConsumer<String, String> kafkaConsumer,
ExecutorService executor,
- AdderRebalanceListener adderRebalanceListener,
- AdderRecordHandler adderRecordHandler,
+ ApplicationRebalanceListener rebalanceListener,
+ ApplicationRecordHandler recordHandler,
ApplicationProperties properties)
{
return
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
- adderRebalanceListener,
- adderRecordHandler);
+ rebalanceListener,
+ recordHandler);
}
@Bean
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+{
+ private final ApplicationRecordHandler recordHandler;
+ private final StateRepository stateRepository;
+ private final String id;
+ private final String topic;
+ private final Clock clock;
+ private final Duration commitInterval;
+ private final Consumer<String, String> consumer;
+
+ private Instant lastCommit = Instant.EPOCH;
+ private boolean commitsEnabled = true;
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ StateDocument document =
+ stateRepository
+ .findById(Integer.toString(partition))
+ .orElse(new StateDocument(partition));
+ log.info("{} - adding partition: {}, offset={}", id, partition, document.offset);
+ if (document.offset >= 0)
+ {
+ // Only seek, if a stored offset was found
+ // Otherwise: Use initial offset, generated by Kafka
+ consumer.seek(tp, document.offset);
+ }
+ recordHandler.addPartition(partition, document.state);
+ });
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ Long offset = consumer.position(tp);
+ log.info(
+ "{} - removing partition: {}, offset of next message {})",
+ id,
+ partition,
+ offset);
+ if (commitsEnabled)
+ {
+ Map<String, Long> removed = recordHandler.removePartition(partition);
+ stateRepository.save(new StateDocument(partition, removed, offset));
+ }
+ else
+ {
+ log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+ }
+ });
+ }
+
+
+ @Override
+ public void beforeNextPoll()
+ {
+ if (!commitsEnabled)
+ {
+ log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+ return;
+ }
+
+ if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+ {
+ log.debug("Storing data and offsets, last commit: {}", lastCommit);
+ recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
+ new StateDocument(
+ partiton,
+ adder.getState(),
+ consumer.position(new TopicPartition(topic, partiton)))));
+ lastCommit = clock.instant();
+ }
+ }
+
+ @Override
+ public void enableCommits()
+ {
+ commitsEnabled = true;
+ }
+
+ @Override
+ public void disableCommits()
+ {
+ commitsEnabled = false;
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler<String, String>
+{
+ private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+
+
+ @Override
+ public void accept(ConsumerRecord<String, String> record)
+ {
+ Integer partition = record.partition();
+ String user = record.key();
+ String message = record.value();
+ switch (message)
+ {
+ case "START":
+ state.get(partition).startSum(user);
+ break;
+
+ case "END":
+ Long result = state.get(partition).endSum(user);
+ log.info("New result for {}: {}", user, result);
+ break;
+
+ default:
+ state.get(partition).addToSum(user, Integer.parseInt(message));
+ break;
+ }
+ }
+
+ protected void addPartition(Integer partition, Map<String, Long> state)
+ {
+ this.state.put(partition, new AdderBusinessLogic(state));
+ }
+
+ protected Map<String, Long> removePartition(Integer partition)
+ {
+ return this.state.remove(partition).getState();
+ }
+
+
+ public Map<Integer, AdderBusinessLogic> getState()
+ {
+ return state;
+ }
+}
public class DriverController
{
private final EndlessConsumer consumer;
- private final AdderRecordHandler adderRecordHandler;
+ private final ApplicationRecordHandler recordHandler;
@PostMapping("start")
public Map<Integer, Map<String, Long>> state()
{
return
- adderRecordHandler
+ recordHandler
.getState()
.entrySet()
.stream()
@GetMapping("state/{user}")
public ResponseEntity<Long> seen(@PathVariable String user)
{
- for (AdderBusinessLogic adderBusinessLogic : adderRecordHandler.getState().values())
+ for (AdderBusinessLogic adder : recordHandler.getState().values())
{
- Optional<Long> sum = adderBusinessLogic.getSum(user);
+ Optional<Long> sum = adder.getSum(user);
if (sum.isPresent())
return ResponseEntity.ok(sum.get());
}
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
- private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
- private final RecordHandler<K, V> handler;
+ private final PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+ private final RecordHandler<K, V> recordHandler;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- pollIntervalAwareRebalanceListener.enableCommits();
- consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
+ rebalanceListener.enableCommits();
+ consumer.subscribe(Arrays.asList(topic), rebalanceListener);
while (true)
{
record.value()
);
- handler.accept(record);
+ recordHandler.accept(record);
consumed++;
}
- pollIntervalAwareRebalanceListener.beforeNextPoll();
+ rebalanceListener.beforeNextPoll();
}
}
catch(WakeupException e)
catch(Exception e)
{
log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
- pollIntervalAwareRebalanceListener.disableCommits();
+ rebalanceListener.disableCommits();
shutdown(e);
}
finally
+++ /dev/null
-package de.juplo.kafka;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-import java.util.Optional;
-
-
-public interface PartitionStatisticsRepository extends MongoRepository<StateDocument, String>
-{
- public Optional<StateDocument> findById(String partition);
-}
import org.springframework.data.mongodb.core.mapping.Document;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-@Document(collection = "statistics")
+@Document(collection = "state")
@ToString
public class StateDocument
{
--- /dev/null
+package de.juplo.kafka;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.Optional;
+
+
+public interface StateRepository extends MongoRepository<StateDocument, String>
+{
+ public Optional<StateDocument> findById(String partition);
+}
@Autowired
ExecutorService executor;
@Autowired
- PartitionStatisticsRepository partitionStatisticsRepository;
+ StateRepository stateRepository;
@Autowired
PollIntervalAwareConsumerRebalanceListener rebalanceListener;
@Autowired
log.info("New position for {}: {}", tp, offset);
Integer partition = tp.partition();
StateDocument document =
- partitionStatisticsRepository
+ stateRepository
.findById(partition.toString())
.orElse(new StateDocument(partition));
document.offset = offset;
- partitionStatisticsRepository.save(document);
+ stateRepository.save(document);
});
offsetConsumer.unsubscribe();
}
partitions().forEach(tp ->
{
String partition = Integer.toString(tp.partition());
- Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
+ Optional<Long> offset = stateRepository.findById(partition).map(document -> document.offset);
consumer.accept(tp, offset.orElse(0l));
});
}