From 6bcb16a385d84eae58e01a2287a175ddbbf3b214 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 9 Nov 2024 09:15:08 +0100 Subject: [PATCH] Setup mit sehr kurzer `retention.ms` MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * `retention.ms` = 10000 * `segment.ms` = 3000 * `setup` startet außerdem automatisch `akhq` und `producer` --- README.sh | 39 ---- build.gradle | 43 ----- docker/docker-compose.yml | 4 + pom.xml | 73 -------- settings.gradle | 1 - .../java/de/juplo/kafka/ExampleConsumer.java | 168 ------------------ src/main/resources/logback.xml | 16 -- 7 files changed, 4 insertions(+), 340 deletions(-) delete mode 100755 README.sh delete mode 100644 build.gradle delete mode 100644 pom.xml delete mode 100644 settings.gradle delete mode 100644 src/main/java/de/juplo/kafka/ExampleConsumer.java delete mode 100644 src/main/resources/logback.xml diff --git a/README.sh b/README.sh deleted file mode 100755 index 85b8f960..00000000 --- a/README.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/simple-consumer:1.0-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d consumer - -sleep 5 -docker compose -f docker/docker-compose.yml stop consumer - -docker compose -f docker/docker-compose.yml start consumer -sleep 5 - -docker compose -f docker/docker-compose.yml stop producer consumer -docker compose -f docker/docker-compose.yml logs consumer diff --git a/build.gradle b/build.gradle deleted file mode 100644 index f642d020..00000000 --- a/build.gradle +++ /dev/null @@ -1,43 +0,0 @@ -plugins { - id 'java' - id 'org.springframework.boot' version '4.0.6' - id 'io.spring.dependency-management' version '1.1.7' - id 'com.gorylenko.gradle-git-properties' version '2.4.2' - id 'com.google.cloud.tools.jib' version '3.4.5' -} - -group = 'de.juplo.kafka' -version = '1.0-SNAPSHOT' - -java { - toolchain { - languageVersion = JavaLanguageVersion.of(21) - } -} - -configurations { - compileOnly { - extendsFrom annotationProcessor - } -} - -repositories { - mavenCentral() -} - -dependencies { - implementation 'org.apache.kafka:kafka-clients' - implementation 'ch.qos.logback:logback-classic' - compileOnly 'org.projectlombok:lombok' - annotationProcessor 'org.projectlombok:lombok' -} - -tasks.named('test') { - useJUnitPlatform() -} - -jib { - from { image = 'eclipse-temurin:21-jre' } - to { image = "juplo/${project.name}:${project.version}" } - container { mainClass = 'de.juplo.kafka.ExampleConsumer' } -} diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 732f8290..935051fd 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -140,6 +140,8 @@ services: --partitions 2 \ --replication-factor 3 \ --config min.insync.replicas=2 \ + --config retention.ms=10000 \ + --config segment.ms=3000 \ && echo Das Topic \'test\' wurde erfolgreich angelegt: \ && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \ && date > INITIALIZED @@ -147,6 +149,8 @@ services: stop_grace_period: 0s depends_on: - cli + - akhq + - producer akhq: image: tchiotludo/akhq:0.23.0 diff --git a/pom.xml b/pom.xml deleted file mode 100644 index 6194b733..00000000 --- a/pom.xml +++ /dev/null @@ -1,73 +0,0 @@ - - - - 4.0.0 - - - org.springframework.boot - spring-boot-starter-parent - 4.0.6 - - - - de.juplo.kafka - simple-consumer - Simple Consumer-Group - Super Simple Consumer-Group, that is implemented as a plain Java-program - 1.0-SNAPSHOT - - - 21 - - - - - org.apache.kafka - kafka-clients - - - org.projectlombok - lombok - true - - - ch.qos.logback - logback-classic - - - - - - - pl.project13.maven - git-commit-id-plugin - - - com.google.cloud.tools - jib-maven-plugin - 3.4.5 - - - eclipse-temurin:21-jre - - - juplo/${project.artifactId}:${project.version} - - - de.juplo.kafka.ExampleConsumer - - - - - package - - dockerBuild - - - - - - - - diff --git a/settings.gradle b/settings.gradle deleted file mode 100644 index 71fb5737..00000000 --- a/settings.gradle +++ /dev/null @@ -1 +0,0 @@ -rootProject.name = 'simple-consumer' diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java deleted file mode 100644 index 36ddc70d..00000000 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ /dev/null @@ -1,168 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; - -import java.time.Duration; -import java.util.Arrays; -import java.util.Collection; -import java.util.Properties; - - -@Slf4j -public class ExampleConsumer implements ConsumerRebalanceListener -{ - private final String id; - private final String topic; - private final Consumer consumer; - - private volatile boolean running = false; - private long consumed = 0; - - public ExampleConsumer( - String broker, - String topic, - String groupId, - String clientId) - { - Properties props = new Properties(); - props.put("bootstrap.servers", broker); - props.put("group.id", groupId); // ID für die Offset-Commits - props.put("client.id", clientId); // Nur zur Wiedererkennung - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - this.id = clientId; - this.topic = topic; - consumer = new KafkaConsumer<>(props); - } - - - public void run() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), this); - running = true; - - while (true) - { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error, unsubscribing!", id, e); - consumer.unsubscribe(); - } - finally - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - running = false; - } - } - - private void handleRecord( - String topic, - Integer partition, - Long offset, - String key, - String value) - { - consumed++; - log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions - .stream() - .forEach(partition -> log.info("{} - partition assigned: {}", id, partition)); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions - .stream() - .forEach(partition -> log.info("{} - partition revoked: {}", id, partition)); - } - - @Override - public void onPartitionsLost(Collection partitions) - { - partitions - .stream() - .forEach(partition -> log.info("{} - partition lost: {}", id, partition)); - } - - - public static void main(String[] args) throws Exception - { - if (args.length != 4) - { - log.error("Four arguments required!"); - log.error("args[0]: Broker-Address"); - log.error("args[1]: Topic"); - log.error("args[2]: Group-ID"); - log.error("args[3]: Unique Client-ID"); - System.exit(1); - return; - } - - - log.info( - "Running ExampleConsumer: broker={}, topic={}, group-id={}, client-id={}", - args[0], - args[1], - args[2], - args[3]); - - ExampleConsumer instance = new ExampleConsumer(args[0], args[1], args[2], args[3]); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.consumer.wakeup(); - - while (instance.running) - { - log.info("{} - Waiting for main-thread...", instance.id); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("{} - Shutdown completed.", instance.id); - })); - - instance.run(); - } -} - diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml deleted file mode 100644 index 7a25e76f..00000000 --- a/src/main/resources/logback.xml +++ /dev/null @@ -1,16 +0,0 @@ - - - - - - %d{HH:mm:ss.SSS} | %highlight(%-5level) %msg%n - - - - - - - - - - -- 2.39.5