From 3f11c28ac68af0b43feb59500e3aba9eeca957b8 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sat, 9 Nov 2024 17:21:56 +0100
Subject: [PATCH] Reverted to a simple consumer with message-counting
 statistics
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

---
 README.sh                                     |  10 +-
 docker/docker-compose.yml                     |  27 +-
 pom.xml                                       |   2 +-
 .../juplo/kafka/ApplicationConfiguration.java |  29 +-
 .../de/juplo/kafka/ApplicationProperties.java |  27 --
 .../java/de/juplo/kafka/ExampleConsumer.java  | 352 ++----------------
 src/main/resources/application.yml            |  12 -
 .../java/de/juplo/kafka/ApplicationTests.java |  10 +-
 8 files changed, 40 insertions(+), 429 deletions(-)

diff --git a/README.sh b/README.sh
index 07f7de4..bdefd2b 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash

-IMAGE=juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT

 if [ "$1" = "cleanup" ]
 then
@@ -10,7 +10,7 @@ then
 fi

 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer-1
+docker compose -f docker/docker-compose.yml rm -svf consumer-1 consumer-2

 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -28,8 +28,12 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1

 docker compose -f docker/docker-compose.yml up -d producer
 docker compose -f docker/docker-compose.yml up -d consumer-1
+sleep 10
+docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/

+docker compose -f docker/docker-compose.yml up -d consumer-2
 sleep 10
-docker compose -f docker/docker-compose.yml exec cli http consumer-1:8881/
+docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/
+docker compose -f docker/docker-compose.yml exec cli http -v consumer-2:8881/

 docker compose -f docker/docker-compose.yml stop producer consumer-1
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index db8abf6..d80fed4 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -137,17 +137,11 @@
           echo -n Bereits konfiguriert:
           cat INITIALIZED
           kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-          kafka-topics --bootstrap-server kafka:9092 --describe --topic state
         else
           kafka-topics --bootstrap-server kafka:9092 \
             --delete \
             --if-exists \
             --topic test
-          kafka-topics --bootstrap-server kafka:9092 \
-            --delete \
-            --if-exists \
-            --topic state \
-
           kafka-topics --bootstrap-server kafka:9092 \
             --create \
             --topic test \
@@ -156,20 +150,7 @@
             --config min.insync.replicas=2 \
           && echo Das Topic \'test\' wurde erfolgreich angelegt: \
           && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
-          && \
-          kafka-topics --bootstrap-server kafka:9092 \
-            --create \
-            --topic state \
-            --partitions 2 \
-            --replication-factor 3 \
-            --config min.insync.replicas=2 \
-            --config cleanup.policy=compact \
-            --config segment.ms=3000 \
-            --config max.compaction.lag.ms=5000 \
-          && echo Das Topic \'state\' wurde erfolgreich angelegt: \
-          && kafka-topics --bootstrap-server kafka:9092 --describe --topic state \
-          && \
-          date > INITIALIZED
+          && date > INITIALIZED
         fi
     stop_grace_period: 0s
     depends_on:
@@ -218,7 +199,7 @@
       juplo.producer.throttle-ms: 10

   consumer-1:
-    image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
+    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer-1
@@ -227,7 +208,7 @@
       logging.level.de.juplo: TRACE

   consumer-2:
-    image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
+    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer-2
@@ -236,7 +217,7 @@
       logging.level.de.juplo: TRACE

   consumer-3:
-    image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
+    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer-3
diff --git a/pom.xml b/pom.xml
index 9136018..58da35e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-log-compaction-SNAPSHOT</version>
+  <version>1.1-rebalance-listener-SNAPSHOT</version>

   <properties>
     <java.version>21</java.version>
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 49875a0..a4856a6 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -2,11 +2,8 @@ package de.juplo.kafka;

 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.RangeAssignor;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.consumer.StickyAssignor;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
@@ -22,7 +19,6 @@ public class ApplicationConfiguration
   @Bean
   public ExampleConsumer exampleConsumer(
     Consumer<String, String> kafkaConsumer,
-    Producer<String, String> kafkaProducer,
     ApplicationProperties properties,
     ConfigurableApplicationContext applicationContext)
   {
@@ -31,8 +27,6 @@
       properties.getClientId(),
       properties.getConsumerProperties().getTopic(),
       kafkaConsumer,
-      properties.getProducerProperties().getTopic(),
-      kafkaProducer,
       () -> applicationContext.close());
   }

@@ -52,29 +46,10 @@
       props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
     }
     props.put("metadata.maxage.ms", 5000); // 5 Sekunden
