X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=da2f8f0c29653a2420c1769c0db75cbc6db872c3;hb=31530b8f210b923f5e010104b8a514a2060e7665;hp=22dce95ee17a7c6c9f089bbe5e9377fe8bdcdba3;hpb=e87f4bb2bc188252955fb4932ddd99161ba621d3;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 22dce95..da2f8f0 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -48,18 +48,18 @@ public class EndlessConsumer implements Runnable @Override public void run() { - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServer); - props.put("group.id", groupId); - props.put("client.id", id); - props.put("auto.offset.reset", "earliest"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - this.consumer = new KafkaConsumer<>(props); - try { + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrapServer); + props.put("group.id", groupId); + props.put("client.id", id); + props.put("auto.offset.reset", "earliest"); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + + this.consumer = new KafkaConsumer<>(props); + log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic)); @@ -91,7 +91,7 @@ public class EndlessConsumer implements Runnable } catch(Exception e) { - log.error("{} - Unexpected error: {}", id, e.toString()); + log.error("{} - Unexpected error: {}", id, e.toString(), e); running.set(false); // Mark the instance as not running } finally