From: Kai Moritz Date: Mon, 17 Mar 2025 16:57:54 +0000 (+0100) Subject: Vorlage an `consumer/spring-consumer--vorlage` angeglichen X-Git-Tag: springkafka/spring-consumer--json--vorlage--2026-03-22--22-01~1 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=d758d72fa059ca32ddfab5833af7cf331d518c8e;p=demos%2Fkafka%2Ftraining Vorlage an `consumer/spring-consumer--vorlage` angeglichen --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index fe06d006..3cadf3e0 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -7,7 +7,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JacksonJsonDeserializer; import java.util.Properties; @@ -18,7 +17,7 @@ public class ApplicationConfiguration { @Bean public ExampleConsumer exampleConsumer( - Consumer kafkaConsumer, + Consumer kafkaConsumer, // << TODO: Typisierung anpassen ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -31,6 +30,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") + // TODO: Typisierung anpassen public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); @@ -47,7 +47,7 @@ public class ApplicationConfiguration } props.put("metadata.max.age.ms", 5000); // 5 Sekunden props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); // << TODO: Passende Strategie konfigurieren return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 15c3959a..1d25f187 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -15,7 +15,7 @@ public class ExampleConsumer implements Runnable { private final String id; private final String topic; - private final Consumer consumer; + private final Consumer consumer; // << TODO: Typisierung anpassen private final Thread workerThread; private final Runnable closeCallback; @@ -45,22 +45,18 @@ public class ExampleConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + // TODO: Aufruf von consumer.subscribe() while (true) { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } + // TODO: Poll & Consume aus Ihrer bisherigen Implementierung + // TODO: Verarbeitung abhängig vom Typ der Nachricht auslösen + // switch (value.getType()) + // { + // case ADD -> addNumber((MessageAddNumber)value); + // case CALC -> calcSum((MessageCalculateSum)value); + // default -> log.error("{} - Ignoring message of unknown typ {}", id, value.getType()); + //} } } catch(WakeupException e) @@ -82,23 +78,6 @@ public class ExampleConsumer implements Runnable } } - private void handleRecord( - String topic, - Integer partition, - Long offset, - String key, - String value) - { - consumed++; - log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); - // switch (value.getType()) - // { - // case ADD -> addNumber((MessageAddNumber)value); - // case CALC -> calcSum((MessageCalculateSum)value); - // default -> log.error("{} - Ignoring message of unknown typ {}", id, value.getType()); - //} - } - private void addNumber(MessageAddNumber addNumber) { log.info("{} - Adding number {}", id, addNumber.getNext());