X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=e5ef7d0271590451e90ea7170c8e740e4db0e85b;hb=ccf11628d5a1524d4bffe2f1b21b51ad713f1a67;hp=e67bf412a1bae6762c7270935079e47f57716a1f;hpb=915674ec49ba38b3716cc4ef53272e963f139677;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index e67bf41..e5ef7d0 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -14,8 +14,9 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j @@ -29,12 +30,15 @@ public class EndlessConsumer implements Runnable private final String topic; private final String autoOffsetReset; - private AtomicBoolean running = new AtomicBoolean(); + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private boolean running = false; + private Exception exception; private long consumed = 0; private KafkaConsumer consumer = null; - private Future future = null; - private final Map seen = new HashMap<>(); + + private final Map> seen = new HashMap<>(); public EndlessConsumer( @@ -80,17 +84,17 @@ public class EndlessConsumer implements Runnable partitions.forEach(tp -> { log.info("{} - removing partition: {}", id, tp); - PartitionStatistics removed = seen.remove(tp); - for (KeyCounter counter : removed.getStatistics()) + Map removed = seen.remove(tp.partition()); + for (String key : removed.keySet()) { log.info( "{} - Seen {} messages for partition={}|key={}", id, - counter.getResult(), - removed.getPartition(), - counter.getKey()); + removed.get(key), + tp.partition(), + key); } - repository.save(new StatisticsDocument(removed)); + repository.save(new StatisticsDocument(tp.partition(), removed)); }); } @@ -101,11 +105,11 @@ public class EndlessConsumer implements Runnable { log.info("{} - adding partition: {}", id, tp); seen.put( - tp, + tp.partition(), repository - .findById(tp.toString()) - .map(PartitionStatistics::new) - .orElse(new PartitionStatistics(tp))); + .findById(Integer.toString(tp.partition())) + .map(document -> document.statistics) + .orElse(new HashMap<>())); }); } }); @@ -130,20 +134,28 @@ public class EndlessConsumer implements Runnable record.value() ); - TopicPartition partition = new TopicPartition(record.topic(), record.partition()); + Integer partition = record.partition(); String key = record.key() == null ? "NULL" : record.key(); - seen.get(partition).increment(key); + Map byKey = seen.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0); + + int seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); } } } catch(WakeupException e) { log.info("{} - RIIING!", id); + shutdown(); } catch(Exception e) { log.error("{} - Unexpected error: {}", id, e.toString(), e); - running.set(false); // Mark the instance as not running + shutdown(e); } finally { @@ -153,31 +165,67 @@ public class EndlessConsumer implements Runnable } } - public Map getSeen() + private void shutdown() + { + shutdown(null); + } + + private void shutdown(Exception e) + { + lock.lock(); + try + { + running = false; + exception = e; + condition.signal(); + } + finally + { + lock.unlock(); + } + } + + public Map> getSeen() { return seen; } - public synchronized void start() + public void start() { - boolean stateChanged = running.compareAndSet(false, true); - if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is already running!"); + lock.lock(); + try + { + if (running) + throw new IllegalStateException("Consumer instance " + id + " is already running!"); - log.info("{} - Starting - consumed {} messages before", id, consumed); - future = executor.submit(this); + log.info("{} - Starting - consumed {} messages before", id, consumed); + running = true; + exception = null; + executor.submit(this); + } + finally + { + lock.unlock(); + } } public synchronized void stop() throws ExecutionException, InterruptedException { - boolean stateChanged = running.compareAndSet(true, false); - if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - consumer.wakeup(); - future.get(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); + lock.lock(); + try + { + if (!running) + throw new IllegalStateException("Consumer instance " + id + " is not running!"); + + log.info("{} - Stopping", id); + consumer.wakeup(); + condition.await(); + log.info("{} - Stopped - consumed {} messages so far", id, consumed); + } + finally + { + lock.unlock(); + } } @PreDestroy @@ -192,9 +240,42 @@ public class EndlessConsumer implements Runnable { log.info("{} - Was already stopped", id); } + catch (Exception e) + { + log.error("{} - Unexpected exception while trying to stop the consumer", id, e); + } finally { log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } + + public boolean running() + { + lock.lock(); + try + { + return running; + } + finally + { + lock.unlock(); + } + } + + public Optional exitStatus() + { + lock.lock(); + try + { + if (running) + throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); + + return Optional.ofNullable(exception); + } + finally + { + lock.unlock(); + } + } }