--- /dev/null
+*
+!target/*.jar
--- /dev/null
+target/*.jar
--- /dev/null
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
+CMD []
#!/bin/bash
+IMAGE=juplo/endless-consumer:1.0-SNAPSHOT
+
if [ "$1" = "cleanup" ]
then
docker-compose down -v
exit
fi
-mvn package || exit 1
-if [ "$1" = "build" ]; then exit; fi
-
-trap 'kill $(jobs -p) 2>/dev/null' EXIT
+docker-compose up -d zookeeper kafka cli
-docker-compose up -d
+if [[
+ $(docker image ls -q $IMAGE) == "" ||
+ "$1" = "build"
+]]
+then
+ mvn install || exit
+else
+ echo "Using image existing images:"
+ docker image ls $IMAGE
+fi
echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-
-echo "Producing messages"
-mvn exec:java@producer
-
-echo "Reading messages"
-mvn exec:java@consumer &
-sleep 7
-kill $(jobs -p)
-sleep 2
-
-echo "Re-Reading messages"
-mvn exec:java@consumer &
-sleep 7
-kill $(jobs -p)
-sleep 2
+docker-compose up setup
+docker-compose up -d producer consumer
+sleep 15
+docker-compose stop producer consumer
+docker-compose logs consumer
cli:
image: juplo/toolbox
command: sleep infinity
+
+ producer:
+ image: juplo/endless-producer:1.0-SNAPSHOT
+ ports:
+ - 8080:8080
+ environment:
+ producer.bootstrap-server: kafka:9092
+ producer.client-id: producer
+ producer.topic: test
+ producer.throttle-ms: 200
+
+
+ consumer:
+ image: juplo/endless-consumer:1.0-SNAPSHOT
+ ports:
+ - 8081:8081
+ environment:
+ consumer.bootstrap-server: kafka:9092
+ consumer.client-id: my-group
+ consumer.client-id: consumer
+ consumer.topic: test
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>first-contact</artifactId>
- <name>First Contact: a Simple Producer and a simple Consumer-Group</name>
+ <artifactId>endless-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
+ <name>Endless Consumer: a Simple Consumer-Group that reads and print the topic</name>
<dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-configuration-processor</artifactId>
+ <optional>true</optional>
+ </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <version>3.0.0</version>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.33.0</version>
+ <configuration>
+ <images>
+ <image>
+ <name>juplo/%a:%v</name>
+ </image>
+ </images>
+ </configuration>
<executions>
<execution>
- <id>producer</id>
- <configuration>
- <mainClass>de.juplo.kafka.SimpleProducer</mainClass>
- </configuration>
- </execution>
- <execution>
- <id>consumer</id>
- <configuration>
- <mainClass>de.juplo.kafka.SimpleConsumer</mainClass>
- </configuration>
+ <id>build</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build</goal>
+ </goals>
</execution>
</executions>
</plugin>
--- /dev/null
+package de.juplo.kafka;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
+
+import java.util.concurrent.Executors;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application
+{
+ @Autowired
+ ApplicationProperties properties;
+
+
+ @Bean
+ public EndlessConsumer consumer()
+ {
+ Assert.hasText(properties.getBootstrapServer(), "consumer.bootstrap-server must be set");
+ Assert.hasText(properties.getGroupId(), "consumer.group-id must be set");
+ Assert.hasText(properties.getClientId(), "consumer.client-id must be set");
+ Assert.hasText(properties.getTopic(), "consumer.topic must be set");
+
+ EndlessConsumer consumer =
+ new EndlessConsumer(
+ Executors.newFixedThreadPool(1),
+ properties.getBootstrapServer(),
+ properties.getGroupId(),
+ properties.getClientId(),
+ properties.getTopic());
+
+ consumer.start();
+
+ return consumer;
+ }
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(Application.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties(prefix = "consumer")
+@Getter
+@Setter
+public class ApplicationProperties
+{
+ private String bootstrapServer;
+ private String groupId;
+ private String clientId;
+ private String topic;
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.concurrent.ExecutionException;
+
+
+@RestController
+@RequiredArgsConstructor
+public class DriverController
+{
+ private final EndlessConsumer consumer;
+
+
+ @PostMapping("start")
+ public void start()
+ {
+ consumer.start();
+ }
+
+ @PostMapping("stop")
+ public void stop() throws ExecutionException, InterruptedException
+ {
+ consumer.stop();
+ }
+}
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
+import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
-public class SimpleConsumer
+public class EndlessConsumer implements Runnable
{
- private long consumed = 0;
- private KafkaConsumer<String, String> consumer;
- private Lock lock = new ReentrantLock();
- private Condition stopped = lock.newCondition();
+ private final ExecutorService executor;
+ private final String bootstrapServer;
+ private final String groupId;
+ private final String id;
+ private final String topic;
+ private AtomicBoolean running = new AtomicBoolean();
+ private long consumed = 0;
+ private KafkaConsumer<String, String> consumer = null;
+ private Future<?> future = null;
+
+ public EndlessConsumer(
+ ExecutorService executor,
+ String bootstrapServer,
+ String groupId,
+ String clientId,
+ String topic)
+ {
+ this.executor = executor;
+ this.bootstrapServer = bootstrapServer;
+ this.groupId = groupId;
+ this.id = clientId;
+ this.topic = topic;
+ }
- public SimpleConsumer()
+ @Override
+ public void run()
{
- // tag::create[]
Properties props = new Properties();
- props.put("bootstrap.servers", ":9092");
- props.put("group.id", "my-consumer"); // << Used for Offset-Commits
- // end::create[]
+ props.put("bootstrap.servers", bootstrapServer);
+ props.put("group.id", groupId);
+ props.put("client.id", id);
props.put("auto.offset.reset", "earliest");
- // tag::create[]
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- // end::create[]
- this.consumer = consumer;
- }
-
-
- public void run()
- {
- String id = "C";
+ this.consumer = new KafkaConsumer<>(props);
try
{
- log.info("{} - Subscribing to topic test", id);
- consumer.subscribe(Arrays.asList("test"));
+ log.info("{} - Subscribing to topic {}", id, topic);
+ consumer.subscribe(Arrays.asList(topic));
- // tag::loop[]
while (true)
{
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
// Do something with the data...
- // end::loop[]
log.info("{} - Received {} messages", id, records.count());
for (ConsumerRecord<String, String> record : records)
{
record.value()
);
}
- // tag::loop[]
}
- // end::loop[]
}
catch(WakeupException e)
{
catch(Exception e)
{
log.error("{} - Unexpected error: {}", id, e.toString());
+ running.set(false); // Mark the instance as not running
}
finally
{
- this.lock.lock();
- try
- {
- log.info("{} - Closing the KafkaConsumer", id);
- consumer.close();
- log.info("C - DONE!");
- stopped.signal();
- }
- finally
- {
- this.lock.unlock();
- log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
- }
+ log.info("{} - Closing the KafkaConsumer", id);
+ consumer.close();
+ log.info("{} - Consumer-Thread exiting", id);
}
}
- public static void main(String[] args) throws Exception
+ public synchronized void start()
{
- SimpleConsumer instance = new SimpleConsumer();
+ boolean stateChanged = running.compareAndSet(false, true);
+ if (!stateChanged)
+ throw new RuntimeException("Consumer instance " + id + " is already running!");
- Runtime.getRuntime().addShutdownHook(new Thread(() ->
- {
- instance.lock.lock();
- try
- {
- instance.consumer.wakeup();
- instance.stopped.await();
- }
- catch (InterruptedException e)
- {
- log.warn("Interrrupted while waiting for the consumer to stop!", e);
- }
- finally
- {
- instance.lock.unlock();
- }
- }));
+ log.info("{} - Starting - consumed {} messages before", id, consumed);
+ future = executor.submit(this);
+ }
- instance.run();
+ public synchronized void stop() throws ExecutionException, InterruptedException
+ {
+ boolean stateChanged = running.compareAndSet(true, false);
+ if (!stateChanged)
+ throw new RuntimeException("Consumer instance " + id + " is not running!");
+
+ log.info("{} - Stopping", id);
+ consumer.wakeup();
+ future.get();
+ log.info("{} - Stopped - consumed {} messages so far", id, consumed);
+ }
+
+ @PreDestroy
+ public void destroy() throws ExecutionException, InterruptedException
+ {
+ log.info("{} - Destroy!", id);
+ try
+ {
+ stop();
+ }
+ catch (IllegalStateException e)
+ {
+ log.info("{} - Was already stopped", id);
+ }
+ finally
+ {
+ log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+ }
}
}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-
-@Slf4j
-public class SimpleProducer
-{
- private final String id;
- private final String topic;
- private final KafkaProducer<String, String> producer;
-
- private long produced = 0;
-
- public SimpleProducer(String clientId, String topic)
- {
- // tag::create[]
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- KafkaProducer<String, String> producer = new KafkaProducer<>(props);
- // end::create[]
-
- this.id = clientId;
- this.topic = topic;
- this.producer = producer;
- }
-
- public void run()
- {
- long i = 0;
-
- try
- {
- for (; i < 100 ; i++)
- {
- send(Long.toString(i%10), Long.toString(i));
- }
-
- log.info("{} - Done", id);
- }
- finally
- {
- log.info("{}: Closing the KafkaProducer", id);
- producer.close();
- log.info("{}: Produced {} messages in total, exiting!", id, produced);
- }
- }
-
- void send(String key, String value)
- {
- final long time = System.currentTimeMillis();
-
- final ProducerRecord<String, String> record = new ProducerRecord<>(
- topic, // Topic
- key, // Key
- value // Value
- );
-
- producer.send(record, (metadata, e) ->
- {
- long now = System.currentTimeMillis();
- if (e == null)
- {
- // HANDLE SUCCESS
- produced++;
- log.debug(
- "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
- id,
- record.key(),
- record.value(),
- metadata.partition(),
- metadata.offset(),
- metadata.timestamp(),
- now - time
- );
- }
- else
- {
- // HANDLE ERROR
- log.error(
- "{} - ERROR key={} timestamp={} latency={}ms: {}",
- id,
- record.key(),
- metadata == null ? -1 : metadata.timestamp(),
- now - time,
- e.toString()
- );
- }
- });
-
- long now = System.currentTimeMillis();
- log.trace(
- "{} - Queued #{} key={} latency={}ms",
- id,
- value,
- record.key(),
- now - time
- );
- }
-
-
- public static void main(String[] args) throws Exception
- {
- SimpleProducer producer = new SimpleProducer("P", "test");
- producer.run();
- }
-}
--- /dev/null
+consumer:
+ bootstrap-server: :9092
+ group-id: my-consumer
+ client-id: peter
+ topic: test
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "*"
+logging:
+ level:
+ root: INFO
+ de.juplo: DEBUG
+server:
+ port: 8081