From: Kai Moritz Date: Thu, 7 Apr 2022 23:19:01 +0000 (+0200) Subject: Merge branch 'stored-state' into stored-offsets X-Git-Tag: wip-DEPRECATED~13 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=620191782035383e0083dc348e4941c9cec0d994;p=demos%2Fkafka%2Ftraining Merge branch 'stored-state' into stored-offsets --- 620191782035383e0083dc348e4941c9cec0d994 diff --cc docker-compose.yml index c7f4e19,5723fc7..b84ed52 --- a/docker-compose.yml +++ b/docker-compose.yml @@@ -32,13 -32,22 +32,22 @@@ services MONGO_INITDB_ROOT_USERNAME: juplo MONGO_INITDB_ROOT_PASSWORD: training + express: + image: mongo-express + ports: + - 8090:8081 + environment: + ME_CONFIG_MONGODB_ADMINUSERNAME: juplo + ME_CONFIG_MONGODB_ADMINPASSWORD: training + ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/ + - kafka-ui: - image: provectuslabs/kafka-ui:0.3.3 - ports: - - 8080:8080 - environment: - KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 + setup: + image: juplo/toolbox + command: > + bash -c " + kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test + kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 + " cli: image: juplo/toolbox diff --cc src/main/java/de/juplo/kafka/EndlessConsumer.java index b152310,7cb77aa..2563204 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@@ -87,11 -86,11 +87,11 @@@ public class EndlessConsumer implement log.info( "{} - Seen {} messages for partition={}|key={}", id, - counter.getResult(), - removed.getPartition(), - counter.getKey()); + removed.get(key), + tp.partition(), + key); } - repository.save(new StatisticsDocument(removed, consumer.position(tp))); - repository.save(new StatisticsDocument(tp.partition(), removed)); ++ repository.save(new StatisticsDocument(tp.partition(), removed, consumer.position(tp))); }); } @@@ -101,12 -100,12 +101,12 @@@ partitions.forEach(tp -> { log.info("{} - adding partition: {}", id, tp); - seen.put( - tp.partition(), + StatisticsDocument document = repository - .findById(tp.toString()) - .orElse(new StatisticsDocument(tp)); + .findById(Integer.toString(tp.partition())) - .map(document -> document.statistics) - .orElse(new HashMap<>())); ++ .orElse(new StatisticsDocument(tp.partition())); + consumer.seek(tp, document.offset); - seen.put(tp, new PartitionStatistics(document)); ++ seen.put(tp.partition(), document.statistics); }); } }); @@@ -131,12 -130,17 +131,23 @@@ record.value() ); - TopicPartition partition = new TopicPartition(record.topic(), record.partition()); + Integer partition = record.partition(); String key = record.key() == null ? "NULL" : record.key(); - seen.get(partition).increment(key); + Map byKey = seen.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0); + + int seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); } + - seen.forEach((tp, statistics) -> repository.save(new StatisticsDocument(statistics, consumer.position(tp)))); ++ seen.forEach((partiton, statistics) -> repository.save( ++ new StatisticsDocument( ++ partiton, ++ statistics, ++ consumer.position(new TopicPartition(topic, partiton))))); } } catch(WakeupException e) diff --cc src/main/java/de/juplo/kafka/StatisticsDocument.java index e8c2e9b,be998ca..96ebfb1 --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@@ -15,35 -14,15 +14,23 @@@ public class StatisticsDocumen { @Id public String id; - public String topic; - public Integer partition; + public long offset; - public Map statistics; + public Map statistics; public StatisticsDocument() { } - public StatisticsDocument(TopicPartition tp) - public StatisticsDocument(Integer partition, Map statistics) ++ public StatisticsDocument(Integer partition) + { - this.topic = tp.topic(); - this.partition = tp.partition(); - this.offset = 0; ++ this.id = Integer.toString(partition); ++ this.statistics = new HashMap<>(); + } + - public StatisticsDocument(String topic, Integer partition, Map statistics) ++ public StatisticsDocument(Integer partition, Map statistics, long offset) { - this.partition = partition; + this.id = Integer.toString(partition); this.statistics = statistics; - } - - public StatisticsDocument(PartitionStatistics statistics, long offset) - { - this.topic = statistics.getPartition().topic(); - this.id = statistics.toString(); - this.partition = statistics.getPartition().partition(); + this.offset = offset; - this.statistics = new HashMap<>(); - statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult())); } }