From: Kai Moritz Date: Fri, 1 Apr 2022 22:46:08 +0000 (+0200) Subject: Vorlage X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=bb2b0fd46b4b3b40ae7205d0d31444e3cbd1eb57;p=demos%2Fkafka%2Ftraining Vorlage --- diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 063a09e..2c844a7 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -75,39 +75,7 @@ public class EndlessConsumer implements Runnable while (true) { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - // Do something with the data... - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - consumed++; - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - - Integer partition = record.partition(); - String key = record.key(); - - if (!seen.containsKey(partition)) - seen.put(partition, new HashMap<>()); - - Map byKey = seen.get(partition); - - if (!byKey.containsKey(key)) - byKey.put(key, 0); - - int seenByKey = byKey.get(key); - seenByKey++; - byKey.put(key, seenByKey); - } + // TODO: Nachrichten empfangen und zählen } } catch(WakeupException e)