Consumer<String, String> kafkaConsumer,
ApplicationProperties properties)
{
- return new ExampleConsumer(
- kafkaConsumer,
- properties.getTopic(),
- properties.getClientId());
+ return
+ new ExampleConsumer(
+ kafkaConsumer,
+ properties.getTopic(),
+ properties.getClientId());
}
@Bean(destroyMethod = "")
{
Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBroker());
- props.put("group.id", properties.getGroupId()); // ID für die Offset-Commits
- props.put("client.id", properties.getClientId()); // Nur zur Wiedererkennung
+ props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("group.id", properties.getGroupId());
+ props.put("client.id", properties.getClientId());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
private final String id;
private final String topic;
private final Consumer<String, String> consumer;
- private final Thread worker;
+ private final Thread workerThread;
private long consumed = 0;
this.id = clientId;
this.topic = topic;
- this.worker = new Thread(this, "ConsumerRunner-" + id);
- log.info("{} - Starting worker-thread", id);
- this.worker.start();
+ log.info("{} - Starting worker thread", id);
+ this.workerThread = new Thread(this, "ConsumerRunner-" + id);
+ this.workerThread.start();
}
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
}
- public void shutdown()
+
+ public void shutdown() throws InterruptedException
{
log.info("{} - Waking up the consumer", id);
consumer.wakeup();
- try
- {
- log.info("{} - Joining the worker-thread", id);
- worker.join(Duration.ofSeconds(30));
- }
- catch (InterruptedException e)
- {
- log.error("{} - Joining was interrupted: {}", id, e.toString());
- }
+ log.info("{} - Joining the worker thread", id);
+ workerThread.join();
log.info("{} - Shutdown completed!", id);
}
}
-