import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
+import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.util.Assert;
import java.util.concurrent.Executors;
return consumer;
}
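+ // Register the custom JSON serializers with the ObjectMapper used by
+ // Spring MVC, so that the statistics map can be rendered by the REST endpoint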
+ @Bean
+ public Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder()
+ {
+ return new Jackson2ObjectMapperBuilder().serializers(
+ new TopicPartitionSerializer(),
+ new PartitionStatisticsSerializer());
+ }
+
+
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
package de.juplo.kafka;
import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
@GetMapping("seen")
- public Map<Integer, Map<String, Integer>> seen()
+ public Map<TopicPartition, PartitionStatistics> seen()
{
return consumer.getSeen();
}
private KafkaConsumer<String, String> consumer = null;
private Future<?> future = null;
- private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
+ private final Map<TopicPartition, PartitionStatistics> seen = new HashMap<>();
public EndlessConsumer(
partitions.forEach(tp ->
{
log.info("{} - removing partition: {}", id, tp);
- Map<String, Integer> removed = seen.remove(tp.partition());
- for (String key : removed.keySet())
+ PartitionStatistics removed = seen.remove(tp);
+ for (KeyCounter counter : removed.getStatistics())
{
log.info(
"{} - Seen {} messages for partition={}|key={}",
id,
- removed.get(key),
- removed,
- key);
+ counter.getCounter(),
+ removed.getPartition(),
+ counter.getKey());
}
});
}
partitions.forEach(tp ->
{
log.info("{} - adding partition: {}", id, tp);
- seen.put(tp.partition(), new HashMap<>());
+ seen.put(tp, new PartitionStatistics(tp));
});
}
});
record.value()
);
- Integer partition = record.partition();
+ TopicPartition partition = new TopicPartition(record.topic(), record.partition());
String key = record.key() == null ? "NULL" : record.key();
- Map<String, Integer> byKey = seen.get(partition);
-
- if (!byKey.containsKey(key))
- byKey.put(key, 0);
-
- int seenByKey = byKey.get(key);
- seenByKey++;
- byKey.put(key, seenByKey);
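+ // All bookkeeping for the key is encapsulated in PartitionStatistics and KeyCounter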
+ seen.get(partition).increment(key);
}
}
}
}
}
- public Map<Integer, Map<String, Integer>> getSeen()
+ public Map<TopicPartition, PartitionStatistics> getSeen()
{
return seen;
}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+
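+/**
+ * Counts the messages seen for a single key.
+ * Equality is based on the key alone.
+ */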
+@RequiredArgsConstructor
+@Getter
+@EqualsAndHashCode(of = { "key" })
+@ToString
+public class KeyCounter
+{
+  private final String key;
+
+  private long counter = 0;
+
+
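+  /** Increments the counter and returns the updated value. */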
+  public long increment()
+  {
+    return ++counter;
+  }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Getter
+@EqualsAndHashCode(of = { "partition" })
+public class PartitionStatistics
+{
+  private final TopicPartition partition;
+  private final Map<String, KeyCounter> statistics = new HashMap<>();
+
+
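+  /**
+   * Registers a newly seen key: creates its counter and records the first occurrence.
+   */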
+  public KeyCounter addKey(String key)
+  {
+    KeyCounter counter = new KeyCounter(key);
+
+    counter.increment();
+    statistics.put(key, counter);
+
+    return counter;
+  }
+
+
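+  /**
+   * Increments the counter for the given key, creating the counter on first sight.
+   */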
+  public long increment(String key)
+  {
+    KeyCounter counter = statistics.get(key);
+
+    if (counter == null)
+    {
+      counter = new KeyCounter(key);
+      statistics.put(key, counter);
+    }
+
+    return counter.increment();
+  }
+
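+  /** The recorded counters, one per key. */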
+  public Collection<KeyCounter> getStatistics()
+  {
+    return statistics.values();
+  }
+
+  @Override
+  public String toString()
+  {
+    return partition.toString();
+  }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+
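+/**
+ * Renders a PartitionStatistics instance as a JSON object that maps each
+ * key to the number of messages counted for that key.
+ */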
+@Slf4j
+public class PartitionStatisticsSerializer extends JsonSerializer<PartitionStatistics>
+{
+  @Override
+  public Class<PartitionStatistics> handledType()
+  {
+    return PartitionStatistics.class;
+  }
+
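+  /**
+   * The IOException thrown by writeNumberField() has to be handled inside
+   * the lambda, because the Consumer passed to forEach() must not throw
+   * checked exceptions.
+   */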
+  @Override
+  public void serialize(
+      PartitionStatistics statistics,
+      JsonGenerator jsonGenerator,
+      SerializerProvider serializerProvider) throws IOException
+  {
+    jsonGenerator.writeStartObject();
+    statistics
+        .getStatistics()
+        .forEach((counter) ->
+        {
+          try
+          {
+            jsonGenerator.writeNumberField(counter.getKey(), counter.getCounter());
+          }
+          catch (IOException e)
+          {
+            log.error("Could not write {}: {}", counter, e.toString());
+          }
+        });
+    jsonGenerator.writeEndObject();
+  }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+
+
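+/**
+ * Renders a TopicPartition as the string "<topic>-<partition>" produced by
+ * its toString() method.
+ */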
+public class TopicPartitionSerializer extends JsonSerializer<TopicPartition>
+{
+  @Override
+  public Class<TopicPartition> handledType()
+  {
+    return TopicPartition.class;
+  }
+
+  @Override
+  public void serialize(
+      TopicPartition topicPartition,
+      JsonGenerator jsonGenerator,
+      SerializerProvider serializerProvider) throws IOException
+  {
+    jsonGenerator.writeString(topicPartition.toString());
+  }
+}
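
With both serializers registered, the map returned by the seen endpoint is
rendered as nested JSON, roughly of the following form (a sketch; the topic
and key names are made up):

{
  "test-0": { "alice": 42, "bob": 13 },
  "test-1": { "carol": 7 }
}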