Möglichst einfach gehaltener technisch vollständiger Producer
authorKai Moritz <kai@juplo.de>
Wed, 27 Jul 2022 08:36:43 +0000 (10:36 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 1 Aug 2022 20:23:23 +0000 (22:23 +0200)
* Unterschiede zu dem Producer aus `first-contact`:
** Der Producer erzeugt endlos alle ca. 500ms eine Nachricht.
** Der Producer beendet sich ordentlich, wenn STRG-C gedrückt wird.
** Der Producer wird auch als Docker-Image gebaut
* Das Compose-Setup an das Setup aus den vorhergehenden Übungen
  angegelichen.

Dockerfile [new file with mode: 0644]
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/SimpleConsumer.java [deleted file]
src/main/java/de/juplo/kafka/SimpleProducer.java

diff --git a/Dockerfile b/Dockerfile
new file mode 100644 (file)
index 0000000..ea4d335
--- /dev/null
@@ -0,0 +1,6 @@
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+COPY target/dependency /opt/libs
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar", "kafka:9092", "test" ]
+CMD [ "DCKR" ]
index 0ee50a9..3f46c04 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,5 +1,7 @@
 #!/bin/bash
 
+IMAGE=juplo/simple-producer:1.0-SNAPSHOT
+
 if [ "$1" = "cleanup" ]
 then
   docker-compose down -v
@@ -7,27 +9,27 @@ then
   exit
 fi
 
-mvn package || exit 1
-if [ "$1" = "build" ]; then exit; fi
-
-trap 'kill $(jobs -p) 2>/dev/null' EXIT
+docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli
 
-docker-compose up -d
+if [[
+  $(docker image ls -q $IMAGE) == "" ||
+  "$1" = "build"
+]]
+then
+  mvn install || exit
+else
+  echo "Using image existing images:"
+  docker image ls $IMAGE
+fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
+docker-compose up setup
+docker-compose up -d
+sleep 5
 
-echo "Producing messages"
-mvn exec:java@producer
-
-echo "Reading messages"
-mvn exec:java@consumer &
-sleep 7
-kill $(jobs -p)
-sleep 2
+docker-compose exec cli kafkacat -b kafka:9092 -t test -q -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
 
-echo "Re-Reading messages"
-mvn exec:java@consumer &
-sleep 7
-kill $(jobs -p)
-sleep 2
+docker-compose stop producer
+docker-compose exec cli kafkacat -b kafka:9092 -t test -q -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
+docker-compose logs producer
index ec307f5..d9f15c8 100644 (file)
@@ -7,20 +7,56 @@ services:
     ports:
       - 2181:2181
 
-  kafka:
+  kafka-1:
     image: confluentinc/cp-kafka:7.1.3
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      - 9081:9081
+    depends_on:
+      - zookeeper
+
+  kafka-2:
+    image: confluentinc/cp-kafka:7.1.3
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
-      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082
       KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
     ports:
       - 9092:9082
       - 9082:9082
+    networks:
+      default:
+        aliases:
+          - kafka
+    depends_on:
+      - zookeeper
+
+  kafka-3:
+    image: confluentinc/cp-kafka:7.1.3
+    environment:
+      KAFKA_BROKER_ID: 3
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      - 9083:9083
     depends_on:
       - zookeeper
 
@@ -29,9 +65,14 @@ services:
     command: >
       bash -c "
         kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+        kafka-topics --bootstrap-server kafka:9092 --describe --topic test
       "
 
   cli:
     image: juplo/toolbox
     command: sleep infinity
+
+  producer:
+    image: juplo/simple-producer:1.0-SNAPSHOT
+    command: producer
diff --git a/pom.xml b/pom.xml
index 70f37e8..4c52f37 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -12,8 +12,8 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>first-contact</artifactId>
-  <name>First Contact: a Simple Producer and a simple Consumer-Group</name>
+  <artifactId>simple-producer</artifactId>
+  <name>Super Simple Producer</name>
   <version>1.0-SNAPSHOT</version>
 
   <dependencies>
   <build>
     <plugins>
       <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>exec-maven-plugin</artifactId>
-        <version>3.0.0</version>
+        <groupId>pl.project13.maven</groupId>
+        <artifactId>git-commit-id-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
         <executions>
           <execution>
-            <id>producer</id>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
             <configuration>
-              <mainClass>de.juplo.kafka.SimpleProducer</mainClass>
+              <outputDirectory>${project.build.directory}/libs</outputDirectory>
             </configuration>
           </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifest>
+              <addClasspath>true</addClasspath>
+              <classpathPrefix>libs/</classpathPrefix>
+              <mainClass>de.juplo.kafka.SimpleProducer</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>io.fabric8</groupId>
+        <artifactId>docker-maven-plugin</artifactId>
+        <version>0.33.0</version>
+        <configuration>
+          <images>
+            <image>
+              <name>juplo/%a:%v</name>
+            </image>
+          </images>
+        </configuration>
+        <executions>
           <execution>
-            <id>consumer</id>
-            <configuration>
-              <mainClass>de.juplo.kafka.SimpleConsumer</mainClass>
-            </configuration>
+             <id>build</id>
+             <phase>package</phase>
+             <goals>
+               <goal>build</goal>
+             </goals>
           </execution>
         </executions>
       </plugin>
     </plugins>
   </build>
 
+
 </project>
diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java
deleted file mode 100644 (file)
index e4d9697..0000000
+++ /dev/null
@@ -1,131 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-@Slf4j
-public class SimpleConsumer
-{
-  private long consumed = 0;
-  private KafkaConsumer<String, String> consumer;
-  private Lock lock = new ReentrantLock();
-  private Condition stopped = lock.newCondition();
-
-
-  public SimpleConsumer()
-  {
-    // tag::create[]
-    Properties props = new Properties();
-    props.put("bootstrap.servers", ":9092");
-    props.put("group.id", "my-consumer"); // << Used for Offset-Commits
-    // end::create[]
-    props.put("auto.offset.reset", "earliest");
-    // tag::create[]
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", StringDeserializer.class.getName());
-
-    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-    // end::create[]
-    this.consumer = consumer;
-  }
-
-
-  public void run()
-  {
-    String id = "C";
-
-    try
-    {
-      log.info("{} - Subscribing to topic test", id);
-      consumer.subscribe(Arrays.asList("test"));
-
-      // tag::loop[]
-      while (true)
-      {
-        ConsumerRecords<String, String> records =
-            consumer.poll(Duration.ofSeconds(1));
-
-        // Do something with the data...
-        // end::loop[]
-        log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<String, String> record : records)
-        {
-          consumed++;
-          log.info(
-              "{} - {}: {}/{} - {}={}",
-              id,
-              record.offset(),
-              record.topic(),
-              record.partition(),
-              record.key(),
-              record.value()
-          );
-        }
-        // tag::loop[]
-      }
-      // end::loop[]
-    }
-    catch(WakeupException e)
-    {
-      log.info("{} - RIIING!", id);
-    }
-    catch(Exception e)
-    {
-      log.error("{} - Unexpected error: {}", id, e.toString());
-    }
-    finally
-    {
-      this.lock.lock();
-      try
-      {
-        log.info("{} - Closing the KafkaConsumer", id);
-        consumer.close();
-        log.info("C - DONE!");
-        stopped.signal();
-      }
-      finally
-      {
-        this.lock.unlock();
-        log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-      }
-    }
-  }
-
-
-  public static void main(String[] args) throws Exception
-  {
-    SimpleConsumer instance = new SimpleConsumer();
-
-    Runtime.getRuntime().addShutdownHook(new Thread(() ->
-    {
-      instance.lock.lock();
-      try
-      {
-        instance.consumer.wakeup();
-        instance.stopped.await();
-      }
-      catch (InterruptedException e)
-      {
-        log.warn("Interrrupted while waiting for the consumer to stop!", e);
-      }
-      finally
-      {
-        instance.lock.unlock();
-      }
-    }));
-
-    instance.run();
-  }
-}
index 43a7227..fd03d73 100644 (file)
@@ -6,8 +6,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 
 
 @Slf4j