-    props.put("partition.assignment.strategy", RangeAssignor.class.getName());
+    props.put("partition.assignment.strategy", StickyAssignor.class.getName());
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());

     return new KafkaConsumer<>(props);
   }
-
-  @Bean
-  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
-  {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("client.id", properties.getClientId());
-    props.put("acks", properties.getProducerProperties().getAcks());
-    props.put("batch.size", properties.getProducerProperties().getBatchSize());
-    props.put("metadata.maxage.ms", 5000); // 5 Sekunden
-    props.put("delivery.timeout.ms", 20000); // 20 Sekunden
-    props.put("request.timeout.ms", 10000); // 10 Sekunden
-    props.put("linger.ms", properties.getProducerProperties().getLingerMs());
props.put("compression.type", properties.getProducerProperties().getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - return new KafkaProducer<>(props); - } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 0b43159..c8193c9 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -25,8 +25,6 @@ public class ApplicationProperties @NotNull private ConsumerProperties consumer; - @NotNull - private ProducerProperties producer; public ConsumerProperties getConsumerProperties() @@ -34,11 +32,6 @@ public class ApplicationProperties return consumer; } - public ProducerProperties getProducerProperties() - { - return producer; - } - @Validated @Getter @@ -56,24 +49,4 @@ public class ApplicationProperties enum OffsetReset { latest, earliest, none } } - - @Validated - @Getter - @Setter - static class ProducerProperties - { - @NotNull - @NotEmpty - private String topic; - @NotNull - @NotEmpty - private String acks; - @NotNull - private Integer batchSize; - @NotNull - private Integer lingerMs; - @NotNull - @NotEmpty - private String compressionType; - } } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index fa2ff81..53abd4d 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -5,14 +5,11 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.*; -import java.util.concurrent.Phaser; @Slf4j @@ -24,19 +21,9 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private final Thread workerThread; private final Runnable closeCallback; - private final String stateTopic; - private final Producer producer; - private volatile boolean running = false; - private final Phaser phaser = new Phaser(1); private final Set assignedPartitions = new HashSet<>(); - private volatile State[] partitionStates; - private Map[] restoredState; private CounterState[] counterState; - private volatile long[] stateEndOffsets; - private volatile int[] seen; - private volatile int[] acked; - private volatile boolean[] done; private long consumed = 0; @@ -44,15 +31,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener String clientId, String topic, Consumer consumer, - String stateTopic, - Producer producer, Runnable closeCallback) { this.id = clientId; this.topic = topic; this.consumer = consumer; - this.stateTopic = stateTopic; - this.producer = producer; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -69,20 +52,10 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener log.info("{} - Fetching PartitionInfo for topic {}", id, topic); int numPartitions = consumer.partitionsFor(topic).size(); log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); - partitionStates = new 
-      for (int i=0; i
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
-        int phase = phaser.getPhase();
-
-        assignedPartitions
-          .forEach(partition ->
-          {
-            seen[partition.partition()] = 0;
-            acked[partition.partition()] = 0;
-            done[partition.partition()] = false;
-          });
-
-        log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
-        records
-          .partitions()
-          .forEach(partition ->
-          {
-            for (ConsumerRecord<String, String> record : records.records(partition))
-            {
-              handleRecord(
-                record.topic(),
-                record.partition(),
-                record.offset(),
-                record.key(),
-                record.value());
-            }
-
-            checkRestoreProgress(partition);
-
-            done[partition.partition()] = true;
-          });
-
-        assignedPartitions
-          .forEach(partition ->
-          {
-            if (seen[partition.partition()] == 0)
-            {
-              int arrivedPhase = phaser.arrive();
-              log.debug("{} - Received no records for partition {} in phase {}", id, partition, arrivedPhase);
-            }
-          });
-
-        int arrivedPhase = phaser.arriveAndAwaitAdvance();
-        log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
+        log.info("{} - Received {} messages", id, records.count());
+        for (ConsumerRecord<String, String> record : records)
+        {
+          handleRecord(
+            record.topic(),
+            record.partition(),
+            record.offset(),
+            record.key(),
+            record.value());
+        }
       }
     }
     catch(WakeupException e)
     {
@@ -163,59 +104,8 @@
     consumed++;
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);

