From 41e66f90dfa9a06480243064f79f5c85ec0493b6 Mon Sep 17 00:00:00 2001 From: Kai Moritz <kai@juplo.de> Date: Sun, 2 Feb 2025 18:49:06 +0100 Subject: [PATCH] =?utf8?q?Vorlage=20f=C3=BCr=20das=20Empfangen=20von=20Nac?= =?utf8?q?hrichten=20aus=20einer=20geteilten=20Lib?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../juplo/kafka/ApplicationConfiguration.java | 7 +++--- .../java/de/juplo/kafka/ExampleConsumer.java | 22 +++++++++---------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 6ff9155d..c7aa95f5 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,7 +4,6 @@ import de.juplo.messages.Message; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; @@ -19,7 +18,7 @@ public class ApplicationConfiguration { @Bean public ExampleConsumer exampleConsumer( - Consumer<String, Message> kafkaConsumer, + Consumer<String, String> kafkaConsumer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -32,7 +31,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaConsumer<String, Message> kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -48,7 +47,7 @@ public class ApplicationConfiguration } props.put("metadata.max.age.ms", 5000); // 5 Sekunden props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", JsonDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); props.put("spring.json.trusted.packages", "de.juplo.messages"); return new KafkaConsumer<>(props); diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index d4109fc2..9d973b61 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -18,7 +18,7 @@ public class ExampleConsumer implements Runnable { private final String id; private final String topic; - private final Consumer<String, Message> consumer; + private final Consumer<String, String> consumer; private final Thread workerThread; private final Runnable closeCallback; @@ -28,7 +28,7 @@ public class ExampleConsumer implements Runnable public ExampleConsumer( String clientId, String topic, - Consumer<String, Message> consumer, + Consumer<String, String> consumer, Runnable closeCallback) { this.id = clientId; @@ -52,10 +52,10 @@ public class ExampleConsumer implements Runnable while (true) { - ConsumerRecords<String, Message> records = consumer.poll(Duration.ofSeconds(1)); + ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord<String, Message> record : records) + for (ConsumerRecord<String, String> record : records) { handleRecord( record.topic(), @@ -90,16 +90,16 @@ public class ExampleConsumer implements Runnable Integer partition, Long offset, String key, - Message value) + String value) { consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); - switch (value.getType()) - { - case ADD -> addNumber((Add)value); - case CALC -> calcSum((Calculate)value); - default -> log.error("{} - Ignoring message of unknown typ {}", id, value.getType()); - } + // switch (value.getType()) + // { + // case ADD -> addNumber((Add)value); + // case CALC -> calcSum((Calculate)value); + // default -> log.error("{} - Ignoring message of unknown typ {}", id, value.getType()); + // } } private void addNumber(Add add) -- 2.20.1