]> juplo.de Git - demos/kafka/training/commitdiff
Setup mit sehr kurzer `retention.ms` consumer/simple-consumer--retention-ms--vorlage consumer/simple-consumer--retention-ms--vorlage--2026-06-lvm--rebase-vollständig
authorKai Moritz <kai@juplo.de>
Sat, 9 Nov 2024 08:15:08 +0000 (09:15 +0100)
committerKai Moritz <kai.milan.moritz@googlemail.com>
Fri, 12 Jun 2026 18:43:38 +0000 (18:43 +0000)
* `retention.ms` = 10000
* `segment.ms` = 3000
* `setup` startet außerdem automatisch `akhq` und `producer`

README.sh [deleted file]
build.gradle [deleted file]
docker/docker-compose.yml
pom.xml [deleted file]
settings.gradle [deleted file]
src/main/java/de/juplo/kafka/ExampleConsumer.java [deleted file]
src/main/resources/logback.xml [deleted file]

diff --git a/README.sh b/README.sh
deleted file mode 100755 (executable)
index 85b8f96..0000000
--- a/README.sh
+++ /dev/null
@@ -1,39 +0,0 @@
-#!/bin/bash
-
-IMAGE=juplo/simple-consumer:1.0-SNAPSHOT
-
-if [ "$1" = "cleanup" ]
-then
-  docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans
-  mvn clean
-  exit
-fi
-
-docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer
-
-if [[
-  $(docker image ls -q $IMAGE) == "" ||
-  "$1" = "build"
-]]
-then
-  mvn clean install || exit
-else
-  echo "Using image existing images:"
-  docker image ls $IMAGE
-fi
-
-docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
-
-
-docker compose -f docker/docker-compose.yml up -d producer
-docker compose -f docker/docker-compose.yml up -d consumer
-
-sleep 5
-docker compose -f docker/docker-compose.yml stop consumer
-
-docker compose -f docker/docker-compose.yml start consumer
-sleep 5
-
-docker compose -f docker/docker-compose.yml stop producer consumer
-docker compose -f docker/docker-compose.yml logs consumer
diff --git a/build.gradle b/build.gradle
deleted file mode 100644 (file)
index f642d02..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-plugins {
-       id 'java'
-       id 'org.springframework.boot' version '4.0.6'
-       id 'io.spring.dependency-management' version '1.1.7'
-       id 'com.gorylenko.gradle-git-properties' version '2.4.2'
-       id 'com.google.cloud.tools.jib' version '3.4.5'
-}
-
-group = 'de.juplo.kafka'
-version = '1.0-SNAPSHOT'
-
-java {
-       toolchain {
-               languageVersion = JavaLanguageVersion.of(21)
-       }
-}
-
-configurations {
-       compileOnly {
-               extendsFrom annotationProcessor
-       }
-}
-
-repositories {
-       mavenCentral()
-}
-
-dependencies {
-       implementation 'org.apache.kafka:kafka-clients'
-       implementation 'ch.qos.logback:logback-classic'
-       compileOnly 'org.projectlombok:lombok'
-       annotationProcessor 'org.projectlombok:lombok'
-}
-
-tasks.named('test') {
-       useJUnitPlatform()
-}
-
-jib {
-       from { image = 'eclipse-temurin:21-jre' }
-       to { image = "juplo/${project.name}:${project.version}" }
-       container { mainClass = 'de.juplo.kafka.ExampleConsumer' }
-}
index 732f8290d0027622f00c290259c5b7969f1f1104..935051fd1e9ee75a75c74f189d0a47116a9f5467 100644 (file)
@@ -140,6 +140,8 @@ services:
                        --partitions 2 \
                        --replication-factor 3 \
                        --config min.insync.replicas=2 \
+                       --config retention.ms=10000 \
+                       --config segment.ms=3000 \
           && echo Das Topic \'test\' wurde erfolgreich angelegt: \
           && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
           && date > INITIALIZED
@@ -147,6 +149,8 @@ services:
     stop_grace_period: 0s
     depends_on:
       - cli
+      - akhq
+      - producer
 
   akhq:
     image: tchiotludo/akhq:0.23.0
