From bb2b0fd46b4b3b40ae7205d0d31444e3cbd1eb57 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Apr 2022 00:46:08 +0200 Subject: [PATCH] Vorlage --- .../java/de/juplo/kafka/EndlessConsumer.java | 34 +------------------ 1 file changed, 1 insertion(+), 33 deletions(-) diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 063a09e..2c844a7 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -75,39 +75,7 @@ public class EndlessConsumer implements Runnable while (true) { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - // Do something with the data... - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - consumed++; - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - - Integer partition = record.partition(); - String key = record.key(); - - if (!seen.containsKey(partition)) - seen.put(partition, new HashMap<>()); - - Map byKey = seen.get(partition); - - if (!byKey.containsKey(key)) - byKey.put(key, 0); - - int seenByKey = byKey.get(key); - seenByKey++; - byKey.put(key, seenByKey); - } + // TODO: Nachrichten empfangen und zählen } } catch(WakeupException e) -- 2.20.1