From 8c734b77dd71f6b35707e8085d59ac5b6c43720d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 1 Apr 2022 11:44:22 +0200 Subject: [PATCH] `auto.offset.reset` konfigurierbar gemacht --- src/main/java/de/juplo/kafka/Application.java | 3 ++- src/main/java/de/juplo/kafka/ApplicationProperties.java | 1 + src/main/java/de/juplo/kafka/EndlessConsumer.java | 7 +++++-- src/main/resources/application.yml | 1 + 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 85d0e07..dd4b20a 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -32,7 +32,8 @@ public class Application properties.getBootstrapServer(), properties.getGroupId(), properties.getClientId(), - properties.getTopic()); + properties.getTopic(), + properties.getAutoOffsetReset()); consumer.start(); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index fdbb2bd..dab3380 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -14,4 +14,5 @@ public class ApplicationProperties private String groupId; private String clientId; private String topic; + private String autoOffsetReset; } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index da2f8f0..b3dd446 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,6 +25,7 @@ public class EndlessConsumer implements Runnable private final String groupId; private final String id; private final String topic; + private final String autoOffsetReset; private AtomicBoolean running = new AtomicBoolean(); private long consumed = 0; @@ -36,13 +37,15 @@ public class EndlessConsumer implements Runnable String bootstrapServer, String groupId, String clientId, - String topic) + String topic, + String autoOffsetReset) { this.executor = executor; this.bootstrapServer = bootstrapServer; this.groupId = groupId; this.id = clientId; this.topic = topic; + this.autoOffsetReset = autoOffsetReset; } @Override @@ -54,7 +57,7 @@ public class EndlessConsumer implements Runnable props.put("bootstrap.servers", bootstrapServer); props.put("group.id", groupId); props.put("client.id", id); - props.put("auto.offset.reset", "earliest"); + props.put("auto.offset.reset", autoOffsetReset); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 763880d..db37822 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -3,6 +3,7 @@ consumer: group-id: my-consumer client-id: peter topic: test + auto-offset-reset: earliest management: endpoints: web: -- 2.20.1