sleep 1
http -v :8081/seen
+docker-compose stop producer
docker-compose exec -T cli bash << 'EOF'
echo "Altering number of partitions from 3 to 7..."
# tag::altertopic[]
# end::altertopic[]
EOF
-docker-compose restart producer
+docker-compose start producer
sleep 1
http -v :8081/seen
sleep 1
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
-import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.util.Assert;
import java.util.concurrent.Executors;
return consumer;
}
- @Bean
- public Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder()
- {
- return
- new Jackson2ObjectMapperBuilder().serializers(
- new TopicPartitionSerializer(),
- new PartitionStatisticsSerializer());
- }
-
-
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
package de.juplo.kafka;
import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.TopicPartition;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
@GetMapping("seen")
- public Map<TopicPartition, PartitionStatistics> seen()
+ public Map<Integer, Map<String, Integer>> seen()
{
return consumer.getSeen();
}
private KafkaConsumer<String, String> consumer = null;
private Future<?> future = null;
- private final Map<TopicPartition, PartitionStatistics> seen = new HashMap<>();
+ private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
public EndlessConsumer(
partitions.forEach(tp ->
{
log.info("{} - removing partition: {}", id, tp);
- PartitionStatistics removed = seen.remove(tp);
- for (KeyCounter counter : removed.getStatistics())
+ Map<String, Integer> removed = seen.remove(tp.partition());
+ for (String key : removed.keySet())
{
log.info(
"{} - Seen {} messages for partition={}|key={}",
id,
- counter.getResult(),
- removed.getPartition(),
- counter.getKey());
+ removed.get(key),
+ tp.partition(),
+ key);
}
});
}
partitions.forEach(tp ->
{
log.info("{} - adding partition: {}", id, tp);
- seen.put(tp, new PartitionStatistics(tp));
+ seen.put(tp.partition(), new HashMap<>());
});
}
});
record.value()
);
- TopicPartition partition = new TopicPartition(record.topic(), record.partition());
+ Integer partition = record.partition();
String key = record.key() == null ? "NULL" : record.key();
- seen.get(partition).increment(key);
+ Map<String, Integer> byKey = seen.get(partition);
+
+ if (!byKey.containsKey(key))
+ byKey.put(key, 0);
+
+ int seenByKey = byKey.get(key);
+ seenByKey++;
+ byKey.put(key, seenByKey);
}
}
}
}
}
- public Map<TopicPartition, PartitionStatistics> getSeen()
+ public Map<Integer, Map<String, Integer>> getSeen()
{
return seen;
}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-
-
-@RequiredArgsConstructor
-@Getter
-@EqualsAndHashCode(of = { "key" })
-@ToString
-public class KeyCounter
-{
- private final String key;
-
- private long result = 0;
-
-
- public long increment()
- {
- return ++result;
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-@Getter
-@EqualsAndHashCode(of = { "partition" })
-public class PartitionStatistics
-{
- private final TopicPartition partition;
- private final Map<String, KeyCounter> statistics = new HashMap<>();
-
-
- public KeyCounter addKey(String key)
- {
- KeyCounter counter = new KeyCounter(key);
-
- counter.increment();
- statistics.put(key, counter);
-
- return counter;
- }
-
-
- public long increment(String key)
- {
- KeyCounter counter = statistics.get(key);
-
- if (counter == null)
- {
- counter = new KeyCounter(key);
- statistics.put(key, counter);
- }
-
- return counter.increment();
- }
-
- public Collection<KeyCounter> getStatistics()
- {
- return statistics.values();
- }
-
- @Override
- public String toString()
- {
- return partition.toString();
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-
-
-@Slf4j
-public class PartitionStatisticsSerializer extends JsonSerializer<PartitionStatistics>
-{
- @Override
- public Class<PartitionStatistics> handledType()
- {
- return PartitionStatistics.class;
- }
-
- @Override
- public void serialize(
- PartitionStatistics statistics,
- JsonGenerator jsonGenerator,
- SerializerProvider serializerProvider) throws IOException
- {
- jsonGenerator.writeStartObject();
- statistics
- .getStatistics()
- .forEach((counter) ->
- {
- try
- {
- jsonGenerator.writeNumberField(counter.getKey(), counter.getResult());
- }
- catch (NumberFormatException | IOException e)
- {
- log.error("Could not write {}: {}", counter, e.toString());
- }
- });
- jsonGenerator.writeEndObject();
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-
-
-public class TopicPartitionSerializer extends JsonSerializer<TopicPartition>
-{
- @Override
- public Class<TopicPartition> handledType()
- {
- return TopicPartition.class;
- }
-
- @Override
- public void serialize(
- TopicPartition topicPartition,
- JsonGenerator jsonGenerator,
- SerializerProvider serializerProvider) throws IOException
- {
- jsonGenerator.writeString(topicPartition.toString());
- }
-}