counting-consumer ist eine Verbesserung von endless-consumer
[demos/kafka/training] / src / main / java / de / juplo / kafka / PartitionStatisticsSerializer.java
1 package de.juplo.kafka;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JsonSerializer;
5 import com.fasterxml.jackson.databind.SerializerProvider;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.common.TopicPartition;
8
9 import java.io.IOException;
10
11
12 @Slf4j
13 public class PartitionStatisticsSerializer extends JsonSerializer<PartitionStatistics>
14 {
15   @Override
16   public Class<PartitionStatistics> handledType()
17   {
18     return PartitionStatistics.class;
19   }
20
21   @Override
22   public void serialize(
23       PartitionStatistics statistics,
24       JsonGenerator jsonGenerator,
25       SerializerProvider serializerProvider) throws IOException
26   {
27     jsonGenerator.writeStartObject();
28     statistics
29         .getStatistics()
30         .forEach((counter) ->
31         {
32           try
33           {
34             jsonGenerator.writeNumberField(counter.getKey(), counter.getResult());
35           }
36           catch (NumberFormatException | IOException e)
37           {
38             log.error("Could not write {}: {}", counter, e.toString());
39           }
40         });
41     jsonGenerator.writeEndObject();
42   }
43 }