--- /dev/null
+*.iml
+.idea
+target
--- /dev/null
+#!/bin/bash
+
+# Demo driver for the first-contact example.
+#
+#   ./README.sh cleanup  - tear down the containers (incl. volumes) and the build
+#   ./README.sh build    - only build the maven artifacts, then exit
+#   ./README.sh          - build, start the cluster, produce and consume
+
+if [ "$1" = "cleanup" ]
+then
+ docker-compose down -v
+ mvn clean
+ exit
+fi
+
+mvn package || exit 1
+if [ "$1" = "build" ]; then exit; fi
+
+# Make sure any background job (the detached consumer) is killed when the
+# script exits, no matter how
+trap 'kill $(jobs -p) 2>/dev/null' EXIT
+
+docker-compose up -d
+
+echo "Waiting for the Kafka-Cluster to become ready..."
+docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
+
+echo "Producing messages"
+mvn exec:java@producer
+
+# Run the consumer in the background for 7 seconds, then kill it
+echo "Reading messages"
+mvn exec:java@consumer &
+sleep 7
+kill $(jobs -p)
+sleep 2
+
+# The second run demonstrates that the consumer continues where the first
+# run left off, because offsets are committed under its group.id
+echo "Re-Reading messages"
+mvn exec:java@consumer &
+sleep 7
+kill $(jobs -p)
+sleep 2
--- /dev/null
+version: '3.2'
+services:
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:6.2.0
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+    ports:
+      # Port-mappings are quoted: unquoted xx:yy values can be (mis)read
+      # as numbers by YAML parsers
+      - "2181:2181"
+
+  kafka:
+    image: confluentinc/cp-kafka:6.2.0
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      # DOCKER: listener for clients inside the compose-network
+      # LOCALHOST: listener for clients connecting from the docker-host
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      # Host-port 9092 is deliberately routed to the LOCALHOST-listener
+      # (container-port 9082), so that clients on the docker-host can
+      # bootstrap via localhost:9092 as well as via localhost:9082
+      - "9092:9082"
+      - "9082:9082"
+    depends_on:
+      - zookeeper
+
+  setup:
+    image: juplo/toolbox
+    # NOTE: the extra indentation of the command-lines is significant —
+    # in a folded block-scalar, more-indented lines keep their newlines,
+    # so the two kafka-topics calls run as two separate commands
+    command: >
+      bash -c "
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+      "
+    depends_on:
+      # Without this, the setup-container may try to (re-)create the topic
+      # before the broker has been started
+      - kafka
+
+  cli:
+    image: juplo/toolbox
+    command: sleep infinity
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <!-- Dependency-versions are managed by the Spring Boot parent-POM -->
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.6.0</version>
+    <relativePath/> <!-- lookup parent from repository -->
+  </parent>
+
+  <groupId>de.juplo.kafka</groupId>
+  <artifactId>first-contact</artifactId>
+  <name>First Contact: a Simple Producer and a simple Consumer-Group</name>
+  <version>1.0-SNAPSHOT</version>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- Named executions enable the calls
+           `mvn exec:java@producer` and `mvn exec:java@consumer` -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>3.0.0</version>
+        <executions>
+          <execution>
+            <id>producer</id>
+            <configuration>
+              <mainClass>de.juplo.kafka.SimpleProducer</mainClass>
+            </configuration>
+          </execution>
+          <execution>
+            <id>consumer</id>
+            <configuration>
+              <mainClass>de.juplo.kafka.SimpleConsumer</mainClass>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * A simple KafkaConsumer that subscribes to the topic "test" and polls it
+ * in an endless loop, until it is woken up by the shutdown-hook that is
+ * registered in {@link #main}.
+ */
+@Slf4j
+public class SimpleConsumer
+{
+  private long consumed = 0;
+  private KafkaConsumer<String, String> consumer;
+  private Lock lock = new ReentrantLock();
+  private Condition stopped = lock.newCondition();
+  // Predicate for the condition "stopped" (guarded by "lock").
+  // BUGFIX: without a predicate, a signal that fires before the
+  // shutdown-hook reaches await() is lost, and the hook would block the
+  // JVM-shutdown forever (e.g. if run() terminates with an error before
+  // the hook runs); spurious wake-ups would also go undetected.
+  private boolean shutdownCompleted = false;
+
+
+  public SimpleConsumer()
+  {
+    // tag::create[]
+    Properties props = new Properties();
+    props.put("bootstrap.servers", ":9092");
+    props.put("group.id", "my-consumer"); // << Used for Offset-Commits
+    // end::create[]
+    props.put("auto.offset.reset", "earliest");
+    // tag::create[]
+    props.put("key.deserializer", StringDeserializer.class.getName());
+    props.put("value.deserializer", StringDeserializer.class.getName());
+
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+    // end::create[]
+    this.consumer = consumer;
+  }
+
+
+  /**
+   * Subscribes to the topic "test" and polls until
+   * {@link KafkaConsumer#wakeup()} is called from the shutdown-hook.
+   * Closes the consumer and signals the waiting hook before returning.
+   */
+  public void run()
+  {
+    String id = "C";
+
+    try
+    {
+      log.info("{} - Subscribing to topic test", id);
+      consumer.subscribe(Arrays.asList("test"));
+
+      // tag::loop[]
+      while (true)
+      {
+        ConsumerRecords<String, String> records =
+            consumer.poll(Duration.ofSeconds(1));
+
+        // Do something with the data...
+        // end::loop[]
+        log.info("{} - Received {} messages", id, records.count());
+        for (ConsumerRecord<String, String> record : records)
+        {
+          consumed++;
+          log.info(
+              "{} - {}: {}/{} - {}={}",
+              id,
+              record.offset(),
+              record.topic(),
+              record.partition(),
+              record.key(),
+              record.value()
+          );
+        }
+        // tag::loop[]
+      }
+      // end::loop[]
+    }
+    catch(WakeupException e)
+    {
+      // Thrown by poll() after consumer.wakeup() was called:
+      // this is the intended way to break out of the poll-loop
+      log.info("{} - RIIING!", id);
+    }
+    catch(Exception e)
+    {
+      log.error("{} - Unexpected error: {}", id, e.toString());
+    }
+    finally
+    {
+      this.lock.lock();
+      try
+      {
+        log.info("{} - Closing the KafkaConsumer", id);
+        consumer.close();
+        log.info("C - DONE!");
+        // Set the predicate _before_ signalling, so that a hook that only
+        // starts waiting afterwards still sees that the shutdown is done
+        shutdownCompleted = true;
+        stopped.signalAll();
+      }
+      finally
+      {
+        this.lock.unlock();
+        log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+      }
+    }
+  }
+
+
+  public static void main(String[] args) throws Exception
+  {
+    SimpleConsumer instance = new SimpleConsumer();
+
+    Runtime.getRuntime().addShutdownHook(new Thread(() ->
+    {
+      instance.lock.lock();
+      try
+      {
+        // wakeup() is thread-safe: it forces the poll-loop to throw a
+        // WakeupException, which triggers the orderly shutdown in run()
+        instance.consumer.wakeup();
+        // Wait on the predicate in a loop: immune to lost signals and
+        // spurious wake-ups (see java.util.concurrent.locks.Condition)
+        while (!instance.shutdownCompleted)
+          instance.stopped.await();
+      }
+      catch (InterruptedException e)
+      {
+        log.warn("Interrrupted while waiting for the consumer to stop!", e);
+      }
+      finally
+      {
+        instance.lock.unlock();
+      }
+    }));
+
+    instance.run();
+  }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+
+
+/**
+ * A simple KafkaProducer that writes 100 messages into the topic "test".
+ * The value of each message is its sequence-number, the key is that
+ * number modulo 10.
+ */
+@Slf4j
+public class SimpleProducer
+{
+  public static void main(String[] args) throws Exception
+  {
+    // tag::create[]
+    Properties props = new Properties();
+    props.put("bootstrap.servers", "localhost:9092");
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", StringSerializer.class.getName());
+
+    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+    // end::create[]
+
+    String id = "P";
+
+    try
+    {
+      for (long i = 0; i < 100; i++)
+      {
+        final long startTime = System.currentTimeMillis();
+
+        final ProducerRecord<String, String> message = new ProducerRecord<>(
+            "test",              // Topic
+            Long.toString(i%10), // Key
+            Long.toString(i)     // Value
+        );
+
+        // send() only queues the message; the callback is invoked
+        // asynchronously, once the broker has acknowledged (or rejected) it
+        producer.send(message, (metadata, e) ->
+        {
+          long now = System.currentTimeMillis();
+          if (e == null)
+          {
+            // HANDLE SUCCESS
+            log.debug(
+                "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+                id,
+                message.key(),
+                message.value(),
+                metadata.partition(),
+                metadata.offset(),
+                metadata.timestamp(),
+                now - startTime
+            );
+          }
+          else
+          {
+            // HANDLE ERROR
+            log.error(
+                "{} - ERROR key={} timestamp={} latency={}ms: {}",
+                id,
+                message.key(),
+                metadata == null ? -1 : metadata.timestamp(),
+                now - startTime,
+                e.toString()
+            );
+          }
+        });
+
+        long queuedAt = System.currentTimeMillis();
+        log.trace(
+            "{} - Queued #{} key={} latency={}ms",
+            id,
+            i,
+            message.key(),
+            queuedAt - startTime
+        );
+      }
+    }
+    finally
+    {
+      // close() flushes all messages that are still queued
+      log.info("{}: Closing the KafkaProducer", id);
+      producer.close();
+      log.info("{}: Exiting!", id);
+    }
+  }
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+ <!-- Write all log-messages to the console, with the level highlighted -->
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+ <Pattern>%highlight(%-5level) %m%n</Pattern>
+ </encoder>
+ </appender>
+
+ <!-- Show all output of the example-code; uncomment the line below to
+      also see the internals of the kafka-clients library -->
+ <logger name="de.juplo" level="TRACE"/>
+ <!-- logger name="org.apache.kafka.clients" level="DEBUG" / -->
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT" />
+ </root>
+
+</configuration>