+++ /dev/null
-*
-!Dockerfile
-!target/*.jar
+++ /dev/null
-target/*.jar
+++ /dev/null
-FROM eclipse-temurin:21-jre
-VOLUME /tmp
-COPY target/*.jar /opt/app.jar
-ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
-CMD []
+++ /dev/null
-#!/bin/bash
-
-IMAGE=juplo/spring-consumer:1.1-json-SNAPSHOT
-
-if [ "$1" = "cleanup" ]
-then
- docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans
- mvn clean
- exit
-fi
-
-docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer
-
-if [[
- $(docker image ls -q $IMAGE) == "" ||
- "$1" = "build"
-]]
-then
- mvn clean install || exit
-else
- echo "Using image existing images:"
- docker image ls $IMAGE
-fi
-
-docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
-
-
-docker compose -f docker/docker-compose.yml up -d producer
-docker compose -f docker/docker-compose.yml up -d consumer
-
-sleep 5
-docker compose -f docker/docker-compose.yml stop consumer
-
-docker compose -f docker/docker-compose.yml start consumer
-sleep 5
-
-docker compose -f docker/docker-compose.yml stop producer consumer
-docker compose -f docker/docker-compose.yml logs consumer
-import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
-
plugins {
id 'java'
- id 'org.springframework.boot' version '3.4.1'
id 'io.spring.dependency-management' version '1.1.7'
- id 'com.bmuschko.docker-remote-api' version '9.3.3'
}
-group = 'de.juplo.kafka'
-version = '1.1-json-SNAPSHOT'
+group = 'de.juplo.messages'
+version = '1.0-SNAPSHOT'
java {
toolchain {
mavenCentral()
}
+dependencyManagement {
+ imports {
+ mavenBom "org.springframework.boot:spring-boot-dependencies:3.1.4"
+ }
+}
+
dependencies {
- implementation 'org.springframework.kafka:spring-kafka'
- implementation 'org.springframework.boot:spring-boot-starter-actuator'
- implementation 'org.springframework.boot:spring-boot-starter-validation'
- implementation 'org.springframework.boot:spring-boot-starter-web'
+ implementation 'com.fasterxml.jackson.core:jackson-databind'
+ implementation 'com.fasterxml.jackson.core:jackson-annotations'
compileOnly 'org.projectlombok:lombok'
- developmentOnly 'org.springframework.boot:spring-boot-devtools'
- annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
- testImplementation 'org.springframework.kafka:spring-kafka-test'
- testCompileOnly 'org.projectlombok:lombok'
- testAnnotationProcessor 'org.projectlombok:lombok'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
useJUnitPlatform()
}
-docker {
- // Optional: Konfiguriere den Docker-Host, falls nötig
- // url = 'unix:///var/run/docker.sock' (Standard)
-}
-
-
-def targetDir = file("${projectDir}/target")
-def appJarName = "${project.name}-${project.version}.jar"
-
-// Task zum Bereinigen des `target`-Verzeichnisses bei `clean`
-clean {
- delete targetDir
-}
-
-// Task zum Kopieren des Haupt-JARs
-task copyJar(type: Copy) {
- from "$buildDir/libs/${appJarName}"
- into targetDir
- dependsOn build
-
- doFirst {
- def libs = file("${targetDir}/libs")
- mkdir(libs)
- }
-}
-
-// Docker-Task
-task buildDockerImage(type: DockerBuildImage) {
- inputDir = file('.')
- images = ["juplo/${project.name}:${project.version}"]
- dependsOn copyJar
-}
+++ /dev/null
-services:
- zookeeper:
- image: confluentinc/cp-zookeeper:7.8.0
- environment:
- ZOOKEEPER_CLIENT_PORT: 2181
- ports:
- - 2181:2181
- volumes:
- - zookeeper-data:/var/lib/zookeeper/data
- - zookeeper-log:/var/lib/zookeeper/log
-
- kafka-1:
- image: confluentinc/cp-kafka:7.8.0
- environment:
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_LISTENERS: BROKER://:9092, LOCALHOST://:9081
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, LOCALHOST:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-1:9092, LOCALHOST://localhost:9081
- KAFKA_BROKER_ID: 1
- KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
- KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
- KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 1000
- volumes:
- - kafka-1-data:/var/lib/kafka/data
- ports:
- - 9081:9081
- stop_grace_period: 120s
- depends_on:
- - zookeeper
-
- kafka-2:
- image: confluentinc/cp-kafka:7.8.0
- environment:
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_LISTENERS: BROKER://:9092, LOCALHOST://:9082
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, LOCALHOST:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-2:9092, LOCALHOST://localhost:9082
- KAFKA_BROKER_ID: 2
- KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
- KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
- KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000
- volumes:
- - kafka-2-data:/var/lib/kafka/data
- ports:
- - 9092:9082
- - 9082:9082
- networks:
- default:
- aliases:
- - kafka
- stop_grace_period: 120s
- depends_on:
- - zookeeper
-
- kafka-3:
- image: confluentinc/cp-kafka:7.8.0
- environment:
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_LISTENERS: BROKER://:9092, LOCALHOST://:9083
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, LOCALHOST:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-3:9092, LOCALHOST://localhost:9083
- KAFKA_BROKER_ID: 3
- KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
- KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
- KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000
- volumes:
- - kafka-3-data:/var/lib/kafka/data
- ports:
- - 9083:9083
- stop_grace_period: 120s
- depends_on:
- - zookeeper
-
- cli:
- image: juplo/toolbox
- command: sleep infinity
- stop_grace_period: 0s
- depends_on:
- - kafka-1
- - kafka-2
- - kafka-3
-
- setup:
- image: juplo/toolbox
- command:
- - bash
- - -c
- - |
- cub kafka-ready -b kafka-1:9092,kafka-2:9092,kafka-3:9092 3 60 > /dev/null 2>&1 || exit 1
- if [ -e INITIALIZED ]
- then
- echo -n Bereits konfiguriert:
- cat INITIALIZED
- kafka-topics --bootstrap-server kafka:9092 --describe --topic test
- else
- kafka-topics --bootstrap-server kafka:9092 \
- --delete \
- --if-exists \
- --topic test
- kafka-topics --bootstrap-server kafka:9092 \
- --create \
- --topic test \
- --partitions 2 \
- --replication-factor 3 \
- --config min.insync.replicas=2 \
- && echo Das Topic \'test\' wurde erfolgreich angelegt: \
- && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
- && date > INITIALIZED
- fi
- stop_grace_period: 0s
- depends_on:
- - cli
-
- akhq:
- image: tchiotludo/akhq:0.23.0
- ports:
- - 8888:8080
- environment:
- AKHQ_CONFIGURATION: |
- akhq:
- connections:
- docker-kafka-server:
- properties:
- bootstrap.servers: "kafka:9092"
- schema-registry:
- url: "http://schema-registry:8085"
- connect:
- - name: "connect"
- url: "http://connect:8083"
- depends_on:
- - kafka-1
- - kafka-2
- - kafka-3
-
- producer:
- image: juplo/spring-producer:1.0-json-SNAPSHOT
- environment:
- juplo.bootstrap-server: kafka:9092
- juplo.client-id: producer
- juplo.producer.topic: test
- juplo.producer.linger-ms: 666
- juplo.producer.throttle-ms: 100
-
- consumer:
- image: juplo/spring-consumer:1.1-json-SNAPSHOT
- environment:
- juplo.bootstrap-server: kafka:9092
- juplo.client-id: consumer
- juplo.consumer.topic: test
-
- peter:
- image: juplo/spring-consumer:1.1-json-SNAPSHOT
- environment:
- juplo.bootstrap-server: kafka:9092
- juplo.client-id: peter
- juplo.consumer.topic: test
-
- ute:
- image: juplo/spring-consumer:1.1-json-SNAPSHOT
- environment:
- juplo.bootstrap-server: kafka:9092
- juplo.client-id: ute
- juplo.consumer.topic: test
-
-volumes:
- zookeeper-data:
- zookeeper-log:
- kafka-1-data:
- kafka-2-data:
- kafka-3-data:
<relativePath/> <!-- lookup parent from repository -->
</parent>
- <groupId>de.juplo.kafka</groupId>
- <artifactId>spring-consumer</artifactId>
- <name>Spring Consumer</name>
- <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.1-json-SNAPSHOT</version>
+ <groupId>de.juplo.messages</groupId>
+ <artifactId>sumup-messages</artifactId>
+ <name>Messages for the SumUp-Example</name>
+ <description>Some reusable messages for the example SumUp</description>
+ <version>1.0-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
+ <spring-boot.version>3.4.1</spring-boot.version>
</properties>
<dependencies>
<dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-configuration-processor</artifactId>
- <optional>true</optional>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-validation</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
- <scope>compile</scope>
+ <optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka-test</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
<plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>build-info</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
<plugin>
<groupId>pl.project13.maven</groupId>
<artifactId>git-commit-id-plugin</artifactId>
</plugin>
- <plugin>
- <groupId>io.fabric8</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- <version>0.45.0</version>
- <configuration>
- <images>
- <image>
- <name>juplo/%a:%v</name>
- </image>
- </images>
- </configuration>
- <executions>
- <execution>
- <id>build</id>
- <phase>package</phase>
- <goals>
- <goal>build</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
-rootProject.name = 'spring-consumer'
+rootProject.name = 'sumup-messages'
+++ /dev/null
-package de.juplo.kafka;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
-@SpringBootApplication
-public class Application
-{
- public static void main(String[] args)
- {
- SpringApplication.run(Application.class, args);
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.StickyAssignor;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.Properties;
-
-
-@Configuration
-@EnableConfigurationProperties(ApplicationProperties.class)
-public class ApplicationConfiguration
-{
- @Bean
- public ExampleConsumer exampleConsumer(
- Consumer<String, Message> kafkaConsumer,
- ApplicationProperties properties,
- ConfigurableApplicationContext applicationContext)
- {
- return
- new ExampleConsumer(
- properties.getClientId(),
- properties.getConsumerProperties().getTopic(),
- kafkaConsumer,
- () -> applicationContext.close());
- }
-
- @Bean(destroyMethod = "")
- public KafkaConsumer<String, Message> kafkaConsumer(ApplicationProperties properties)
- {
- Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
- props.put("client.id", properties.getClientId());
- props.put("group.id", properties.getConsumerProperties().getGroupId());
- if (properties.getConsumerProperties().getAutoOffsetReset() != null)
- {
- props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name());
- }
- if (properties.getConsumerProperties().getAutoCommitInterval() != null)
- {
- props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
- }
- props.put("metadata.maxage.ms", 5000); // 5 Sekunden
- props.put("partition.assignment.strategy", StickyAssignor.class.getName());
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", JsonDeserializer.class.getName());
- props.put("spring.json.type.mapping", "ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum");
-
- return new KafkaConsumer<>(props);
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import jakarta.validation.constraints.NotEmpty;
-import jakarta.validation.constraints.NotNull;
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.validation.annotation.Validated;
-
-import java.time.Duration;
-
-
-@ConfigurationProperties(prefix = "juplo")
-@Validated
-@Getter
-@Setter
-public class ApplicationProperties
-{
- @NotNull
- @NotEmpty
- private String bootstrapServer;
- @NotNull
- @NotEmpty
- private String clientId;
-
- @NotNull
- private ConsumerProperties consumer;
-
-
- public ConsumerProperties getConsumerProperties()
- {
- return consumer;
- }
-
-
- @Validated
- @Getter
- @Setter
- static class ConsumerProperties
- {
- @NotNull
- @NotEmpty
- private String groupId;
- @NotNull
- @NotEmpty
- private String topic;
- private OffsetReset autoOffsetReset;
- private Duration autoCommitInterval;
-
- enum OffsetReset { latest, earliest, none }
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
-
-import java.time.Duration;
-import java.util.Arrays;
-
-
-@Slf4j
-public class ExampleConsumer implements Runnable
-{
- private final String id;
- private final String topic;
- private final Consumer<String, Message> consumer;
- private final Thread workerThread;
- private final Runnable closeCallback;
-
- private long consumed = 0;
-
-
- public ExampleConsumer(
- String clientId,
- String topic,
- Consumer<String, Message> consumer,
- Runnable closeCallback)
- {
- this.id = clientId;
- this.topic = topic;
- this.consumer = consumer;
-
- workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
- workerThread.start();
-
- this.closeCallback = closeCallback;
- }
-
-
- @Override
- public void run()
- {
- try
- {
- log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
-
- while (true)
- {
- ConsumerRecords<String, Message> records = consumer.poll(Duration.ofSeconds(1));
-
- log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<String, Message> record : records)
- {
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
- }
- }
- }
- catch(WakeupException e)
- {
- log.info("{} - Consumer was signaled to finish its work", id);
- }
- catch(Exception e)
- {
- log.error("{} - Unexpected error, unsubscribing!", id, e);
- consumer.unsubscribe();
- log.info("{} - Triggering exit of application!", id);
- new Thread(closeCallback).start();
- }
- finally
- {
- log.info("{} - Closing the KafkaConsumer", id);
- consumer.close();
- log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
- }
- }
-
- private void handleRecord(
- String topic,
- Integer partition,
- Long offset,
- String key,
- Message value)
- {
- consumed++;
- log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
- switch (value.getType())
- {
- case ADD -> addNumber((MessageAddNumber)value);
- case CALC -> calcSum((MessageCalculateSum)value);
- default -> log.error("{} - Ignoring message of unknown typ {}", id, value.getType());
- }
- }
-
- private void addNumber(MessageAddNumber addNumber)
- {
- log.info("{} - Adding number {}", id, addNumber.getNext());
- }
-
- private void calcSum(MessageCalculateSum calculateSum)
- {
- log.info("{} - Calculating sum", id);
- }
-
- public void shutdown() throws InterruptedException
- {
- log.info("{} - Waking up the consumer", id);
- consumer.wakeup();
- log.info("{} - Joining the worker thread", id);
- workerThread.join();
- }
-}
+++ /dev/null
-juplo:
- bootstrap-server: :9092
- client-id: DEV
- consumer:
- group-id: my-group
- topic: test
- auto-offset-reset: earliest
- auto-commit-interval: 5s
-management:
- endpoint:
- shutdown:
- enabled: true
- endpoints:
- web:
- exposure:
- include: "*"
- info:
- env:
- enabled: true
- java:
- enabled: true
-info:
- kafka:
- bootstrap-server: ${juplo.bootstrap-server}
- client-id: ${juplo.client-id}
- consumer:
- group-id: ${juplo.consumer.group-id}
- topic: ${juplo.consumer.topic}
- auto-offset-reset: ${juplo.consumer.auto-offset-reset}
- auto-commit-interval: ${juplo.consumer.auto-commit-interval}
-logging:
- level:
- root: INFO
- de.juplo: DEBUG
- org.springframework.kafka: INFO
-server:
- port: 8881
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<configuration>
-
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
- <Pattern>%d{HH:mm:ss.SSS} | %highlight(%-5level) %msg%n</Pattern>
- </encoder>
- </appender>
-
- <root level="INFO">
- <appender-ref ref="STDOUT" />
- </root>
-
-</configuration>
+++ /dev/null
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.web.servlet.MockMvc;
-
-import java.time.Duration;
-
-import static de.juplo.kafka.ApplicationTests.PARTITIONS;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
-import static org.awaitility.Awaitility.await;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
-
-@SpringBootTest(
- properties = {
- "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.consumer.topic=" + TOPIC })
-@AutoConfigureMockMvc
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
-public class ApplicationTests
-{
- static final String TOPIC = "FOO";
- static final int PARTITIONS = 10;
-
- @Autowired
- MockMvc mockMvc;
-
-
-
- @Test
- public void testApplicationStartup()
- {
- await("Application is healthy")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> mockMvc
- .perform(get("/actuator/health"))
- .andExpect(status().isOk())
- .andExpect(jsonPath("status").value("UP")));
- }
-}