--- /dev/null
+*
+!target/*.jar
--- /dev/null
+target/*.jar
--- /dev/null
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
+CMD []
#!/bin/bash
+IMAGE=juplo/endless-producer:1.0-SNAPSHOT
+
if [ "$1" = "cleanup" ]
then
docker-compose down -v
exit
fi
-mvn package || exit 1
-if [ "$1" = "build" ]; then exit; fi
-
-trap 'kill $(jobs -p) 2>/dev/null' EXIT
+docker-compose up -d zookeeper kafka cli
-docker-compose up -d
+if [[
+ $(docker image ls -q $IMAGE) == "" ||
+ "$1" = "build"
+]]
+then
+ mvn install || exit
+else
+ echo "Using image existing images:"
+ docker image ls $IMAGE
+fi
echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-
-echo "Producing messages"
-mvn exec:java@producer
-
-echo "Reading messages"
-mvn exec:java@consumer &
-sleep 7
-kill $(jobs -p)
-sleep 2
-
-echo "Re-Reading messages"
-mvn exec:java@consumer &
-sleep 7
-kill $(jobs -p)
-sleep 2
+docker-compose up setup
+docker-compose up -d producer
+sleep 5
+docker-compose stop producer
+docker-compose logs producer
cli:
image: juplo/toolbox
command: sleep infinity
+
+ producer:
+ image: juplo/endless-producer:1.0-SNAPSHOT
+ ports:
+ - 8080:8080
+ environment:
+ producer.bootstrap-server: kafka:9092
+ producer.client-id: producer
+ producer.topic: test
+ producer.throttle-ms: 200
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>first-contact</artifactId>
- <name>First Contact: a Simple Producer and a simple Consumer-Group</name>
+ <artifactId>endless-producer</artifactId>
+ <name>Endless Producer: a Simple Producer that endlessly writes numbers into a topic</name>
<version>1.0-SNAPSHOT</version>
<dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-configuration-processor</artifactId>
+ <optional>true</optional>
+ </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <version>3.0.0</version>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.33.0</version>
+ <configuration>
+ <images>
+ <image>
+ <name>juplo/%a:%v</name>
+ </image>
+ </images>
+ </configuration>
<executions>
<execution>
- <id>producer</id>
- <configuration>
- <mainClass>de.juplo.kafka.SimpleProducer</mainClass>
- </configuration>
- </execution>
- <execution>
- <id>consumer</id>
- <configuration>
- <mainClass>de.juplo.kafka.SimpleConsumer</mainClass>
- </configuration>
+ <id>build</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build</goal>
+ </goals>
</execution>
</executions>
</plugin>
--- /dev/null
+package de.juplo.kafka;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
+
+import java.util.concurrent.Executors;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application
+{
+ @Autowired
+ ApplicationProperties properties;
+
+
+ @Bean
+ public EndlessProducer producer()
+ {
+ Assert.hasText(properties.getBootstrapServer(), "producer.bootstrap-server must be set");
+ Assert.hasText(properties.getClientId(), "producer.client-id must be set");
+ Assert.hasText(properties.getTopic(), "producer.topic must be set");
+
+ EndlessProducer producer =
+ new EndlessProducer(
+ Executors.newFixedThreadPool(1),
+ properties.getBootstrapServer(),
+ properties.getClientId(),
+ properties.getTopic(),
+ properties.getAcks(),
+ properties.getThrottleMs());
+
+ producer.start();
+
+ return producer;
+ }
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(Application.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties(prefix = "producer")
+@Getter
+@Setter
+public class ApplicationProperties
+{
+ private String bootstrapServer;
+ private String clientId;
+ private String topic;
+ private String acks;
+ private int throttleMs;
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.concurrent.ExecutionException;
+
+
+@RestController
+@RequiredArgsConstructor
+public class DriverController
+{
+ private final EndlessProducer producer;
+
+
+ @PostMapping("start")
+ public void start()
+ {
+ producer.start();
+ }
+
+ @PostMapping("stop")
+ public void stop() throws ExecutionException, InterruptedException
+ {
+ producer.stop();
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import javax.annotation.PreDestroy;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+
+@Slf4j
+public class EndlessProducer implements Runnable
+{
+ private final ExecutorService executor;
+ private final String id;
+ private final String topic;
+ private final String acks;
+ private final int throttleMs;
+ private final KafkaProducer<String, String> producer;
+
+ private boolean running = false;
+ private long i = 0;
+ private long produced = 0;
+ private Future<?> future = null;
+
+ public EndlessProducer(
+ ExecutorService executor,
+ String bootstrapServer,
+ String clientId,
+ String topic,
+ String acks,
+ int throttleMs)
+ {
+ this.executor = executor;
+ this.id = clientId;
+ this.topic = topic;
+ this.acks = acks;
+ this.throttleMs = throttleMs;
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", bootstrapServer);
+ props.put("client.id", clientId);
+ props.put("acks", acks);
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ this.producer = new KafkaProducer<>(props);
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ for (; running; i++)
+ {
+ final long time = System.currentTimeMillis();
+
+ final ProducerRecord<String, String> record = new ProducerRecord<>(
+ topic, // Topic
+ Long.toString(i % 10), // Key
+ Long.toString(i) // Value
+ );
+
+ producer.send(record, (metadata, e) ->
+ {
+ long now = System.currentTimeMillis();
+ if (e == null)
+ {
+ // HANDLE SUCCESS
+ produced++;
+ log.debug(
+ "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+ id,
+ record.key(),
+ record.value(),
+ metadata.partition(),
+ metadata.offset(),
+ metadata.timestamp(),
+ now - time
+ );
+ }
+ else
+ {
+ // HANDLE ERROR
+ log.error(
+ "{} - ERROR key={} timestamp={} latency={}ms: {}",
+ id,
+ record.key(),
+ metadata == null ? -1 : metadata.timestamp(),
+ now - time,
+ e.toString()
+ );
+ }
+ });
+
+ long now = System.currentTimeMillis();
+ log.trace(
+ "{} - Queued #{} key={} latency={}ms",
+ id,
+ i,
+ record.key(),
+ now - time
+ );
+
+ if (throttleMs > 0)
+ {
+ try
+ {
+ Thread.sleep(throttleMs);
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("{} - Interrupted while throttling!", e);
+ }
+ }
+ }
+
+ log.info("{} - Done", id);
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+
+ public synchronized void start()
+ {
+ if (running)
+ throw new IllegalStateException("Producer instance " + id + " is already running!");
+
+ log.info("{} - Starting - produced {} messages before", id, produced);
+ running = true;
+ future = executor.submit(this);
+ }
+
+ public synchronized void stop() throws ExecutionException, InterruptedException
+ {
+ if (!running)
+ throw new IllegalStateException("Producer instance " + id + " is not running!");
+
+ log.info("{} - Stopping...", id);
+ running = false;
+ future.get();
+ log.info("{} - Stopped - produced {} messages so far", id, produced);
+ }
+
+ @PreDestroy
+ public void destroy() throws ExecutionException, InterruptedException
+ {
+ log.info("{} - Destroy!", id);
+ try
+ {
+ stop();
+ }
+ catch (IllegalStateException e)
+ {
+ log.info("{} - Was already stopped", id);
+ }
+ finally
+ {
+ log.info("{} - Closing the KafkaProducer", id);
+ producer.close();
+ log.info("{}: Produced {} messages in total, exiting!", id, produced);
+ }
+ }
+}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-@Slf4j
-public class SimpleConsumer
-{
- private long consumed = 0;
- private KafkaConsumer<String, String> consumer;
- private Lock lock = new ReentrantLock();
- private Condition stopped = lock.newCondition();
-
-
- public SimpleConsumer()
- {
- // tag::create[]
- Properties props = new Properties();
- props.put("bootstrap.servers", ":9092");
- props.put("group.id", "my-consumer"); // << Used for Offset-Commits
- // end::create[]
- props.put("auto.offset.reset", "earliest");
- // tag::create[]
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- // end::create[]
- this.consumer = consumer;
- }
-
-
- public void run()
- {
- String id = "C";
-
- try
- {
- log.info("{} - Subscribing to topic test", id);
- consumer.subscribe(Arrays.asList("test"));
-
- // tag::loop[]
- while (true)
- {
- ConsumerRecords<String, String> records =
- consumer.poll(Duration.ofSeconds(1));
-
- // Do something with the data...
- // end::loop[]
- log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<String, String> record : records)
- {
- consumed++;
- log.info(
- "{} - {}: {}/{} - {}={}",
- id,
- record.offset(),
- record.topic(),
- record.partition(),
- record.key(),
- record.value()
- );
- }
- // tag::loop[]
- }
- // end::loop[]
- }
- catch(WakeupException e)
- {
- log.info("{} - RIIING!", id);
- }
- catch(Exception e)
- {
- log.error("{} - Unexpected error: {}", id, e.toString());
- }
- finally
- {
- this.lock.lock();
- try
- {
- log.info("{} - Closing the KafkaConsumer", id);
- consumer.close();
- log.info("C - DONE!");
- stopped.signal();
- }
- finally
- {
- this.lock.unlock();
- log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
- }
- }
- }
-
-
- public static void main(String[] args) throws Exception
- {
- SimpleConsumer instance = new SimpleConsumer();
-
- Runtime.getRuntime().addShutdownHook(new Thread(() ->
- {
- instance.lock.lock();
- try
- {
- instance.consumer.wakeup();
- instance.stopped.await();
- }
- catch (InterruptedException e)
- {
- log.warn("Interrrupted while waiting for the consumer to stop!", e);
- }
- finally
- {
- instance.lock.unlock();
- }
- }));
-
- instance.run();
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import java.util.Properties;
-
-
-@Slf4j
-public class SimpleProducer
-{
- public static void main(String[] args) throws Exception
- {
- // tag::create[]
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- KafkaProducer<String, String> producer = new KafkaProducer<>(props);
- // end::create[]
-
- String id = "P";
- long i = 0;
-
- try
- {
- for (; i < 100 ; i++)
- {
- final long time = System.currentTimeMillis();
-
- final ProducerRecord<String, String> record = new ProducerRecord<>(
- "test", // Topic
- Long.toString(i%10), // Key
- Long.toString(i) // Value
- );
-
- producer.send(record, (metadata, e) ->
- {
- long now = System.currentTimeMillis();
- if (e == null)
- {
- // HANDLE SUCCESS
- log.debug(
- "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
- id,
- record.key(),
- record.value(),
- metadata.partition(),
- metadata.offset(),
- metadata.timestamp(),
- now - time
- );
- }
- else
- {
- // HANDLE ERROR
- log.error(
- "{} - ERROR key={} timestamp={} latency={}ms: {}",
- id,
- record.key(),
- metadata == null ? -1 : metadata.timestamp(),
- now - time,
- e.toString()
- );
- }
- });
-
- long now = System.currentTimeMillis();
- log.trace(
- "{} - Queued #{} key={} latency={}ms",
- id,
- i,
- record.key(),
- now - time
- );
- }
- }
- finally
- {
- log.info("{}: Closing the KafkaProducer", id);
- producer.close();
- log.info("{}: Exiting!", id);
- }
- }
-}
--- /dev/null
+producer:
+ bootstrap-server: :9092
+ client-id: peter
+ topic: test
+ acks: 1
+ throttle-ms: 1000
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "*"
+logging:
+ level:
+ root: INFO
+ de.juplo: DEBUG