From: Kai Moritz Date: Sun, 10 Nov 2024 16:19:53 +0000 (+0100) Subject: Version des ``spring-producer``, die ``long``-Werte verschickt X-Git-Tag: producer/spring-producer--long--2025-04-signal-spickzettel X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e1f5b42f20f747a0caeb2b7decddd085c49bcb24;p=demos%2Fkafka%2Ftraining Version des ``spring-producer``, die ``long``-Werte verschickt --- diff --git a/README.sh b/README.sh index 77fc374..5e6270a 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:2.0-SNAPSHOT +IMAGE=juplo/spring-producer:2.0-long-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 59e7ed5..0fbb8de 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,23 +136,29 @@ services: - kafka-3 producer: - image: juplo/spring-producer:2.0-SNAPSHOT + image: juplo/spring-producer:2.0-long-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: producer juplo.producer.topic: test consumer: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group consumer + image: juplo/spring-consumer:1.1-SNAPSHOT + environment: + juplo.bootstrap-server: kafka:9092 + juplo.client-id: consumer peter: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group peter + image: juplo/spring-consumer:1.1-long-SNAPSHOT + environment: + juplo.bootstrap-server: kafka:9092 + juplo.client-id: peter ute: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group ute + image: juplo/spring-consumer:1.1-long-SNAPSHOT + environment: + juplo.bootstrap-server: kafka:9092 + juplo.client-id: ute volumes: zookeeper-data: diff --git a/pom.xml b/pom.xml index be7dea5..a8b37b4 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-producer Spring Producer A Simple Producer, based on Spring Boot, that sends messages via Kafka - 2.0-SNAPSHOT + 2.0-long-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index efdfafa..c0bade1 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -19,7 +19,7 @@ public class ApplicationConfiguration public ExampleProducer exampleProducer( @Value("${spring.kafka.client-id}") String clientId, ApplicationProperties properties, - Producer kafkaProducer, + Producer kafkaProducer, ConfigurableApplicationContext applicationContext) { return diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index c5a5a80..c0633b0 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -13,7 +13,7 @@ public class ExampleProducer implements Runnable private final String id; private final String topic; private final Duration throttle; - private final Producer producer; + private final Producer producer; private final Thread workerThread; private final Runnable closeCallback; @@ -25,7 +25,7 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - Producer producer, + Producer producer, Runnable closeCallback) { this.id = id; @@ -49,7 +49,7 @@ public class ExampleProducer implements Runnable { for (; running; i++) { - send(Long.toString(i%10), Long.toString(i)); + send(Long.toString(i%10), i); if (throttle.isPositive()) { @@ -78,11 +78,11 @@ public class ExampleProducer implements Runnable } } - void send(String key, String value) + void send(String key, long value) { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( + final ProducerRecord record = new ProducerRecord<>( topic, // Topic key, // Key value // Value diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3f6c233..d4ca69a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -11,6 +11,7 @@ spring: buffer-memory: 33554432 batch-size: 16384 compression-type: gzip + value-serializer: org.apache.kafka.common.serialization.LongSerializer properties: metadata.max.age.ms: 5000 request.timeout.ms: 5000