From 0816c4ae4a9829f384715a7991cf2c8fe2a5843c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 12 Nov 2024 03:31:03 +0100 Subject: [PATCH] =?utf8?q?Vorlage=20f=C3=BCr=20die=20=C3=9Cbung=20zum=20`J?= =?utf8?q?sonSerializer`?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 42 ------------------- .../juplo/kafka/ApplicationConfiguration.java | 8 ++-- .../java/de/juplo/kafka/ExampleProducer.java | 10 ++--- 3 files changed, 8 insertions(+), 52 deletions(-) delete mode 100755 README.sh diff --git a/README.sh b/README.sh deleted file mode 100755 index 68deb85..0000000 --- a/README.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/spring-producer:1.0-json-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf producer - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d consumer-1 consumer-2 -sleep 15 - -docker compose -f docker/docker-compose.yml stop producer - -echo -echo "Von consumer-1 empfangen:" -docker compose -f docker/docker-compose.yml logs consumer-1 | grep '\ test\/.' -echo -echo "Von consumer-2 empfangen:" -docker compose -f docker/docker-compose.yml logs consumer-2 | grep '\ test\/.' - -docker compose -f docker/docker-compose.yml stop consumer-1 consumer-2 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index d5ce01a..7540dd3 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -7,7 +7,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonSerializer; import java.time.Duration; import java.util.Properties; @@ -20,7 +19,7 @@ public class ApplicationConfiguration @Bean public ExampleProducer exampleProducer( ApplicationProperties properties, - Producer kafkaProducer, + Producer kafkaProducer, ConfigurableApplicationContext applicationContext) { return @@ -35,7 +34,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public KafkaProducer kafkaProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -50,8 +49,7 @@ public class ApplicationConfiguration props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis()); props.put("compression.type", properties.getProducerProperties().getCompressionType()); props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", JsonSerializer.class.getName()); - props.put("spring.json.type.mapping", "ADD:de.juplo.kafka.AddNumberMessage,CALC:de.juplo.kafka.CalculateSumMessage"); + props.put("value.serializer", StringSerializer.class.getName()); return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 2360704..51eba2a 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -13,7 +13,7 @@ public class ExampleProducer implements Runnable private final String id; private final String topic; private final Duration throttle; - private final Producer producer; + private final Producer producer; private final Thread workerThread; private final Runnable closeCallback; @@ -25,7 +25,7 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - Producer producer, + Producer producer, Runnable closeCallback) { this.id = id; @@ -54,7 +54,7 @@ public class ExampleProducer implements Runnable ? new CalculateSumMessage(number) : new AddNumberMessage(number, i); - send(Long.toString(number), message); + send(Long.toString(number), message.toString()); if (throttle.isPositive()) { @@ -83,11 +83,11 @@ public class ExampleProducer implements Runnable } } - void send(String key, SumupMessage value) + void send(String key, String value) { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( + final ProducerRecord record = new ProducerRecord<>( topic, // Topic key, // Key value // Value -- 2.20.1