From: Kai Moritz Date: Sat, 13 Aug 2022 08:45:03 +0000 (+0200) Subject: Verbesserungen aus 'stored-offsets' nach 'stored-state' gemerged X-Git-Tag: sumup-requests---lvm-2-tage~5^2^2^2~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=cfbe7dcd3318ee846cb5890eea4328e36c5aa364;hp=83a4bf324f5a7ec6010a7921118ec7d6e8f997cf;p=demos%2Fkafka%2Ftraining Verbesserungen aus 'stored-offsets' nach 'stored-state' gemerged --- diff --git a/docker-compose.yml b/docker-compose.yml index 30ae3b4..ee78746 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -40,6 +40,8 @@ services: ME_CONFIG_MONGODB_ADMINUSERNAME: juplo ME_CONFIG_MONGODB_ADMINPASSWORD: training ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/ + depends_on: + - mongo kafka-ui: image: provectuslabs/kafka-ui:0.3.3 @@ -54,7 +56,7 @@ services: command: sleep infinity producer: - image: juplo/endless-producer:1.0-SNAPSHOT + image: juplo/endless-long-producer:1.0-SNAPSHOT ports: - 8080:8080 environment: diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index d280aa6..76c2520 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -30,8 +30,22 @@ public class Application implements ApplicationRunner } @PreDestroy - public void stopExecutor() + public void shutdown() { + try + { + log.info("Stopping EndlessConsumer"); + endlessConsumer.stop(); + } + catch (IllegalStateException e) + { + log.info("Was already stopped: {}", e.toString()); + } + catch (Exception e) + { + log.error("Unexpected exception while stopping EndlessConsumer: {}", e); + } + try { log.info("Shutting down the ExecutorService."); @@ -41,7 +55,7 @@ public class Application implements ApplicationRunner } catch (InterruptedException e) { - log.error("Exception while waiting for the termination of the ExecutorService: {}", e.toString()); + log.error("Exception while waiting for the termination of the ExecutorService: {}", e); } finally { diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 1ba9d5b..1ea90a2 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,6 +1,5 @@ package de.juplo.kafka; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -8,10 +7,10 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.time.Clock; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.function.Consumer; @Configuration @@ -19,30 +18,41 @@ import java.util.function.Consumer; public class ApplicationConfiguration { @Bean - public Consumer> consumer() + public KeyCountingRecordHandler keyCountingRecordHandler() { - return (record) -> - { - // Handle record - }; + return new KeyCountingRecordHandler(); + } + + @Bean + public KeyCountingRebalanceListener keyCountingRebalanceListener( + KeyCountingRecordHandler keyCountingRecordHandler, + PartitionStatisticsRepository repository, + ApplicationProperties properties) + { + return new KeyCountingRebalanceListener( + keyCountingRecordHandler, + repository, + properties.getClientId(), + Clock.systemDefaultZone(), + properties.getCommitInterval()); } @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - Consumer> handler, - PartitionStatisticsRepository repository, + KeyCountingRebalanceListener keyCountingRebalanceListener, + KeyCountingRecordHandler keyCountingRecordHandler, ApplicationProperties properties) { return new EndlessConsumer<>( executor, - repository, properties.getClientId(), properties.getTopic(), kafkaConsumer, - handler); + keyCountingRebalanceListener, + keyCountingRecordHandler); } @Bean @@ -57,9 +67,11 @@ public class ApplicationConfiguration Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); props.put("group.id", properties.getGroupId()); props.put("client.id", properties.getClientId()); props.put("auto.offset.reset", properties.getAutoOffsetReset()); + props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", LongDeserializer.class.getName()); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index fa731c5..14e928f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -7,6 +7,7 @@ import org.springframework.validation.annotation.Validated; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; +import java.time.Duration; @ConfigurationProperties(prefix = "consumer") @@ -30,4 +31,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String autoOffsetReset; + @NotNull + private Duration commitInterval; } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index ed38080..f6ff47f 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -2,11 +2,7 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.ExceptionHandler; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.ResponseStatus; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -17,6 +13,7 @@ import java.util.concurrent.ExecutionException; public class DriverController { private final EndlessConsumer consumer; + private final KeyCountingRecordHandler keyCountingRecordHandler; @PostMapping("start") @@ -35,7 +32,7 @@ public class DriverController @GetMapping("seen") public Map> seen() { - return consumer.getSeen(); + return keyCountingRecordHandler.getSeen(); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index a21dd86..047d5cb 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -19,14 +19,14 @@ import java.util.concurrent.locks.ReentrantLock; @Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements ConsumerRebalanceListener, Runnable +public class EndlessConsumer implements Runnable { private final ExecutorService executor; - private final PartitionStatisticsRepository repository; private final String id; private final String topic; private final Consumer consumer; - private final java.util.function.Consumer> handler; + private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener; + private final RecordHandler handler; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); @@ -34,56 +34,6 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl private Exception exception; private long consumed = 0; - private final Map> seen = new HashMap<>(); - private final Map offsets = new HashMap<>(); - - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long newOffset = consumer.position(tp); - Long oldOffset = offsets.remove(partition); - log.info( - "{} - removing partition: {}, consumed {} records (offset {} -> {})", - id, - partition, - newOffset - oldOffset, - oldOffset, - newOffset); - Map removed = seen.remove(partition); - for (String key : removed.keySet()) - { - log.info( - "{} - Seen {} messages for partition={}|key={}", - id, - removed.get(key), - partition, - key); - } - repository.save(new StatisticsDocument(partition, removed)); - }); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long offset = consumer.position(tp); - log.info("{} - adding partition: {}, offset={}", id, partition, offset); - offsets.put(partition, offset); - seen.put( - partition, - repository - .findById(Integer.toString(tp.partition())) - .map(document -> document.statistics) - .orElse(new HashMap<>())); - }); - } @Override @@ -92,7 +42,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), this); + consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener); while (true) { @@ -116,18 +66,9 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl handler.accept(record); consumed++; - - Integer partition = record.partition(); - String key = record.key() == null ? "NULL" : record.key().toString(); - Map byKey = seen.get(partition); - - if (!byKey.containsKey(key)) - byKey.put(key, 0l); - - long seenByKey = byKey.get(key); - seenByKey++; - byKey.put(key, seenByKey); } + + pollIntervalAwareRebalanceListener.beforeNextPoll(); } } catch(WakeupException e) @@ -197,11 +138,6 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl } } - public Map> getSeen() - { - return seen; - } - public void start() { lock.lock(); @@ -221,7 +157,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl } } - public synchronized void stop() throws ExecutionException, InterruptedException + public synchronized void stop() throws InterruptedException { lock.lock(); try @@ -244,22 +180,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl public void destroy() throws ExecutionException, InterruptedException { log.info("{} - Destroy!", id); - try - { - stop(); - } - catch (IllegalStateException e) - { - log.info("{} - Was already stopped", id); - } - catch (Exception e) - { - log.error("{} - Unexpected exception while trying to stop the consumer", id, e); - } - finally - { - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } + log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } public boolean running() diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java new file mode 100644 index 0000000..636ff86 --- /dev/null +++ b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java @@ -0,0 +1,76 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.TopicPartition; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Map; + + +@RequiredArgsConstructor +@Slf4j +public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener +{ + private final KeyCountingRecordHandler handler; + private final PartitionStatisticsRepository repository; + private final String id; + private final Clock clock; + private final Duration commitInterval; + + private Instant lastCommit = Instant.EPOCH; + + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + log.info("{} - adding partition: {}", id, partition); + StatisticsDocument document = + repository + .findById(Integer.toString(partition)) + .orElse(new StatisticsDocument(partition)); + handler.addPartition(partition, document.statistics); + }); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + log.info("{} - removing partition: {}", id, partition); + Map removed = handler.removePartition(partition); + for (String key : removed.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + removed.get(key), + partition, + key); + } + repository.save(new StatisticsDocument(partition, removed)); + }); + } + + + @Override + public void beforeNextPoll() + { + if (lastCommit.plus(commitInterval).isBefore(clock.instant())) + { + log.debug("Storing data, last commit: {}", lastCommit); + handler.getSeen().forEach((partiton, statistics) -> repository.save( + new StatisticsDocument( + partiton, + statistics))); + lastCommit = clock.instant(); + } + } +} diff --git a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java new file mode 100644 index 0000000..099dcf7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java @@ -0,0 +1,46 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.HashMap; +import java.util.Map; + + +@Slf4j +public class KeyCountingRecordHandler implements RecordHandler +{ + private final Map> seen = new HashMap<>(); + + + @Override + public void accept(ConsumerRecord record) + { + Integer partition = record.partition(); + String key = record.key() == null ? "NULL" : record.key().toString(); + Map byKey = seen.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0l); + + long seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); + } + + public void addPartition(Integer partition, Map statistics) + { + seen.put(partition, statistics); + } + + public Map removePartition(Integer partition) + { + return seen.remove(partition); + } + + + public Map> getSeen() + { + return seen; + } +} diff --git a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java new file mode 100644 index 0000000..8abec12 --- /dev/null +++ b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; + + +public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener +{ + default void beforeNextPoll() {} +} diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java new file mode 100644 index 0000000..3c9dd15 --- /dev/null +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.function.Consumer; + + +public interface RecordHandler extends Consumer> +{ + default void beforeNextPoll() {} +} diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java index 2416253..415ef5c 100644 --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@ -20,6 +20,12 @@ public class StatisticsDocument { } + public StatisticsDocument(Integer partition) + { + this.id = Integer.toString(partition); + this.statistics = new HashMap<>(); + } + public StatisticsDocument(Integer partition, Map statistics) { this.id = Integer.toString(partition); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 93b27c2..fc1c68a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,6 +4,7 @@ consumer: client-id: DEV topic: test auto-offset-reset: earliest + commit-interval: 5s management: endpoint: shutdown: diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java new file mode 100644 index 0000000..d1d8e50 --- /dev/null +++ b/src/test/java/de/juplo/kafka/ApplicationIT.java @@ -0,0 +1,43 @@ +package de.juplo.kafka; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import static de.juplo.kafka.ApplicationTests.TOPIC; + + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { + "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", + "consumer.topic=" + TOPIC, + "spring.mongodb.embedded.version=4.4.13" }) +@EmbeddedKafka(topics = TOPIC) +@AutoConfigureDataMongo +public class ApplicationIT +{ + public static final String TOPIC = "FOO"; + + @LocalServerPort + private int port; + + @Autowired + private TestRestTemplate restTemplate; + + + + @Test + public void testApplicationStartup() + { + restTemplate.getForObject( + "http://localhost:" + port + "/actuator/health", + String.class + ) + .contains("UP"); + } +} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index caa25c5..7f666f6 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -26,8 +26,6 @@ import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -43,6 +41,7 @@ import static org.awaitility.Awaitility.*; properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", "consumer.topic=" + TOPIC, + "consumer.commit-interval=1s", "spring.mongodb.embedded.version=4.4.13" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @EnableAutoConfiguration @@ -69,9 +68,10 @@ class ApplicationTests @Autowired ExecutorService executor; @Autowired - PartitionStatisticsRepository repository; + KeyCountingRebalanceListener keyCountingRebalanceListener; + @Autowired + KeyCountingRecordHandler keyCountingRecordHandler; - Consumer> testHandler; EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; @@ -81,17 +81,22 @@ class ApplicationTests /** Tests methods */ @Test - @Order(1) // << The poistion pill is not skipped. Hence, this test must run first void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException { - send100Messages(i -> new Bytes(valueSerializer.serialize(TOPIC, i))); + send100Messages((partition, key, counter) -> + { + Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter)); + return new ProducerRecord<>(TOPIC, partition, key, value); + }); await("100 records received") .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) .until(() -> receivedRecords.size() >= 100); await("Offsets committed") .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofSeconds(1)) .untilAsserted(() -> { checkSeenOffsetsForProgress(); @@ -104,16 +109,19 @@ class ApplicationTests } @Test - @Order(2) - void commitsOffsetOfErrorForReprocessingOnError() + void commitsOffsetOfErrorForReprocessingOnDeserializationError() { - send100Messages(counter -> - counter == 77 - ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) - : new Bytes(valueSerializer.serialize(TOPIC, counter))); + send100Messages((partition, key, counter) -> + { + Bytes value = counter == 77 + ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) + : new Bytes(valueSerializer.serialize(TOPIC, counter)); + return new ProducerRecord<>(TOPIC, partition, key, value); + }); await("Consumer failed") .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); @@ -122,6 +130,7 @@ class ApplicationTests endlessConsumer.start(); await("Consumer failed") .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); @@ -159,8 +168,8 @@ class ApplicationTests Set withProgress = new HashSet<>(); partitions().forEach(tp -> { - Long oldOffset = oldOffsets.get(tp); - Long newOffset = newOffsets.get(tp); + Long oldOffset = oldOffsets.get(tp) + 1; + Long newOffset = newOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -175,6 +184,21 @@ class ApplicationTests /** Helper methods for setting up and running the tests */ + void seekToEnd() + { + offsetConsumer.assign(partitions()); + offsetConsumer.seekToEnd(partitions()); + partitions().forEach(tp -> + { + // seekToEnd() works lazily: it only takes effect on poll()/position() + Long offset = offsetConsumer.position(tp); + log.info("New position for {}: {}", tp, offset); + }); + // The new positions must be commited! + offsetConsumer.commitSync(); + offsetConsumer.unsubscribe(); + } + void doForCurrentOffsets(BiConsumer consumer) { offsetConsumer.assign(partitions()); @@ -192,7 +216,12 @@ class ApplicationTests } - void send100Messages(Function messageGenerator) + public interface RecordGenerator + { + public ProducerRecord generate(int partition, String key, long counter); + } + + void send100Messages(RecordGenerator recordGenerator) { long i = 0; @@ -200,14 +229,8 @@ class ApplicationTests { for (int key = 0; key < 10; key++) { - Bytes value = messageGenerator.apply(++i); - ProducerRecord record = - new ProducerRecord<>( - TOPIC, - partition, - Integer.toString(key%2), - value); + recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i); kafkaProducer.send(record, (metadata, e) -> { @@ -237,7 +260,7 @@ class ApplicationTests @BeforeEach public void init() { - testHandler = record -> {} ; + seekToEnd(); oldOffsets = new HashMap<>(); newOffsets = new HashMap<>(); @@ -249,23 +272,25 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = - record -> - { - newOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); - testHandler.accept(record); + TestRecordHandler captureOffsetAndExecuteTestHandler = + new TestRecordHandler(keyCountingRecordHandler) { + @Override + public void onNewRecord(ConsumerRecord record) + { + newOffsets.put( + new TopicPartition(record.topic(), record.partition()), + record.offset()); + receivedRecords.add(record); + } }; endlessConsumer = new EndlessConsumer<>( executor, - repository, properties.getClientId(), properties.getTopic(), kafkaConsumer, + keyCountingRebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start(); diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java new file mode 100644 index 0000000..de28385 --- /dev/null +++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java @@ -0,0 +1,28 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerRecord; + + +@RequiredArgsConstructor +public abstract class TestRecordHandler implements RecordHandler +{ + private final RecordHandler handler; + + + public abstract void onNewRecord(ConsumerRecord record); + + + @Override + public void accept(ConsumerRecord record) + { + this.onNewRecord(record); + handler.accept(record); + } + @Override + + public void beforeNextPoll() + { + handler.beforeNextPoll(); + } +}