From: Kai Moritz
Date: Sat, 3 Jun 2023 09:50:05 +0000 (+0200)
Subject: WIP
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=15e69a5e30ccf01ba09d03fa71113ee39f956b0f;p=demos%2Fkafka%2Ftraining

WIP
---

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 033d0cc..ef3db82 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -9,9 +9,12 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.time.Clock;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
 
 
 @Configuration
@@ -32,17 +35,23 @@ public class ApplicationConfiguration
   @Bean
   public EndlessConsumer<String, Integer> endlessConsumer(
       KafkaConsumer<String, Integer> kafkaConsumer,
+      KafkaProducer<?, ?> kafkaProducer,
       ExecutorService executor,
       ApplicationRecordHandler recordHandler,
-      ApplicationProperties properties)
+      ApplicationProperties properties,
+      Clock clock)
   {
     return
         new EndlessConsumer<>(
             executor,
             properties.getClientId(),
             properties.getTopicIn(),
+            properties.getPartitions(),
             kafkaConsumer,
-            recordHandler);
+            kafkaProducer,
+            recordHandler,
+            clock,
+            properties.getCommitInterval().toMillis());
   }
 
   @Bean
@@ -61,7 +70,7 @@ public class ApplicationConfiguration
     props.put("group.id", properties.getGroupId());
     props.put("client.id", properties.getClientId());
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
-    props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
+    props.put("enable.auto.commit", false);
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", IntegerDeserializer.class.getName());
@@ -90,4 +99,10 @@ public class ApplicationConfiguration
 
     return new KafkaProducer<>(props);
   }
+
+  @Bean
+  public Clock clock()
+  {
+    return Clock.systemDefaultZone();
+  }
 }
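
Two details of this configuration are worth spelling out: auto-commit is switched off (enable.auto.commit = false), because offsets will from now on be committed through the producer, and the current time is obtained from an injected java.time.Clock instead of System.currentTimeMillis(), which keeps the interval logic deterministic in tests. A minimal sketch of why the Clock bean helps (an illustration only, not code from this repository):

    import java.time.Clock;
    import java.time.Duration;
    import java.time.Instant;
    import java.time.ZoneId;

    public class ClockDemo
    {
      public static void main(String[] args)
      {
        // Clock.fixed() always returns the same instant -- fully deterministic.
        Clock fixed = Clock.fixed(Instant.parse("2023-06-03T09:50:05Z"), ZoneId.of("UTC"));

        // Clock.offset() derives a clock shifted by a fixed duration, so a test
        // can "advance time" past the commit interval without sleeping.
        Clock later = Clock.offset(fixed, Duration.ofSeconds(30));

        System.out.println(later.millis() - fixed.millis()); // prints 30000
      }
    }

In production the clock() bean above simply delegates to the system clock; a test context can override it with a fixed or offset clock.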
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index ccddc81..c7b0b8e 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -30,6 +30,9 @@ public class ApplicationProperties
   private String topicIn;
   @NotNull
   @NotEmpty
+  private Integer partitions;
+  @NotNull
+  @NotEmpty
   private String autoOffsetReset;
   @NotNull
   private Duration commitInterval;
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 63a2f93..3e98842 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -3,11 +3,13 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
 
 import javax.annotation.PreDestroy;
+import java.time.Clock;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
@@ -24,14 +26,19 @@ public class EndlessConsumer<K, V> implements Runnable
   private final ExecutorService executor;
   private final String id;
   private final String topic;
+  private final int partitions;
   private final Consumer<K, V> consumer;
+  private final Producer<?, ?> producer;
   private final RecordHandler<K, V> handler;
+  private final Clock clock;
+  private final long autoCommitIntervalMs;
 
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
   private boolean running = false;
   private Exception exception;
   private long consumed = 0;
+  private long lastCommit = 0;
 
@@ -40,8 +47,10 @@
   {
     try
     {
+      OffsetRecorder offsetRecorder = new OffsetRecorder(topic, partitions);
+
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic));
+      consumer.subscribe(Arrays.asList(topic), offsetRecorder);
 
       while (true)
       {
@@ -62,8 +71,16 @@ public class EndlessConsumer<K, V> implements Runnable
               record.value()
           );
 
+          offsetRecorder.recordOffset(record.partition(), record.offset());
           handler.accept(record);
 
+          long now = clock.millis();
+          if (now - lastCommit >= autoCommitIntervalMs)
+          {
+            producer.sendOffsetsToTransaction(
+                offsetRecorder.getOffsets(),
+                consumer.groupMetadata());
+          }
           consumed++;
         }
       }
@@ -99,6 +116,93 @@ public class EndlessConsumer<K, V> implements Runnable
     }
   }
 
+  class OffsetRecorder implements ConsumerRebalanceListener
+  {
+    private final String topic;
+    private final boolean[] activePartitions;
+    private final long[] offsets;
+
+
+    OffsetRecorder(String topic, int partitions)
+    {
+      this.topic = topic;
+      activePartitions = new boolean[partitions];
+      offsets = new long[partitions];
+      for (int i=0; i<partitions; i++)
+      {
+        offsets[i] = -1;
+        activePartitions[i] = false;
+      }
+    }
+
+
+    void recordOffset(int partition, long offset)
+    {
+      if (!activePartitions[partition])
+        throw new IllegalStateException("Partition " + partition + " is not active!");
+
+      offsets[partition] = offset;
+    }
+
+    Map<TopicPartition, OffsetAndMetadata> getOffsets()
+    {
+      Map<TopicPartition, OffsetAndMetadata> recordedOffsets = new HashMap<>();
+
+      for (int i=0; i<offsets.length; i++)
+      {
+        if (offsets[i] > -1)
+          recordedOffsets.put(
+              new TopicPartition(topic, i),
+              new OffsetAndMetadata(offsets[i]));
+      }
+
+      return recordedOffsets;
+    }
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> assignedPartitions)
+    {
+      assignedPartitions
+          .stream()
+          .forEach(topicPartition ->
+          {
+            log.info("Activating partition {}", topicPartition);
+            activePartitions[topicPartition.partition()] = true;
+            offsets[topicPartition.partition()] = -1;
+          });
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> revokedPartitions)
+    {
+      producer.sendOffsetsToTransaction(
+          revokedPartitions
+              .stream()
+              .collect(
+                  () -> new HashMap<TopicPartition, OffsetAndMetadata>(),
+                  (map, topicPartition) ->
+                  {
+                    log.info("Committing & deactivating revoked partition {}", topicPartition);
+                    activePartitions[topicPartition.partition()] = false;
+                    map.put(topicPartition, new OffsetAndMetadata(offsets[topicPartition.partition()]));
+                  },
+                  (mapA, mapB) -> mapA.putAll(mapB)),
+          consumer.groupMetadata());
+    }
+
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> lostPartitions)
+    {
+      lostPartitions
+          .stream()
+          .forEach((topicPartition) ->
+          {
+            log.info("Deactivating lost partition {}", topicPartition);
+            activePartitions[topicPartition.partition()] = false;
+          });
+    }
+  }
+
   private void shutdown()
   {
     shutdown(null);
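
What this WIP is driving at is Kafka's transactional consume-transform-produce pattern: instead of letting the consumer auto-commit, the offsets of processed records are committed through the producer with sendOffsetsToTransaction(), and a ConsumerRebalanceListener (OffsetRecorder) flushes the offsets of partitions that are revoked during a rebalance. The sketch below shows the complete loop this is heading toward, including the producer-side transaction calls and the lastCommit bookkeeping that the diff above does not contain yet. It is an illustration with made-up settings (topic, group, serializers, class name), not code from this repository:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Clock;
    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class TransactionalCommitSketch
    {
      public static void main(String[] args)
      {
        // Made-up settings -- in the real application they come from ApplicationProperties.
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "my-group");
        consumerProps.put("enable.auto.commit", false); // offsets go through the producer instead
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");

        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("transactional.id", "my-app-1"); // required for sendOffsetsToTransaction()
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

        long commitIntervalMs = 1000;
        Clock clock = Clock.systemDefaultZone();

        try (
            KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(consumerProps);
            KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProps))
        {
          producer.initTransactions(); // fences off older instances with the same transactional.id
          consumer.subscribe(List.of("in"));

          Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
          producer.beginTransaction();
          long lastCommit = clock.millis();

          while (true)
          {
            ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, Integer> record : records)
            {
              // ... process the record here, producing any results via producer.send(...) ...

              // The committed offset is the position of the NEXT record to read:
              offsets.put(
                  new TopicPartition(record.topic(), record.partition()),
                  new OffsetAndMetadata(record.offset() + 1));
            }

            long now = clock.millis();
            if (now - lastCommit >= commitIntervalMs)
            {
              // Offsets and produced records become visible atomically:
              producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
              producer.commitTransaction();
              producer.beginTransaction();
              offsets.clear();
              lastCommit = now; // restart the commit timer
            }
          }
        }
      }
    }

Two conventions matter here: sendOffsetsToTransaction() may only be called on a producer that was configured with a transactional.id and has an open transaction, and the offset committed for a partition is by convention the last processed offset plus one -- recordOffset() above stores record.offset() unchanged, so after a restart the last record of each partition would be delivered again.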