From: Kai Moritz Date: Sun, 24 Jul 2022 09:10:45 +0000 (+0200) Subject: WIP X-Git-Tag: wip-DEPRECATED~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=858cf4734c21b1f9de57fbbe10ef1513c86df2a6;p=demos%2Fkafka%2Ftraining WIP --- diff --git a/pom.xml b/pom.xml index 701704d..fe06959 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,10 @@ de.juplo.kafka - endless-consumer + wordcount 1.0-SNAPSHOT - Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic + Wordcount + Splits the incomming sentences into words and counts the words per user. diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 76c2520..1d69370 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -17,7 +17,7 @@ import java.util.concurrent.TimeUnit; public class Application implements ApplicationRunner { @Autowired - EndlessConsumer endlessConsumer; + Wordcount wordcount; @Autowired ExecutorService executor; @@ -26,7 +26,7 @@ public class Application implements ApplicationRunner public void run(ApplicationArguments args) throws Exception { log.info("Starting EndlessConsumer"); - endlessConsumer.start(); + wordcount.start(); } @PreDestroy @@ -35,7 +35,7 @@ public class Application implements ApplicationRunner try { log.info("Stopping EndlessConsumer"); - endlessConsumer.stop(); + wordcount.stop(); } catch (IllegalStateException e) { diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 08c3955..a4d6e57 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -28,7 +28,7 @@ public class ApplicationConfiguration } @Bean - public EndlessConsumer endlessConsumer( + public Wordcount endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, Consumer> handler, @@ -36,7 +36,7 @@ public class ApplicationConfiguration ApplicationProperties properties) { return - new EndlessConsumer<>( + new Wordcount<>( executor, repository, properties.getClientId(), diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index df4e653..afb0413 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ApplicationHealthIndicator implements HealthIndicator { - private final EndlessConsumer consumer; + private final Wordcount wordcount; @Override @@ -18,7 +18,7 @@ public class ApplicationHealthIndicator implements HealthIndicator { try { - return consumer + return wordcount .exitStatus() .map(Health::down) .orElse(Health.outOfService()) diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index ed38080..998ba14 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -16,7 +16,7 @@ import java.util.concurrent.ExecutionException; @RequiredArgsConstructor public class DriverController { - private final EndlessConsumer consumer; + private final Wordcount consumer; @PostMapping("start") diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java deleted file mode 100644 index f9a9629..0000000 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ /dev/null @@ -1,293 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.errors.WakeupException; - -import javax.annotation.PreDestroy; -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - - -@Slf4j -@RequiredArgsConstructor -public class EndlessConsumer implements ConsumerRebalanceListener, Runnable -{ - private final ExecutorService executor; - private final PartitionStatisticsRepository repository; - private final String id; - private final String topic; - private final Clock clock; - private final Duration commitInterval; - private final Consumer consumer; - private final java.util.function.Consumer> handler; - - private final Lock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); - private boolean running = false; - private Exception exception; - private long consumed = 0; - - private final Map> seen = new HashMap<>(); - - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long newOffset = consumer.position(tp); - log.info( - "{} - removing partition: {}, offset of next message {})", - id, - partition, - newOffset); - Map removed = seen.remove(partition); - for (String key : removed.keySet()) - { - log.info( - "{} - Seen {} messages for partition={}|key={}", - id, - removed.get(key), - partition, - key); - } - repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); - }); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long offset = consumer.position(tp); - log.info("{} - adding partition: {}, offset={}", id, partition, offset); - StatisticsDocument document = - repository - .findById(Integer.toString(partition)) - .orElse(new StatisticsDocument(partition)); - if (document.offset >= 0) - { - // Only seek, if a stored offset was found - // Otherwise: Use initial offset, generated by Kafka - consumer.seek(tp, document.offset); - } - seen.put(partition, document.statistics); - }); - } - - - @Override - public void run() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), this); - - Instant lastCommit = clock.instant(); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - // Do something with the data... - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - - handler.accept(record); - - consumed++; - - Integer partition = record.partition(); - String key = record.key() == null ? "NULL" : record.key().toString(); - Map byKey = seen.get(partition); - - if (!byKey.containsKey(key)) - byKey.put(key, 0l); - - long seenByKey = byKey.get(key); - seenByKey++; - byKey.put(key, seenByKey); - } - - if (lastCommit.plus(commitInterval).isBefore(clock.instant())) - { - log.debug("Storing data and offsets, last commit: {}", lastCommit); - seen.forEach((partiton, statistics) -> repository.save( - new StatisticsDocument( - partiton, - statistics, - consumer.position(new TopicPartition(topic, partiton))))); - lastCommit = clock.instant(); - } - } - } - catch(WakeupException e) - { - log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); - shutdown(); - } - catch(RecordDeserializationException e) - { - TopicPartition tp = e.topicPartition(); - long offset = e.offset(); - log.error( - "{} - Could not deserialize message on topic {} with offset={}: {}", - id, - tp, - offset, - e.getCause().toString()); - - shutdown(e); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}", id, e.toString(), e); - shutdown(e); - } - finally - { - log.info("{} - Consumer-Thread exiting", id); - } - } - - private void shutdown() - { - shutdown(null); - } - - private void shutdown(Exception e) - { - lock.lock(); - try - { - try - { - log.info("{} - Unsubscribing from topic {}", id, topic); - consumer.unsubscribe(); - } - catch (Exception ue) - { - log.error( - "{} - Error while unsubscribing from topic {}: {}", - id, - topic, - ue.toString()); - } - finally - { - running = false; - exception = e; - condition.signal(); - } - } - finally - { - lock.unlock(); - } - } - - public Map> getSeen() - { - return seen; - } - - public void start() - { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("Consumer instance " + id + " is already running!"); - - log.info("{} - Starting - consumed {} messages before", id, consumed); - running = true; - exception = null; - executor.submit(this); - } - finally - { - lock.unlock(); - } - } - - public synchronized void stop() throws InterruptedException - { - lock.lock(); - try - { - if (!running) - throw new IllegalStateException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - consumer.wakeup(); - condition.await(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); - } - finally - { - lock.unlock(); - } - } - - @PreDestroy - public void destroy() throws ExecutionException, InterruptedException - { - log.info("{} - Destroy!", id); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - - public boolean running() - { - lock.lock(); - try - { - return running; - } - finally - { - lock.unlock(); - } - } - - public Optional exitStatus() - { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); - - return Optional.ofNullable(exception); - } - finally - { - lock.unlock(); - } - } -} diff --git a/src/main/java/de/juplo/kafka/Wordcount.java b/src/main/java/de/juplo/kafka/Wordcount.java new file mode 100644 index 0000000..fe9958a --- /dev/null +++ b/src/main/java/de/juplo/kafka/Wordcount.java @@ -0,0 +1,293 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.errors.WakeupException; + +import javax.annotation.PreDestroy; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +@Slf4j +@RequiredArgsConstructor +public class Wordcount implements ConsumerRebalanceListener, Runnable +{ + private final ExecutorService executor; + private final PartitionStatisticsRepository repository; + private final String id; + private final String topic; + private final Clock clock; + private final Duration commitInterval; + private final Consumer consumer; + private final java.util.function.Consumer> handler; + + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private boolean running = false; + private Exception exception; + private long consumed = 0; + + private final Map> seen = new HashMap<>(); + + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long newOffset = consumer.position(tp); + log.info( + "{} - removing partition: {}, offset of next message {})", + id, + partition, + newOffset); + Map removed = seen.remove(partition); + for (String key : removed.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + removed.get(key), + partition, + key); + } + repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); + }); + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long offset = consumer.position(tp); + log.info("{} - adding partition: {}, offset={}", id, partition, offset); + StatisticsDocument document = + repository + .findById(Integer.toString(partition)) + .orElse(new StatisticsDocument(partition)); + if (document.offset >= 0) + { + // Only seek, if a stored offset was found + // Otherwise: Use initial offset, generated by Kafka + consumer.seek(tp, document.offset); + } + seen.put(partition, document.statistics); + }); + } + + + @Override + public void run() + { + try + { + log.info("{} - Subscribing to topic {}", id, topic); + consumer.subscribe(Arrays.asList(topic), this); + + Instant lastCommit = clock.instant(); + + while (true) + { + ConsumerRecords records = + consumer.poll(Duration.ofSeconds(1)); + + // Do something with the data... + log.info("{} - Received {} messages", id, records.count()); + for (ConsumerRecord record : records) + { + log.info( + "{} - {}: {}/{} - {}={}", + id, + record.offset(), + record.topic(), + record.partition(), + record.key(), + record.value() + ); + + handler.accept(record); + + consumed++; + + Integer partition = record.partition(); + String key = record.key() == null ? "NULL" : record.key().toString(); + Map byKey = seen.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0l); + + long seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); + } + + if (lastCommit.plus(commitInterval).isBefore(clock.instant())) + { + log.debug("Storing data and offsets, last commit: {}", lastCommit); + seen.forEach((partiton, statistics) -> repository.save( + new StatisticsDocument( + partiton, + statistics, + consumer.position(new TopicPartition(topic, partiton))))); + lastCommit = clock.instant(); + } + } + } + catch(WakeupException e) + { + log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); + shutdown(); + } + catch(RecordDeserializationException e) + { + TopicPartition tp = e.topicPartition(); + long offset = e.offset(); + log.error( + "{} - Could not deserialize message on topic {} with offset={}: {}", + id, + tp, + offset, + e.getCause().toString()); + + shutdown(e); + } + catch(Exception e) + { + log.error("{} - Unexpected error: {}", id, e.toString(), e); + shutdown(e); + } + finally + { + log.info("{} - Consumer-Thread exiting", id); + } + } + + private void shutdown() + { + shutdown(null); + } + + private void shutdown(Exception e) + { + lock.lock(); + try + { + try + { + log.info("{} - Unsubscribing from topic {}", id, topic); + consumer.unsubscribe(); + } + catch (Exception ue) + { + log.error( + "{} - Error while unsubscribing from topic {}: {}", + id, + topic, + ue.toString()); + } + finally + { + running = false; + exception = e; + condition.signal(); + } + } + finally + { + lock.unlock(); + } + } + + public Map> getSeen() + { + return seen; + } + + public void start() + { + lock.lock(); + try + { + if (running) + throw new IllegalStateException("Consumer instance " + id + " is already running!"); + + log.info("{} - Starting - consumed {} messages before", id, consumed); + running = true; + exception = null; + executor.submit(this); + } + finally + { + lock.unlock(); + } + } + + public synchronized void stop() throws InterruptedException + { + lock.lock(); + try + { + if (!running) + throw new IllegalStateException("Consumer instance " + id + " is not running!"); + + log.info("{} - Stopping", id); + consumer.wakeup(); + condition.await(); + log.info("{} - Stopped - consumed {} messages so far", id, consumed); + } + finally + { + lock.unlock(); + } + } + + @PreDestroy + public void destroy() throws ExecutionException, InterruptedException + { + log.info("{} - Destroy!", id); + log.info("{}: Consumed {} messages in total, exiting!", id, consumed); + } + + public boolean running() + { + lock.lock(); + try + { + return running; + } + finally + { + lock.unlock(); + } + } + + public Optional exitStatus() + { + lock.lock(); + try + { + if (running) + throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); + + return Optional.ofNullable(exception); + } + finally + { + lock.unlock(); + } + } +}