X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=6af3765d0532a59f821d031927d3cc38de2400bf;hb=d1ce252495c63a971d1734a35b10cf26313d95fb;hp=22dce95ee17a7c6c9f089bbe5e9377fe8bdcdba3;hpb=e87f4bb2bc188252955fb4932ddd99161ba621d3;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 22dce95..6af3765 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -1,20 +1,22 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import javax.annotation.PreDestroy; import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j @@ -25,43 +27,96 @@ public class EndlessConsumer implements Runnable private final String groupId; private final String id; private final String topic; + private final String autoOffsetReset; - private AtomicBoolean running = new AtomicBoolean(); + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private boolean running = false; + private Exception exception; private long consumed = 0; private KafkaConsumer consumer = null; - private Future future = null; + + + private final Map> seen = new HashMap<>(); + private final Map offsets = new HashMap<>(); + public EndlessConsumer( ExecutorService executor, String bootstrapServer, String groupId, String clientId, - String topic) + String topic, + String autoOffsetReset) { this.executor = executor; this.bootstrapServer = bootstrapServer; this.groupId = groupId; this.id = clientId; this.topic = topic; + this.autoOffsetReset = autoOffsetReset; } @Override public void run() { - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServer); - props.put("group.id", groupId); - props.put("client.id", id); - props.put("auto.offset.reset", "earliest"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - this.consumer = new KafkaConsumer<>(props); - try { + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrapServer); + props.put("group.id", groupId); + props.put("client.id", id); + props.put("auto.offset.reset", autoOffsetReset); + props.put("metadata.max.age.ms", "1000"); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + + this.consumer = new KafkaConsumer<>(props); + log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() + { + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long newOffset = consumer.position(tp); + Long oldOffset = offsets.remove(partition); + log.info( + "{} - removing partition: {}, consumed {} records (offset {} -> {})", + id, + partition, + newOffset - oldOffset, + oldOffset, + newOffset); + Map removed = seen.remove(partition); + for (String key : removed.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + removed.get(key), + partition, + key); + } + }); + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long offset = consumer.position(tp); + log.info("{} - adding partition: {}, offset={}", id, partition, offset); + offsets.put(partition, offset); + seen.put(partition, new HashMap<>()); + }); + } + }); while (true) { @@ -82,17 +137,29 @@ public class EndlessConsumer implements Runnable record.key(), record.value() ); + + Integer partition = record.partition(); + String key = record.key() == null ? "NULL" : record.key(); + Map byKey = seen.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0l); + + long seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); } } } catch(WakeupException e) { log.info("{} - RIIING!", id); + shutdown(); } catch(Exception e) { - log.error("{} - Unexpected error: {}", id, e.toString()); - running.set(false); // Mark the instance as not running + log.error("{} - Unexpected error: {}", id, e.toString(), e); + shutdown(e); } finally { @@ -102,27 +169,67 @@ public class EndlessConsumer implements Runnable } } + private void shutdown() + { + shutdown(null); + } + + private void shutdown(Exception e) + { + lock.lock(); + try + { + running = false; + exception = e; + condition.signal(); + } + finally + { + lock.unlock(); + } + } + + public Map> getSeen() + { + return seen; + } - public synchronized void start() + public void start() { - boolean stateChanged = running.compareAndSet(false, true); - if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is already running!"); + lock.lock(); + try + { + if (running) + throw new IllegalStateException("Consumer instance " + id + " is already running!"); - log.info("{} - Starting - consumed {} messages before", id, consumed); - future = executor.submit(this); + log.info("{} - Starting - consumed {} messages before", id, consumed); + running = true; + exception = null; + executor.submit(this); + } + finally + { + lock.unlock(); + } } public synchronized void stop() throws ExecutionException, InterruptedException { - boolean stateChanged = running.compareAndSet(true, false); - if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - consumer.wakeup(); - future.get(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); + lock.lock(); + try + { + if (!running) + throw new IllegalStateException("Consumer instance " + id + " is not running!"); + + log.info("{} - Stopping", id); + consumer.wakeup(); + condition.await(); + log.info("{} - Stopped - consumed {} messages so far", id, consumed); + } + finally + { + lock.unlock(); + } } @PreDestroy @@ -137,9 +244,42 @@ public class EndlessConsumer implements Runnable { log.info("{} - Was already stopped", id); } + catch (Exception e) + { + log.error("{} - Unexpected exception while trying to stop the consumer", id, e); + } finally { log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } + + public boolean running() + { + lock.lock(); + try + { + return running; + } + finally + { + lock.unlock(); + } + } + + public Optional exitStatus() + { + lock.lock(); + try + { + if (running) + throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); + + return Optional.ofNullable(exception); + } + finally + { + lock.unlock(); + } + } }