X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=f25e93c0403195dff2c09e4f4c9bf385a035aaf0;hb=abc2ffdb4d2829cd2c4263df9f6cec10b2a60c03;hp=22dce95ee17a7c6c9f089bbe5e9377fe8bdcdba3;hpb=e87f4bb2bc188252955fb4932ddd99161ba621d3;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 22dce95..f25e93c 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,6 +25,7 @@ public class EndlessConsumer implements Runnable private final String groupId; private final String id; private final String topic; + private final String autoOffsetReset; private AtomicBoolean running = new AtomicBoolean(); private long consumed = 0; @@ -36,30 +37,32 @@ public class EndlessConsumer implements Runnable String bootstrapServer, String groupId, String clientId, - String topic) + String topic, + String autoOffsetReset) { this.executor = executor; this.bootstrapServer = bootstrapServer; this.groupId = groupId; this.id = clientId; this.topic = topic; + this.autoOffsetReset = autoOffsetReset; } @Override public void run() { - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServer); - props.put("group.id", groupId); - props.put("client.id", id); - props.put("auto.offset.reset", "earliest"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - this.consumer = new KafkaConsumer<>(props); - try { + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrapServer); + props.put("group.id", groupId); + props.put("client.id", id); + props.put("auto.offset.reset", autoOffsetReset); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + + this.consumer = new KafkaConsumer<>(props); + log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic)); @@ -91,7 +94,7 @@ public class EndlessConsumer implements Runnable } catch(Exception e) { - log.error("{} - Unexpected error: {}", id, e.toString()); + log.error("{} - Unexpected error: {}", id, e.toString(), e); running.set(false); // Mark the instance as not running } finally @@ -107,7 +110,7 @@ public class EndlessConsumer implements Runnable { boolean stateChanged = running.compareAndSet(false, true); if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is already running!"); + throw new IllegalStateException("Consumer instance " + id + " is already running!"); log.info("{} - Starting - consumed {} messages before", id, consumed); future = executor.submit(this); @@ -117,7 +120,7 @@ public class EndlessConsumer implements Runnable { boolean stateChanged = running.compareAndSet(true, false); if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is not running!"); + throw new IllegalStateException("Consumer instance " + id + " is not running!"); log.info("{} - Stopping", id); consumer.wakeup(); @@ -137,6 +140,10 @@ public class EndlessConsumer implements Runnable { log.info("{} - Was already stopped", id); } + catch (Exception e) + { + log.error("{} - Unexpected exception while trying to stop the consumer", id, e); + } finally { log.info("{}: Consumed {} messages in total, exiting!", id, consumed);