@@ -18,21 +16,21 @@ public class SimpleProducer
   private final KafkaProducer<String, String> producer;
 
   private long produced = 0;
+  private volatile boolean running = true;
+  private volatile boolean done = false;
 
-  public SimpleProducer(String clientId, String topic)
+  public SimpleProducer(String broker, String topic, String id)
   {
-    // tag::create[]
     Properties props = new Properties();
-    props.put("bootstrap.servers", "localhost:9092");
+    props.put("bootstrap.servers", broker);
+    props.put("client.id", id); // Nur zur Wiedererkennung
     props.put("key.serializer", StringSerializer.class.getName());
     props.put("value.serializer", StringSerializer.class.getName());
 
-    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-    // end::create[]
+    producer = new KafkaProducer<>(props);
 
-    this.id = clientId;
     this.topic = topic;
-    this.producer = producer;
+    this.id = id;
   }
 
   public void run()
@@ -41,18 +39,19 @@ public class SimpleProducer
 
     try
     {
-      for (; i < 100 ; i++)
+      for (; running ; i++)
       {
         send(Long.toString(i%10), Long.toString(i));
+        Thread.sleep(500);
       }
-
-      log.info("{} - Done", id);
     }
+    catch (InterruptedException e) {}
     finally
     {
       log.info("{}: Closing the KafkaProducer", id);
       producer.close();
       log.info("{}: Produced {} messages in total, exiting!", id, produced);
+      done = true;
     }
   }
 
@@ -111,7 +110,42 @@ public class SimpleProducer
 
   public static void main(String[] args) throws Exception
   {
-    SimpleProducer producer = new SimpleProducer("P", "test");
+    String broker = ":9092";
+    String topic = "test";
+    String clientId = "DEV";
+
+    switch (args.length)
+    {
+      case 3:
+        clientId = args[2];
+      case 2:
+        topic = args[1];
+      case 1:
+        broker = args[0];
+    }
+
+    SimpleProducer producer = new SimpleProducer(broker, topic, clientId);
+
+    Runtime.getRuntime().addShutdownHook(new Thread(() ->
+    {
+      producer.running = false;
+      while (!producer.done)
+      {
+        log.info("Waiting for producer...");
+        try
+        {
+          Thread.sleep(1000);
+        }
+        catch (InterruptedException e) {}
+      }
+      log.info("Shutdown completed.");
+    }));
+
+    log.info(
+        "Running simple producer: broker={}, topic={}, client-id={}",
+        broker,
+        topic,
+        clientId);
     producer.run();
   }
 }