- <artifactId>simple-producer</artifactId>
- <name>Super Simple Producer</name>
+ <artifactId>supersimple-producer</artifactId>
+ <name>Super Simple Producer</name>
+ <description>Most minimal Kafka Producer ever!</description>
+ <properties>
+ <java.version>11</java.version>
+ </properties>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-configuration-processor</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
- <groupId>pl.project13.maven</groupId>
- <artifactId>git-commit-id-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
- <id>copy-dependencies</id>
- <phase>package</phase>
- <goal>copy-dependencies</goal>
+ <goal>build-info</goal>
- <configuration>
- <outputDirectory>${project.build.directory}/libs</outputDirectory>
- </configuration>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <archive>
- <manifest>
- <addClasspath>true</addClasspath>
- <classpathPrefix>libs/</classpathPrefix>
- <mainClass>de.juplo.kafka.Application</mainClass>
- </manifest>
- </archive>
- </configuration>
+ <groupId>pl.project13.maven</groupId>
+ <artifactId>git-commit-id-plugin</artifactId>
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
-import java.util.Properties;
+import java.util.List;
-public class SimpleProducer
+public class Application implements ApplicationRunner
- private final String id;
- private final String topic;
- private final KafkaProducer<String, String> producer;
+ public final static String ARG_NUM = "num";
- private long produced = 0;
- private volatile boolean running = true;
- private volatile boolean done = false;
+ @Autowired
+ KafkaTemplate<String, String> kafkaTemplate;
- public SimpleProducer(String broker, String topic, String clientId)
- {
- Properties props = new Properties();
- props.put("bootstrap.servers", broker);
- props.put("client.id", clientId); // Nur zur Wiedererkennung
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
- producer = new KafkaProducer<>(props);
- this.topic = topic;
- this.id = clientId;
+ void send(String key, String value)
+ {
+ ListenableFuture<SendResult<String, String>> listenableFuture =
+ kafkaTemplate.sendDefault(key, value);
+ listenableFuture.addCallback(
+ result -> log.debug(
+ "Sent {}={} to partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset()),
+ e -> log.error("ERROR sendig message", e));
- public void run()
+ @Override
+ public void run(ApplicationArguments args)
- long i = 0;
+ int num = 10;
- try
+ if (args.containsOption(ARG_NUM))
- for (; running ; i++)
+ List<String> numArgs = args.getOptionValues(ARG_NUM);
+ if (numArgs.size() > 1)
- send(Long.toString(i%10), Long.toString(i));
- Thread.sleep(500);
+ log.error(
+ "Only one occurence of argument {} is allowed, but found: {}",
+ numArgs.size());
+ return;
- }
- catch (InterruptedException e) {}
- finally
- {
- log.info("{}: Closing the KafkaProducer", id);
- producer.close();
- log.info("{}: Produced {} messages in total, exiting!", id, produced);
- done = true;
- }
- }
- void send(String key, String value)
- {
- final long time = System.currentTimeMillis();
- final ProducerRecord<String, String> record = new ProducerRecord<>(
- topic, // Topic
- key, // Key
- value // Value
- );
- producer.send(record, (metadata, e) ->
- {
- long now = System.currentTimeMillis();
- if (e == null)
+ try
- produced++;
- log.debug(
- "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
- id,
- record.key(),
- record.value(),
- metadata.partition(),
- metadata.offset(),
- metadata.timestamp(),
- now - time
- );
+ num = Integer.parseInt(numArgs.get(0));
- else
+ catch (NumberFormatException e)
- log.error(
- "{} - ERROR key={} timestamp={} latency={}ms: {}",
- id,
- record.key(),
- metadata == null ? -1 : metadata.timestamp(),
- now - time,
- e.toString()
- );
+ log.error("{} is not a number: {}", numArgs.get(0), e.getMessage());
- });
- long now = System.currentTimeMillis();
- log.trace(
- "{} - Queued #{} key={} latency={}ms",
- id,
- value,
- record.key(),
- now - time
- );
- }
- public static void main(String[] args) throws Exception
- {
- String broker = ":9092";
- String topic = "test";
- String clientId = "DEV";
- switch (args.length)
- {
- case 3:
- clientId = args[2];
- case 2:
- topic = args[1];
- case 1:
- broker = args[0];
- SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
- Runtime.getRuntime().addShutdownHook(new Thread(() ->
+ for (int i = 0; i < num; i++)
- instance.running = false;
- while (!instance.done)
+ send(Long.toString(i%10), Long.toString(i));
+ try
- log.info("Waiting for main-thread...");
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {}
+ Thread.sleep(500);
+ }
+ catch (InterruptedException e)
+ {
+ log.info("Interrupted after sending {} messages", i);
+ return;
- log.info("Shutdown completed.");
- }));
+ }
+ }
- log.info(
- "Running SimpleProducer: broker={}, topic={}, client-id={}",
- broker,
- topic,
- clientId);
- instance.run();
+ public static void main(String[] args)
+ {
+ SpringApplication.run(Application.class, args);