-    if (topic.equals(this.topic))
-    {
-      handleMessage(partition, key);
-    }
-    else
-    {
-      handleState(partition, key, value);
-    }
-  }
-
-  private void checkRestoreProgress(TopicPartition topicPartition)
-  {
-    int partition = topicPartition.partition();
-
-    if (partitionStates[partition] == State.RESTORING)
-    {
-      long consumerPosition = consumer.position(topicPartition);
-
-      if (consumerPosition + 1 >= stateEndOffsets[partition])
-      {
-        log.info(
-            "{} - Position of consumer is {}. Restoring of state for partition {} done!",
-            id,
-            consumerPosition,
-            topicPartition);
-        stateAssigned(partition);
-      }
-      else
-      {
-        log.debug(
-            "{} - Restored state up to offset {}, end-offset: {}",
-            id,
-            consumerPosition,
-            stateEndOffsets[partition]);
-      }
-    }
-  }
-
-  private synchronized void handleState(
-      int partition,
-      String key,
-      String value)
-  {
-    restoredState[partition].put(key, Long.parseLong(value));
-  }
-
-  private void handleMessage(
-      Integer partition,
-      String key)
-  {
     Long counter = computeCount(partition, key);
     log.info("{} - current value for counter {}: {}", id, key, counter);
-    sendCounterState(partition, key, counter);
   }


   private synchronized Long computeCount(int partition, String key)
@@ -230,82 +120,6 @@
     return result;
   }

-  void sendCounterState(int partition, String key, Long counter)
-  {
-    seen[partition]++;
-
-    final long time = System.currentTimeMillis();
-
-    final ProducerRecord<String, String> record = new ProducerRecord<>(
-        stateTopic,         // Topic
-        key,                // Key
-        counter.toString()  // Value
-    );
-
-    producer.send(record, (metadata, e) ->
-    {
-      long now = System.currentTimeMillis();
-      if (e == null)
-      {
-        // HANDLE SUCCESS
-        log.debug(
-            "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms",
-            id,
-            record.key(),
-            record.value(),
-            metadata.partition(),
-            metadata.offset(),
-            metadata.timestamp(),
-            now - time
-        );
-      }
-      else
-      {
-        // HANDLE ERROR
-        log.error(
-            "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}",
-            id,
-            record.key(),
-            record.value(),
-            metadata == null ? -1 : metadata.timestamp(),
-            now - time,
-            e.toString()
-        );
-      }
-
-      acked[partition]++;
-      if (done[partition] && !(acked[partition] < seen[partition]))
-      {
-        int arrivedPhase = phaser.arrive();
-        log.debug(
-            "{} - Arrived at phase {} for partition {}, seen={}, acked={}",
-            id,
-            arrivedPhase,
-            partition,
-            seen[partition],
-            acked[partition]);
-      }
-      else
-      {
-        log.debug(
-            "{} - Still in phase {} for partition {}, seen={}, acked={}",
-            id,
-            phaser.getPhase(),
-            partition,
-            seen[partition],
-            acked[partition]);
-      }
-    });
-
-    long now = System.currentTimeMillis();
-    log.trace(
-        "{} - Queued message {}={}, latency={}ms",
-        id,
-        record.key(),
-        record.value(),
-        now - time
-    );
-  }


   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
@@ -313,7 +127,11 @@
     partitions
       .stream()
       .filter(partition -> partition.topic().equals(topic))
-      .forEach(partition -> restoreAndAssign(partition.partition()));
+      .forEach(partition ->
+      {
+        assignedPartitions.add(partition);
+        counterState[partition.partition()] = new CounterState(new HashMap<>());
+      });
   }

   @Override
@@ -322,132 +140,11 @@
     partitions
       .stream()
       .filter(partition -> partition.topic().equals(topic))
