From 428ce56a7ab470b5adf8e69b62504d6dc9f1672f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 18 Mar 2025 19:58:41 +0100 Subject: [PATCH] VORLAGE --- README.sh | 42 ----------- .../juplo/kafka/ApplicationConfiguration.java | 2 +- .../java/de/juplo/kafka/ExampleProducer.java | 71 ++----------------- src/main/resources/application.yml | 4 -- 4 files changed, 5 insertions(+), 114 deletions(-) delete mode 100755 README.sh diff --git a/README.sh b/README.sh deleted file mode 100755 index 982f7bd3..00000000 --- a/README.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/spring-producer:1.0-json-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf producer - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d peter ute -sleep 15 - -docker compose -f docker/docker-compose.yml stop producer - -echo -echo "Von peter empfangen:" -docker compose -f docker/docker-compose.yml logs peter | grep '\ test\/.' -echo -echo "Von ute empfangen:" -docker compose -f docker/docker-compose.yml logs ute | grep '\ test\/.' - -docker compose -f docker/docker-compose.yml stop peter ute diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index e212a253..f6701728 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -19,7 +19,7 @@ public class ApplicationConfiguration public ExampleProducer exampleProducer( @Value("${spring.kafka.client-id}") String clientId, ApplicationProperties properties, - Producer kafkaProducer, + Producer kafkaProducer, // << TODO: Typisierung anpassen ConfigurableApplicationContext applicationContext) { return diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 0d152f77..9d5983b8 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -13,7 +13,7 @@ public class ExampleProducer implements Runnable private final String id; private final String topic; private final Duration throttle; - private final Producer producer; + private final Producer producer; private final Thread workerThread; private final Runnable closeCallback; @@ -25,7 +25,7 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - Producer producer, + Producer producer, // << TODO: Typisierung anpassen Runnable closeCallback) { this.id = id; @@ -54,19 +54,8 @@ public class ExampleProducer implements Runnable ? new CalculateSumMessage(number) : new AddNumberMessage(number, i); - send(Long.toString(number), message); - - if (throttle.isPositive()) - { - try - { - Thread.sleep(throttle); - } - catch (InterruptedException e) - { - log.warn("{} - Interrupted while throttling!", e); - } - } + // TODO: Versenden aus Ihrer bisherigen Implementierung (inkl. Logging & Zählen) + // ABER: Anstatt der Zahl soll "message" als JSON serialisiert verschickt werden } } catch (Exception e) @@ -83,58 +72,6 @@ public class ExampleProducer implements Runnable } } - void send(String key, SumupMessage value) - { - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - log.debug( - "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", - id, - key, - value, - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR for message {}={}, latency={}ms: {}", - id, - key, - value, - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - produced++; - log.trace( - "{} - Queued message {}={}, latency={}ms", - id, - key, - value, - now - time - ); - } - public void shutdown() throws InterruptedException { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index daf440e9..008742c1 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -11,11 +11,7 @@ spring: buffer-memory: 33554432 batch-size: 16384 compression-type: gzip - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: - spring.json.type.mapping: >- - ADD:de.juplo.kafka.AddNumberMessage, - CALC:de.juplo.kafka.CalculateSumMessage delivery-timeout: 10s max-block: 5s linger: 0 -- 2.20.1