From 10ce2447023fbf9ceb9e431a97a38dd8b5f488fa Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 7 Apr 2022 09:25:41 +0200 Subject: [PATCH] =?utf8?q?R=C3=BCckbau=20auf=20verschachtelte=20Maps?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Die zuvor erfundenen fachlichen Klassen passen nicht dazu, dass man sie - wie gedacht - direkt in MongoDB stopfen könnte. * Daher hier erst mal der Rückbau auf Maps, da das dann für die Übungen einfacher ist. --- README.sh | 3 +- src/main/java/de/juplo/kafka/Application.java | 11 ---- .../java/de/juplo/kafka/DriverController.java | 3 +- .../java/de/juplo/kafka/EndlessConsumer.java | 27 +++++---- src/main/java/de/juplo/kafka/KeyCounter.java | 24 -------- .../de/juplo/kafka/PartitionStatistics.java | 56 ------------------- .../kafka/PartitionStatisticsSerializer.java | 43 -------------- .../juplo/kafka/TopicPartitionSerializer.java | 27 --------- 8 files changed, 20 insertions(+), 174 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/KeyCounter.java delete mode 100644 src/main/java/de/juplo/kafka/PartitionStatistics.java delete mode 100644 src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java delete mode 100644 src/main/java/de/juplo/kafka/TopicPartitionSerializer.java diff --git a/README.sh b/README.sh index c14f45b..13176d2 100755 --- a/README.sh +++ b/README.sh @@ -47,6 +47,7 @@ http -v :8081/seen sleep 1 http -v :8081/seen +docker-compose stop producer docker-compose exec -T cli bash << 'EOF' echo "Altering number of partitions from 3 to 7..." 
# tag::altertopic[] @@ -55,7 +56,7 @@ kafka-topics --bootstrap-server kafka:9092 --describe --topic test # end::altertopic[] EOF -docker-compose restart producer +docker-compose start producer sleep 1 http -v :8081/seen sleep 1 diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 7cfe268..dd4b20a 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -5,7 +5,6 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; -import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; import org.springframework.util.Assert; import java.util.concurrent.Executors; @@ -41,16 +40,6 @@ public class Application return consumer; } - @Bean - public Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder() - { - return - new Jackson2ObjectMapperBuilder().serializers( - new TopicPartitionSerializer(), - new PartitionStatisticsSerializer()); - } - - public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index ddff42e..a504842 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -1,7 +1,6 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; -import org.apache.kafka.common.TopicPartition; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; @@ -31,7 +30,7 @@ public class DriverController @GetMapping("seen") - public Map seen() + public Map> seen() { return consumer.getSeen(); } diff --git 
a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index c7bc852..14a875b 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -33,7 +33,7 @@ public class EndlessConsumer implements Runnable private KafkaConsumer consumer = null; private Future future = null; - private final Map seen = new HashMap<>(); + private final Map> seen = new HashMap<>(); public EndlessConsumer( @@ -77,15 +77,15 @@ public class EndlessConsumer implements Runnable partitions.forEach(tp -> { log.info("{} - removing partition: {}", id, tp); - PartitionStatistics removed = seen.remove(tp); - for (KeyCounter counter : removed.getStatistics()) + Map removed = seen.remove(tp.partition()); + for (String key : removed.keySet()) { log.info( "{} - Seen {} messages for partition={}|key={}", id, - counter.getResult(), - removed.getPartition(), - counter.getKey()); + removed.get(key), + tp.partition(), + key); } }); } @@ -96,7 +96,7 @@ public class EndlessConsumer implements Runnable partitions.forEach(tp -> { log.info("{} - adding partition: {}", id, tp); - seen.put(tp, new PartitionStatistics(tp)); + seen.put(tp.partition(), new HashMap<>()); }); } }); @@ -121,9 +121,16 @@ public class EndlessConsumer implements Runnable record.value() ); - TopicPartition partition = new TopicPartition(record.topic(), record.partition()); + Integer partition = record.partition(); String key = record.key() == null ? 
"NULL" : record.key(); - seen.get(partition).increment(key); + Map byKey = seen.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0); + + int seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); } } } @@ -144,7 +151,7 @@ public class EndlessConsumer implements Runnable } } - public Map getSeen() + public Map> getSeen() { return seen; } diff --git a/src/main/java/de/juplo/kafka/KeyCounter.java b/src/main/java/de/juplo/kafka/KeyCounter.java deleted file mode 100644 index b2cde47..0000000 --- a/src/main/java/de/juplo/kafka/KeyCounter.java +++ /dev/null @@ -1,24 +0,0 @@ -package de.juplo.kafka; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.ToString; - - -@RequiredArgsConstructor -@Getter -@EqualsAndHashCode(of = { "key" }) -@ToString -public class KeyCounter -{ - private final String key; - - private long result = 0; - - - public long increment() - { - return ++result; - } -} diff --git a/src/main/java/de/juplo/kafka/PartitionStatistics.java b/src/main/java/de/juplo/kafka/PartitionStatistics.java deleted file mode 100644 index e47a9f9..0000000 --- a/src/main/java/de/juplo/kafka/PartitionStatistics.java +++ /dev/null @@ -1,56 +0,0 @@ -package de.juplo.kafka; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.common.TopicPartition; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - - -@RequiredArgsConstructor -@Getter -@EqualsAndHashCode(of = { "partition" }) -public class PartitionStatistics -{ - private final TopicPartition partition; - private final Map statistics = new HashMap<>(); - - - public KeyCounter addKey(String key) - { - KeyCounter counter = new KeyCounter(key); - - counter.increment(); - statistics.put(key, counter); - - return counter; - } - - - public long increment(String key) - { - KeyCounter counter = statistics.get(key); - - if (counter == null) 
- { - counter = new KeyCounter(key); - statistics.put(key, counter); - } - - return counter.increment(); - } - - public Collection getStatistics() - { - return statistics.values(); - } - - @Override - public String toString() - { - return partition.toString(); - } -} diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java b/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java deleted file mode 100644 index ed8230d..0000000 --- a/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java +++ /dev/null @@ -1,43 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.SerializerProvider; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.TopicPartition; - -import java.io.IOException; - - -@Slf4j -public class PartitionStatisticsSerializer extends JsonSerializer -{ - @Override - public Class handledType() - { - return PartitionStatistics.class; - } - - @Override - public void serialize( - PartitionStatistics statistics, - JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) throws IOException - { - jsonGenerator.writeStartObject(); - statistics - .getStatistics() - .forEach((counter) -> - { - try - { - jsonGenerator.writeNumberField(counter.getKey(), counter.getResult()); - } - catch (NumberFormatException | IOException e) - { - log.error("Could not write {}: {}", counter, e.toString()); - } - }); - jsonGenerator.writeEndObject(); - } -} diff --git a/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java b/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java deleted file mode 100644 index e7190a5..0000000 --- a/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java +++ /dev/null @@ -1,27 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonSerializer; -import 
com.fasterxml.jackson.databind.SerializerProvider; -import org.apache.kafka.common.TopicPartition; - -import java.io.IOException; - - -public class TopicPartitionSerializer extends JsonSerializer -{ - @Override - public Class handledType() - { - return TopicPartition.class; - } - - @Override - public void serialize( - TopicPartition topicPartition, - JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) throws IOException - { - jsonGenerator.writeString(topicPartition.toString()); - } -} -- 2.20.1