-      .forEach(partition -> revoke(partition.partition()));
-  }
-
-  private void restoreAndAssign(int partition)
-  {
-    TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
-
-    long stateEndOffset = consumer
-        .endOffsets(List.of(statePartition))
-        .get(statePartition)
-        .longValue();
-
-    long stateBeginningOffset = consumer
-        .beginningOffsets(List.of(statePartition))
-        .get(statePartition);
-
-    log.info(
-        "{} - Found beginning-offset {} and end-offset {} for state partition {}",
-        id,
-        stateBeginningOffset,
-        stateEndOffset,
-        partition);
-
-    if (stateBeginningOffset < stateEndOffset)
-    {
-      stateRestoring(partition, stateBeginningOffset, stateEndOffset);
-    }
-    else
-    {
-      log.info("{} - No state available for partition {}", id, partition);
-      restoredState[partition] = new HashMap<>();
-      stateAssigned(partition);
-    }
-  }
-
-  private void revoke(int partition)
-  {
-    State partitionState = partitionStates[partition];
-    switch (partitionState)
-    {
-      case RESTORING, ASSIGNED -> stateUnassigned(partition);
-      case UNASSIGNED -> log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState);
-    }
-  }
-
-  private void stateRestoring(int partition, long stateBeginningOffset, long stateEndOffset)
-  {
-    log.info(
-        "{} - Changing partition-state for {}: {} -> RESTORING",
-        id,
-        partition,
-        partitionStates[partition]);
-    partitionStates[partition] = State.RESTORING;
-
-    TopicPartition messagePartition = new TopicPartition(this.topic, partition);
-    log.info("{} - Pausing message partition {}", id, messagePartition);
-    consumer.pause(List.of(messagePartition));
-
-    TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
-    log.info(
-        "{} - Seeking to first offset {} for state partition {}",
-        id,
-        stateBeginningOffset,
-        statePartition);
-    consumer.seek(statePartition, stateBeginningOffset);
-    stateEndOffsets[partition] = stateEndOffset;
-    restoredState[partition] = new HashMap<>();
-    log.info("{} - Resuming state partition {}", id, statePartition);
-    consumer.resume(List.of(statePartition));
-  }
-
-  private void stateAssigned(int partition)
-  {
-    log.info(
-        "{} - State-change for partition {}: {} -> ASSIGNED",
-        id,
-        partition,
-        partitionStates[partition]);
-
-    partitionStates[partition] = State.ASSIGNED;
-
-    TopicPartition statePartition = new TopicPartition(stateTopic, partition);
-    log.info("{} - Pausing state partition {}...", id, statePartition);
-    consumer.pause(List.of(statePartition));
-    counterState[partition] = new CounterState(restoredState[partition]);
-    restoredState[partition] = null;
-
-    TopicPartition messagePartition = new TopicPartition(topic, partition);
-    log.info("{} - Adding partition {} to the assigned partitions", id, messagePartition);
-    assignedPartitions.add(messagePartition);
-    phaser.register();
-    log.info(
-        "{} - Registered new party for newly assigned partition {}. New total number of parties: {}",
-        id,
-        messagePartition,
-        phaser.getRegisteredParties());
-    log.info("{} - Resuming message partition {}...", id, messagePartition);
-    consumer.resume(List.of(messagePartition));
-  }
-
-  private void stateUnassigned(int partition)
-  {
-    State oldPartitionState = partitionStates[partition];
-
-    log.info(
-        "{} - State-change for partition {}: {} -> UNASSIGNED",
-        id,
-        partition,
-        oldPartitionState);
-
-    partitionStates[partition] = State.UNASSIGNED;
-
-    if (oldPartitionState == State.ASSIGNED)
-    {
-      TopicPartition messagePartition = new TopicPartition(topic, partition);
-      log.info("{} - Revoking partition {}", id, messagePartition);
-      assignedPartitions.remove(messagePartition);
-      counterState[partition] = null;
-
-      phaser.arriveAndDeregister();
-      log.info(
-          "{} - Deregistered party for revoked partition {}. New total number of parties: {}",
New total number of parties: {}", - id, - messagePartition, - phaser.getRegisteredParties()); - } + .forEach(partition -> + { + assignedPartitions.remove(partition); + counterState[partition.partition()] = null; + }); } @@ -458,11 +155,4 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener consumer.wakeup(); workerThread.join(); } - - enum State - { - UNASSIGNED, - RESTORING, - ASSIGNED - } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d9e7066..7a06731 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -6,12 +6,6 @@ juplo: topic: test auto-offset-reset: earliest auto-commit-interval: 5s - producer: - topic: state - acks: -1 - batch-size: 16384 - linger-ms: 0 - compression-type: gzip management: endpoint: shutdown: @@ -34,12 +28,6 @@ info: topic: ${juplo.consumer.topic} auto-offset-reset: ${juplo.consumer.auto-offset-reset} auto-commit-interval: ${juplo.consumer.auto-commit-interval} - producer: - topic: ${juplo.producer.topic} - acks: ${juplo.producer.acks} - batch-size: ${juplo.producer.batch-size} - linger-ms: ${juplo.producer.linger-ms} - compression-type: ${juplo.producer.compression-type} logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 22bb613..e4b97a4 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -9,7 +9,8 @@ import org.springframework.test.web.servlet.MockMvc; import java.time.Duration; -import static de.juplo.kafka.ApplicationTests.*; +import static de.juplo.kafka.ApplicationTests.PARTITIONS; +import static de.juplo.kafka.ApplicationTests.TOPIC; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; @@ -20,13 +21,12 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.consumer.auto-offset-reset=earliest", - "juplo.consumer.topic=" + TOPIC_IN}) + "juplo.consumer.topic=" + TOPIC }) @AutoConfigureMockMvc -@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS) +@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) public class ApplicationTests { - static final String TOPIC_IN = "FOO"; - static final String TOPIC_OUT = "BAR"; + static final String TOPIC = "FOO"; static final int PARTITIONS = 10; @Autowired -- 2.20.1