sleep 1
http -v :8081/seen
+docker-compose stop producer
docker-compose exec -T cli bash << 'EOF'
echo "Altering number of partitions from 3 to 7..."
# tag::altertopic[]
# end::altertopic[]
EOF
-docker-compose restart producer
+docker-compose start producer
sleep 1
http -v :8081/seen
sleep 1
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
-import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.util.Assert;
import java.util.concurrent.Executors;
return consumer;
}
- @Bean
- public Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder()
- {
- return
- new Jackson2ObjectMapperBuilder().serializers(
- new TopicPartitionSerializer(),
- new PartitionStatisticsSerializer());
- }
-
-
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
package de.juplo.kafka;
import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.TopicPartition;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
@GetMapping("seen")
- public Map<TopicPartition, PartitionStatistics> seen()
+ public Map<Integer, Map<String, Integer>> seen()
{
return consumer.getSeen();
}
private KafkaConsumer<String, String> consumer = null;
private Future<?> future = null;
- private final Map<TopicPartition, PartitionStatistics> seen = new HashMap<>();
+ private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
public EndlessConsumer(
partitions.forEach(tp ->
{
log.info("{} - removing partition: {}", id, tp);
- PartitionStatistics removed = seen.remove(tp);
- for (KeyCounter counter : removed.getStatistics())
+ Map<String, Integer> removed = seen.remove(tp.partition());
+ for (String key : removed.keySet())
{
log.info(
"{} - Seen {} messages for partition={}|key={}",
id,
- counter.getResult(),
- removed.getPartition(),
- counter.getKey());
+ removed.get(key),
+ tp.partition(),
+ key);
}
- repository.save(new StatisticsDocument(removed));
+ repository.save(new StatisticsDocument(tp.partition(), removed));
});
}
{
log.info("{} - adding partition: {}", id, tp);
seen.put(
- tp,
+ tp.partition(),
repository
- .findById(tp.toString())
- .map(PartitionStatistics::new)
- .orElse(new PartitionStatistics(tp)));
+ .findById(Integer.toString(tp.partition()))
+ .map(document -> document.statistics)
+ .orElse(new HashMap<>()));
});
}
});
record.value()
);
- TopicPartition partition = new TopicPartition(record.topic(), record.partition());
+ Integer partition = record.partition();
String key = record.key() == null ? "NULL" : record.key();
- seen.get(partition).increment(key);
+ Map<String, Integer> byKey = seen.get(partition);
+
+ if (!byKey.containsKey(key))
+ byKey.put(key, 0);
+
+ int seenByKey = byKey.get(key);
+ seenByKey++;
+ byKey.put(key, seenByKey);
}
}
}
}
}
- public Map<TopicPartition, PartitionStatistics> getSeen()
+ public Map<Integer, Map<String, Integer>> getSeen()
{
return seen;
}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-
-
-@RequiredArgsConstructor
-@Getter
-@EqualsAndHashCode(of = { "key" })
-@ToString
-public class KeyCounter
-{
- private final String key;
-
- private long result = 0;
-
-
- public KeyCounter(String key, long initialValue)
- {
- this.key = key;
- this.result = initialValue;
- }
-
-
- public long increment()
- {
- return ++result;
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Getter
-@EqualsAndHashCode(of = { "partition" })
-public class PartitionStatistics
-{
- private String id;
- private final TopicPartition partition;
- private final Map<String, KeyCounter> statistics = new HashMap<>();
-
-
- public PartitionStatistics(TopicPartition partition)
- {
- this.partition = partition;
- }
-
- public PartitionStatistics(StatisticsDocument document)
- {
- this.partition = new TopicPartition(document.topic, document.partition);
- document
- .statistics
- .entrySet()
- .forEach(entry ->
- {
- this.statistics.put(
- entry.getKey(),
- new KeyCounter(entry.getKey(), entry.getValue()));
- });
- }
-
-
- public KeyCounter addKey(String key)
- {
- KeyCounter counter = new KeyCounter(key);
-
- counter.increment();
- statistics.put(key, counter);
-
- return counter;
- }
-
-
- public long increment(String key)
- {
- KeyCounter counter = statistics.get(key);
-
- if (counter == null)
- {
- counter = new KeyCounter(key);
- statistics.put(key, counter);
- }
-
- return counter.increment();
- }
-
- public Collection<KeyCounter> getStatistics()
- {
- return statistics.values();
- }
-
- @Override
- public String toString()
- {
- return partition.toString();
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-
-
-@Slf4j
-public class PartitionStatisticsSerializer extends JsonSerializer<PartitionStatistics>
-{
- @Override
- public Class<PartitionStatistics> handledType()
- {
- return PartitionStatistics.class;
- }
-
- @Override
- public void serialize(
- PartitionStatistics statistics,
- JsonGenerator jsonGenerator,
- SerializerProvider serializerProvider) throws IOException
- {
- jsonGenerator.writeStartObject();
- statistics
- .getStatistics()
- .forEach((counter) ->
- {
- try
- {
- jsonGenerator.writeNumberField(counter.getKey(), counter.getResult());
- }
- catch (NumberFormatException | IOException e)
- {
- log.error("Could not write {}: {}", counter, e.toString());
- }
- });
- jsonGenerator.writeEndObject();
- }
-}
{
@Id
public String id;
- public String topic;
- public Integer partition;
- public Map<String, Long> statistics;
+ public Map<String, Integer> statistics;
public StatisticsDocument()
{
}
- public StatisticsDocument(String topic, Integer partition, Map<String, Long> statistics)
+ public StatisticsDocument(Integer partition, Map<String, Integer> statistics)
{
- this.partition = partition;
+ this.id = Integer.toString(partition);
this.statistics = statistics;
}
-
- public StatisticsDocument(PartitionStatistics statistics)
- {
- this.topic = statistics.getPartition().topic();
- this.id = statistics.toString();
- this.partition = statistics.getPartition().partition();
- this.statistics = new HashMap<>();
- statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult()));
- }
}
+++ /dev/null
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-
-
-public class TopicPartitionSerializer extends JsonSerializer<TopicPartition>
-{
- @Override
- public Class<TopicPartition> handledType()
- {
- return TopicPartition.class;
- }
-
- @Override
- public void serialize(
- TopicPartition topicPartition,
- JsonGenerator jsonGenerator,
- SerializerProvider serializerProvider) throws IOException
- {
- jsonGenerator.writeString(topicPartition.toString());
- }
-}