From: Kai Moritz
Date: Tue, 18 May 2021 22:22:42 +0000 (+0200)
Subject: WIP
X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fseek;a=commitdiff_plain;h=7fa06f9b4d341fb34fec502ec4f18048231a43a7

WIP
---

7fa06f9b4d341fb34fec502ec4f18048231a43a7
diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..1ad9963
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,2 @@
+*
+!target/*.jar
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..612c5bc
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+target
+.idea
+*.iml
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..16ee25e
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,5 @@
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
+CMD []
diff --git a/README.sh b/README.sh
new file mode 100755
index 0000000..e01b9c5
--- /dev/null
+++ b/README.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+
+if [ "$1" = "cleanup" ]
+then
+  docker-compose down -v
+  exit
+fi
+
+docker-compose up -d zookeeper kafka
+
+while ! [[ $(docker-compose exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]];
+do
+  echo "Waiting for kafka...";
+  sleep 1;
+done
+
+docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --topic foo
+
+docker-compose up -d producer consumer
+
+sleep 3
+docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest
+sleep 3
+docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest
+sleep 3
+docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group bar --reset-offsets --to-earliest
+
+docker-compose stop producer consumer
+docker-compose logs consumer
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..c6562e1
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,113 @@
+version: "3"
+
+services:
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:6.0.1
+    ports:
+      - 2181:2181
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+
+  kafka:
+    image: confluentinc/cp-kafka:6.0.1
+    ports:
+      - 9092:9092
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
+      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    depends_on:
+      - zookeeper
+
+  producer:
+    image: confluentinc/cp-kafkacat:6.0.1
+    command:
+      bash -c '
+      export A=0;
+      while true;
+      do
+        export A=$$(($$A + 1));
+        echo -n $$A;
+        echo $$A | kafkacat -b kafka:9093 -t foo -k $$A%7;
+      done'
+    tty: true
+
+  peter:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8001:8001
+    environment:
+      server.port: 8001
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: peter
+      seek.topic: test
+
+  franz:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8002:8002
+    environment:
+      server.port: 8002
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: franz
+      seek.topic: test
+
+  beate:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8003:8003
+    environment:
+      server.port: 8003
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: beate
+      seek.topic: test
+
+  ute:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8004:8004
+    environment:
+      server.port: 8004
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: ute
+      seek.topic: test
+
+  klaus:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8005:8005
+    environment:
+      server.port: 8005
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: klaus
+      seek.topic: test
+
+  paul:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8006:8006
+    environment:
+      server.port: 8006
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: paul
+      seek.topic: test
+
+  siggi:
+    image: juplo/seek:1.0-SNAPSHOT
+    ports:
+      - 8007:8007
+    environment:
+      server.port: 8007
+      seek.bootstrap-server: kafka:9093
+      seek.group-id: seek
+      seek.client-id: siggi
+      seek.topic: test
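The compose file advertises two listeners: INSIDE (kafka:9093) for the containers and OUTSIDE (localhost:9092) for the host, and README.sh creates the topic foo on startup. As a quick sanity check that the OUTSIDE listener is reachable from the host, a snippet along these lines can list the topic's partitions; it is only an illustration and not part of this commit:

    // Hypothetical check, not part of this commit: fetch the metadata of the
    // topic created by README.sh through the OUTSIDE listener on the host.
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.util.Properties;

    public class ListPartitions
    {
      public static void main(String[] args)
      {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // partitionsFor() only fetches metadata, no consumer group is joined
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props))
        {
          for (PartitionInfo partition : consumer.partitionsFor("foo"))
            System.out.println(partition);
        }
      }
    }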
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..922d525
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.1.5.RELEASE</version>
+    <relativePath/>
+  </parent>
+
+  <groupId>de.juplo.kafka</groupId>
+  <artifactId>seek</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <name>Seek Example</name>
+  <description>Can I Seek All Partitions Of A Running Consumer-Group From A Single Instance</description>
+
+  <properties>
+    <java.version>11</java.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-web</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-actuator</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-json</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-configuration-processor</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>io.fabric8</groupId>
+        <artifactId>docker-maven-plugin</artifactId>
+        <version>0.33.0</version>
+        <configuration>
+          <images>
+            <image>
+              <name>juplo/%a:%v</name>
+            </image>
+          </images>
+        </configuration>
+        <executions>
+          <execution>
+            <id>build</id>
+            <phase>package</phase>
+            <goals>
+              <goal>build</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/src/main/java/de/juplo/kafka/seek/Application.java b/src/main/java/de/juplo/kafka/seek/Application.java
new file mode 100644
index 0000000..cddf354
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/seek/Application.java
@@ -0,0 +1,43 @@
+package de.juplo.kafka.seek;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
+
+import java.util.concurrent.Executors;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application
+{
+
+  @Autowired
+  ApplicationProperties properties;
+
+
+  @Bean
+  public Consumer consumer()
+  {
+    Assert.hasText(properties.getBootstrapServer(), "seek.bootstrap-server must be set");
+    Assert.hasText(properties.getGroupId(), "seek.group-id must be set");
+    Assert.hasText(properties.getClientId(), "seek.client-id must be set");
+    Assert.hasText(properties.getTopic(), "seek.topic must be set");
+
+    return
+        new Consumer(
+            Executors.newFixedThreadPool(1),
+            properties.getBootstrapServer(),
+            properties.getGroupId(),
+            properties.getClientId(),
+            properties.getTopic());
+  }
+
+  public static void main(String[] args)
+  {
+    SpringApplication.run(Application.class, args);
+  }
+}
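Application only defines the Consumer as a bean; it is started and stopped through the REST endpoints in SeekController further down. If the consumer should instead start together with the application, a sketch using Spring Boot's ApplicationRunner could look like this (a hypothetical variant, not part of this commit):

    // Hypothetical alternative, not part of this commit: start the consumer
    // once the application context is ready instead of waiting for POST /start.
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class AutoStartConfiguration
    {
      @Bean
      public ApplicationRunner autoStart(Consumer consumer)
      {
        // Runs exactly once, after all beans are wired and the web server is up
        return args -> consumer.start();
      }
    }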
diff --git a/src/main/java/de/juplo/kafka/seek/ApplicationProperties.java b/src/main/java/de/juplo/kafka/seek/ApplicationProperties.java
new file mode 100644
index 0000000..d098a03
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/seek/ApplicationProperties.java
@@ -0,0 +1,16 @@
+package de.juplo.kafka.seek;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties(prefix = "seek")
+@Getter
+@Setter
+public class ApplicationProperties
+{
+  private String bootstrapServer;
+  private String groupId;
+  private String clientId;
+  private String topic;
+}
diff --git a/src/main/java/de/juplo/kafka/seek/Consumer.java b/src/main/java/de/juplo/kafka/seek/Consumer.java
new file mode 100644
index 0000000..0c232d6
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/seek/Consumer.java
@@ -0,0 +1,119 @@
+package de.juplo.kafka.seek;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import javax.annotation.PreDestroy;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+
+@Slf4j
+public class Consumer implements Runnable
+{
+  private final ExecutorService executor;
+  private final String id;
+  private final String topic;
+  private final KafkaConsumer<Long, String> consumer;
+
+  private boolean running = false;
+  Future<?> future = null;
+
+
+  public Consumer(
+      ExecutorService executor,
+      String bootstrapServer,
+      String groupId,
+      String clientId,
+      String topic)
+  {
+    this.executor = executor;
+
+    this.id = clientId;
+    this.topic = topic;
+
+    Properties props = new Properties();
+    props.put("bootstrap.servers", bootstrapServer);
+    props.put("group.id", groupId);
+    props.put("client.id", clientId);
+    props.put("key.deserializer", LongDeserializer.class.getName());
+    props.put("value.deserializer", StringDeserializer.class.getName());
+
+    consumer = new KafkaConsumer<>(props);
+  }
+
+
+  @Override
+  public void run()
+  {
+    log.info("{} - Subscribing to topic {}", id, topic);
+    consumer.subscribe(Arrays.asList(topic));
+
+    try
+    {
+
+      running = true;
+
+      while (running)
+      {
+        ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
+        for (ConsumerRecord<Long, String> record : records)
+          log.info(
+              "{} - {}: {}/{} - {}",
+              id,
+              record.offset(),
+              record.topic(),
+              record.partition(),
+              record.value()
+              );
+      }
+    }
+    catch(WakeupException e)
+    {
+      log.info("{} - RIIING!", id);
+    }
+    finally
+    {
+      log.info("{} - Unsubscribing...", id);
+      consumer.unsubscribe();
+      running = false;
+    }
+  }
+
+  public synchronized void start()
+  {
+    if (running)
+      throw new RuntimeException("Consumer instance " + id + " is already running!");
+
+    log.info("Running {}", id);
+    future = executor.submit(this);
+  }
+
+  public synchronized void stop() throws ExecutionException, InterruptedException
+  {
+    if (!running)
+      throw new RuntimeException("Consumer instance " + id + " is not running!");
+
+    log.info("Stopping {}", id);
+    running = false;
+    consumer.wakeup();
+    future.get();
+  }
+
+  @PreDestroy
+  public void destroy() throws ExecutionException, InterruptedException
+  {
+    stop();
+    log.info("{} - Closing the KafkaConsumer", id);
+    consumer.close(Duration.ofSeconds(3));
+  }
+}
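Consumer.run() subscribes with the default rebalance handling and simply logs what it polls. The seeking the project description asks about goes through KafkaConsumer.seek()/seekToBeginning(), and those calls only accept partitions that are currently assigned to the calling instance. A minimal sketch of rewinding everything an instance owns, assuming the subscribe() call in run() were replaced (hypothetical, not part of this commit):

    // Hypothetical variant of the subscribe() call in Consumer.run(), not part
    // of this commit. Additionally needed imports:
    //   org.apache.kafka.clients.consumer.ConsumerRebalanceListener
    //   org.apache.kafka.common.TopicPartition
    //   java.util.Collection
    consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
    {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions)
      {
        // seekToBeginning() only works for partitions assigned to this instance;
        // partitions owned by other group members cannot be sought from here.
        consumer.seekToBeginning(partitions);
      }
    });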
diff --git a/src/main/java/de/juplo/kafka/seek/SeekController.java b/src/main/java/de/juplo/kafka/seek/SeekController.java
new file mode 100644
index 0000000..9a96c4a
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/seek/SeekController.java
@@ -0,0 +1,28 @@
+package de.juplo.kafka.seek;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.concurrent.ExecutionException;
+
+
+@RestController
+@RequiredArgsConstructor
+public class SeekController
+{
+  private final Consumer consumer;
+
+
+  @PostMapping("start")
+  public void start()
+  {
+    consumer.start();
+  }
+
+  @PostMapping("stop")
+  public void stop() throws ExecutionException, InterruptedException
+  {
+    consumer.stop();
+  }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..7ea719a
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,5 @@
+management:
+  endpoints:
+    web:
+      exposure:
+        include: "*"
diff --git a/src/test/java/de/juplo/kafka/seek/ApplicationTests.java b/src/test/java/de/juplo/kafka/seek/ApplicationTests.java
new file mode 100644
index 0000000..c466977
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/seek/ApplicationTests.java
@@ -0,0 +1,17 @@
+package de.juplo.kafka.seek;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(properties = {
+    "seek.bootstrap-server=:9092",
+    "seek.topic=test",
+    "seek.group-id=seek", "seek.client-id=peter" })
+public class ApplicationTests
+{
+  @Test
+  public void contextLoads() {}
+}
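The seven instances from docker-compose.yml expose SeekController on ports 8001 to 8007, so the whole group can be started from the host once the image is built. A small driver using the Java 11 HttpClient, given here only as a usage illustration (not part of this commit):

    // Hypothetical helper, not part of this commit: POST /start to every
    // instance defined in docker-compose.yml.
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class StartAll
    {
      public static void main(String[] args) throws Exception
      {
        HttpClient client = HttpClient.newHttpClient();

        for (int port = 8001; port <= 8007; port++)
        {
          HttpRequest request = HttpRequest
              .newBuilder(URI.create("http://localhost:" + port + "/start"))
              .POST(HttpRequest.BodyPublishers.noBody())
              .build();

          HttpResponse<String> response =
              client.send(request, HttpResponse.BodyHandlers.ofString());

          System.out.println(port + " -> " + response.statusCode());
        }
      }
    }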