X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=357a0b43bdef280fe85ca42933c6bb56c2e88162;hb=e98a45a0f82e0197d4ec9178f3ae88bb52dff821;hp=063a09ebcc3e9ced137fda80af7d962c6f849580;hpb=808bd074aaae940e5f81bc9f09c42d48d1fd2670;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 063a09e..357a0b4 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -63,6 +63,7 @@ public class EndlessConsumer implements Runnable props.put("group.id", groupId); props.put("client.id", id); props.put("auto.offset.reset", autoOffsetReset); + props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); @@ -94,7 +95,7 @@ public class EndlessConsumer implements Runnable ); Integer partition = record.partition(); - String key = record.key(); + String key = record.key() == null ? "NULL" : record.key(); if (!seen.containsKey(partition)) seen.put(partition, new HashMap<>());