From 361564abbaf2007394c6fe253c0edaf4decd2dcd Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 12 Aug 2022 22:08:12 +0200 Subject: [PATCH] Version des `EndlessProducer`, der Nachrichten vom Typ `Long` schreibt --- README.sh | 2 +- docker-compose.yml | 2 +- pom.xml | 5 +++-- src/main/java/de/juplo/kafka/EndlessProducer.java | 11 ++++++----- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/README.sh b/README.sh index 73ceebc..08fc449 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/endless-producer:1.0-SNAPSHOT +IMAGE=juplo/endless-long-producer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker-compose.yml b/docker-compose.yml index a368379..d6855e9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,7 +37,7 @@ services: command: sleep infinity producer: - image: juplo/endless-producer:1.0-SNAPSHOT + image: juplo/endless-long-producer:1.0-SNAPSHOT ports: - 8080:8080 environment: diff --git a/pom.xml b/pom.xml index af4a89f..5f145fc 100644 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,9 @@ de.juplo.kafka - endless-producer - Endless Producer: a Simple Producer that endlessly writes numbers into a topic + endless-long-producer + Endless Long-Producer + A Simple Producer that endlessly writes numbers into a topic (as long's!) 1.0-SNAPSHOT diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java index 7a5b324..9c69f31 100644 --- a/src/main/java/de/juplo/kafka/EndlessProducer.java +++ b/src/main/java/de/juplo/kafka/EndlessProducer.java @@ -3,6 +3,7 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import javax.annotation.PreDestroy; @@ -18,7 +19,7 @@ public class EndlessProducer implements Runnable private final String id; private final String topic; private final int throttleMs; - private final KafkaProducer producer; + private final KafkaProducer producer; private boolean running = false; private long i = 0; @@ -42,7 +43,7 @@ public class EndlessProducer implements Runnable props.put("client.id", clientId); props.put("acks", acks); props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); + props.put("value.serializer", LongSerializer.class.getName()); this.producer = new KafkaProducer<>(props); } @@ -54,7 +55,7 @@ public class EndlessProducer implements Runnable { for (; running; i++) { - send(Long.toString(i%10), Long.toString(i)); + send(Long.toString(i%10), i); if (throttleMs > 0) { @@ -85,11 +86,11 @@ public class EndlessProducer implements Runnable } } - void send(String key, String value) + void send(String key, Long value) { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( + final ProducerRecord record = new ProducerRecord<>( topic, // Topic key, // Key value // Value -- 2.20.1