From: Kai Moritz
Date: Fri, 12 Aug 2022 15:40:11 +0000 (+0200)
Subject: Merged improvements from 'deserialization' into 'stored-offsets'
X-Git-Tag: endless-stream-consumer-DEPRECATED^2^2^2~1^2~3
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=2da45caa1f9d32e3a5506d71cce7f06fa2e36523;p=demos%2Fkafka%2Ftraining

Merged improvements from 'deserialization' into 'stored-offsets'
---

2da45caa1f9d32e3a5506d71cce7f06fa2e36523
diff --cc src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 8e2e867,766740b..3925fcb
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@@ -71,11 -55,10 +71,12 @@@ public class ApplicationConfiguration
  
      Properties props = new Properties();
      props.put("bootstrap.servers", properties.getBootstrapServer());
+     props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
      props.put("group.id", properties.getGroupId());
      props.put("client.id", properties.getClientId());
+     props.put("enable.auto.commit", false);
      props.put("auto.offset.reset", properties.getAutoOffsetReset());
+     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
      props.put("metadata.max.age.ms", "1000");
      props.put("key.deserializer", StringDeserializer.class.getName());
      props.put("value.deserializer", LongDeserializer.class.getName());
diff --cc src/test/java/de/juplo/kafka/ApplicationTests.java
index a632a89,3bac537..fc5d4c9
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@@ -26,7 -24,7 +26,6 @@@ import java.util.*
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.function.BiConsumer;
- import java.util.function.Function;
 -import java.util.function.Consumer;
  import java.util.stream.Collectors;
  import java.util.stream.IntStream;
@@@ -63,8 -58,8 +62,10 @@@ class ApplicationTests
    @Autowired
    KafkaConsumer<String, Long> kafkaConsumer;
    @Autowired
+   KafkaConsumer<Bytes, Bytes> offsetConsumer;
+   @Autowired
+   PartitionStatisticsRepository partitionStatisticsRepository;
+   @Autowired
    ApplicationProperties properties;
    @Autowired
    ExecutorService executor;
@@@ -178,15 -177,27 +188,33 @@@
  
    /** Helper methods for setting up and running the tests */
  
+   void seekToEnd()
+   {
+     offsetConsumer.assign(partitions());
-     offsetConsumer.seekToEnd(partitions());
+     partitions().forEach(tp ->
+     {
-       // seekToEnd() works lazily: it only takes effect on poll()/position()
+       Long offset = offsetConsumer.position(tp);
+       log.info("New position for {}: {}", tp, offset);
++      Integer partition = tp.partition();
++      StatisticsDocument document =
++          partitionStatisticsRepository
++              .findById(partition.toString())
++              .orElse(new StatisticsDocument(partition));
++      document.offset = offset;
++      partitionStatisticsRepository.save(document);
+     });
-     // The new positions must be committed!
-     offsetConsumer.commitSync();
+     offsetConsumer.unsubscribe();
+   }
+ 
    void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
    {
-     offsetConsumer.assign(partitions());
-     partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
-     offsetConsumer.unsubscribe();
-   }
+     partitions().forEach(tp ->
+     {
+       String partition = Integer.toString(tp.partition());
+       Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
+       consumer.accept(tp, offset.orElse(0l));
+     });
+   }
  
    List<TopicPartition> partitions()
    {
@@@ -243,6 -253,10 +270,8 @@@
    @BeforeEach
    public void init()
    {
-     testHandler = record -> {} ;
- 
+     seekToEnd();
+ 
      oldOffsets = new HashMap<>();
      newOffsets = new HashMap<>();
      receivedRecords = new HashSet<>();
@@@ -312,5 -323,18 +341,19 @@@
      return new KafkaProducer<>(props);
    }
+ 
+   @Bean
+   KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
+   {
+     Properties props = new Properties();
+     props.put("bootstrap.servers", properties.getBootstrapServer());
+     props.put("client.id", "OFFSET-CONSUMER");
-     props.put("group.id", properties.getGroupId());
++    props.put("enable.auto.commit", false);
++    props.put("auto.offset.reset", "latest");
+     props.put("key.deserializer", BytesDeserializer.class.getName());
+     props.put("value.deserializer", BytesDeserializer.class.getName());
+ 
+     return new KafkaConsumer<>(props);
+   }
  }
}
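
Note on the pattern: the merge above switches the consumer to enable.auto.commit=false and makes the test helpers read and write positions through PartitionStatisticsRepository/StatisticsDocument instead of committing them to Kafka; accordingly, the separate offsetConsumer bean loses its group.id and the commitSync() call. What follows is a minimal, self-contained sketch of this "externally stored offsets" pattern, not code from this repository: the OffsetStore interface is a hypothetical stand-in for the Mongo repository, and the bootstrap server, group id, and class/method names are illustrative placeholders.

// Sketch only: the consumer's progress lives in an external store instead of
// Kafka's __consumer_offsets topic. OffsetStore is a hypothetical abstraction
// playing the role of PartitionStatisticsRepository in the diff above.
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

interface OffsetStore // hypothetical stand-in for the database-backed repository
{
  long load(TopicPartition tp);              // last stored position, 0 if unknown
  void save(TopicPartition tp, long offset); // persist the next offset to read
}

public class StoredOffsetsConsumer
{
  public static void run(String topic, OffsetStore store)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder
    props.put("group.id", "stored-offsets-demo");      // placeholder
    props.put("enable.auto.commit", false);            // Kafka no longer tracks progress
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", LongDeserializer.class.getName());

    try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props))
    {
      consumer.subscribe(List.of(topic), new ConsumerRebalanceListener()
      {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions)
        {
          // Resume from the externally stored positions
          partitions.forEach(tp -> consumer.seek(tp, store.load(tp)));
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions)
        {
          // Persist the current positions before handing the partitions back
          partitions.forEach(tp -> store.save(tp, consumer.position(tp)));
        }
      });

      while (true)
      {
        for (ConsumerRecord<String, Long> record : consumer.poll(Duration.ofSeconds(1)))
        {
          // ... process the record, then remember the next offset to read
          store.save(
              new TopicPartition(record.topic(), record.partition()),
              record.offset() + 1);
        }
      }
    }
  }
}

Keeping the offset in the same datastore as the processing results is what makes the pattern attractive: both can then be updated in a single transaction, which is the usual reason for storing offsets outside of Kafka.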