diff --git a/pom.xml b/pom.xml
deleted file mode 100644 (file)
index 6194b73..0000000
--- a/pom.xml
+++ /dev/null
@@ -1,73 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.springframework.boot</groupId>
-    <artifactId>spring-boot-starter-parent</artifactId>
-    <version>4.0.6</version>
-    <relativePath/> <!-- lookup parent from repository -->
-  </parent>
-
-  <groupId>de.juplo.kafka</groupId>
-  <artifactId>simple-consumer</artifactId>
-  <name>Simple Consumer-Group</name>
-  <description>Super Simple Consumer-Group, that is implemented as a plain Java-program</description>
-  <version>1.0-SNAPSHOT</version>
-
-  <properties>
-    <java.version>21</java.version>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.projectlombok</groupId>
-      <artifactId>lombok</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>ch.qos.logback</groupId>
-      <artifactId>logback-classic</artifactId>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>pl.project13.maven</groupId>
-        <artifactId>git-commit-id-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>com.google.cloud.tools</groupId>
-        <artifactId>jib-maven-plugin</artifactId>
-        <version>3.4.5</version>
-        <configuration>
-          <from>
-            <image>eclipse-temurin:21-jre</image>
-          </from>
-          <to>
-            <image>juplo/${project.artifactId}:${project.version}</image>
-          </to>
-          <container>
-            <mainClass>de.juplo.kafka.ExampleConsumer</mainClass>
-          </container>
-        </configuration>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>dockerBuild</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>
diff --git a/settings.gradle b/settings.gradle
deleted file mode 100644 (file)
index 71fb573..0000000
+++ /dev/null
@@ -1 +0,0 @@
-rootProject.name = 'simple-consumer'
diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
deleted file mode 100644 (file)
index 36ddc70..0000000
+++ /dev/null
@@ -1,168 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Properties;
-
-
-@Slf4j
-public class ExampleConsumer implements ConsumerRebalanceListener
-{
-  private final String id;
-  private final String topic;
-  private final Consumer<String, String> consumer;
-
-  private volatile boolean running = false;
-  private long consumed = 0;
-
-  public ExampleConsumer(
-    String broker,
-    String topic,
-    String groupId,
-    String clientId)
-  {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", broker);
-    props.put("group.id", groupId); // ID für die Offset-Commits
-    props.put("client.id", clientId); // Nur zur Wiedererkennung
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", StringDeserializer.class.getName());
-
-    this.id = clientId;
-    this.topic = topic;
-    consumer = new KafkaConsumer<>(props);
-  }
-
-
-  public void run()
-  {
-    try
-    {
-      log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), this);
-      running = true;
-
-      while (true)
-      {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
-
-        log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<String, String> record : records)
-        {
-          handleRecord(
-            record.topic(),
-            record.partition(),
-            record.offset(),
-            record.key(),
-            record.value());
-        }
-      }
-    }
-    catch(WakeupException e)
-    {
-      log.info("{} - Consumer was signaled to finish its work", id);
-    }
-    catch(Exception e)
-    {
-      log.error("{} - Unexpected error, unsubscribing!", id, e);
-      consumer.unsubscribe();
-    }
-    finally
-    {
-      log.info("{} - Closing the KafkaConsumer", id);
-      consumer.close();
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-      running = false;
-    }
-  }
-
-  private void handleRecord(
-    String topic,
-    Integer partition,
-    Long offset,
-    String key,
-    String value)
-  {
-    consumed++;
-    log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
-  }
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions
-      .stream()
-      .forEach(partition -> log.info("{} - partition assigned: {}", id, partition));
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions
-      .stream()
-      .forEach(partition -> log.info("{} - partition revoked: {}", id, partition));
-  }
-
-  @Override
-  public void onPartitionsLost(Collection<TopicPartition> partitions)
-  {
-    partitions
-      .stream()
-      .forEach(partition -> log.info("{} - partition lost: {}", id, partition));
-  }
-
-
-  public static void main(String[] args) throws Exception
-  {
-    if (args.length != 4)
-    {
-      log.error("Four arguments required!");
-      log.error("args[0]: Broker-Address");
-      log.error("args[1]: Topic");
-      log.error("args[2]: Group-ID");
-      log.error("args[3]: Unique Client-ID");
-      System.exit(1);
-      return;
-    }
-
-
-    log.info(
-      "Running ExampleConsumer: broker={}, topic={}, group-id={}, client-id={}",
-      args[0],
-      args[1],
-      args[2],
-      args[3]);
-
-    ExampleConsumer instance = new ExampleConsumer(args[0], args[1], args[2], args[3]);
-
-    Runtime.getRuntime().addShutdownHook(new Thread(() ->
-    {
-      instance.consumer.wakeup();
-
-      while (instance.running)
-      {
-        log.info("{} - Waiting for main-thread...", instance.id);
-        try
-        {
-          Thread.sleep(1000);
-        }
-        catch (InterruptedException e) {}
-      }
-      log.info("{} - Shutdown completed.", instance.id);
-    }));
-
-    instance.run();
-  }
-}
-
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
deleted file mode 100644 (file)
index 7a25e76..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<configuration>
-
-  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-    <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
-      <Pattern>%d{HH:mm:ss.SSS} | %highlight(%-5level) %msg%n</Pattern>
-    </encoder>
-  </appender>
-
-  <logger name="de.juplo" level="TRACE"/>
-
-  <root level="INFO">
-    <appender-ref ref="STDOUT" />
-  </root>
-
-</configuration>