Merge branch rebalance-listener into counting-consumer
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index 063a09e..357a0b4 100644 (file)
@@ -63,6 +63,7 @@ public class EndlessConsumer implements Runnable
       props.put("group.id", groupId);
       props.put("client.id", id);
       props.put("auto.offset.reset", autoOffsetReset);
+      props.put("metadata.max.age.ms", "1000");
       props.put("key.deserializer", StringDeserializer.class.getName());
       props.put("value.deserializer", StringDeserializer.class.getName());
 
@@ -94,7 +95,7 @@ public class EndlessConsumer implements Runnable
           );
 
           Integer partition = record.partition();
-          String key = record.key();
+          String key = record.key() == null ? "NULL" : record.key();
 
           if (!seen.containsKey(partition))
             seen.put(partition, new HashMap<>());