From: Kai Moritz Date: Mon, 14 Nov 2022 19:01:09 +0000 (+0100) Subject: Alles bis auf das Setup und README.sh für die Vorführung gelöscht X-Git-Tag: errorhandling/spring-consumer--json--poison-pill---2023-06-signal X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=bf00f16dbade97666c30d0ee71fa20c7a2602420;p=demos%2Fkafka%2Ftraining Alles bis auf das Setup und README.sh für die Vorführung gelöscht --- diff --git a/.dockerignore b/.dockerignore deleted file mode 100644 index 1ad9963..0000000 --- a/.dockerignore +++ /dev/null @@ -1,2 +0,0 @@ -* -!target/*.jar diff --git a/.editorconfig b/.editorconfig deleted file mode 100644 index 633c98a..0000000 --- a/.editorconfig +++ /dev/null @@ -1,13 +0,0 @@ -root = true - -[*] -indent_style = space -indent_size = tab -tab_width = 2 -charset = utf-8 -end_of_line = lf -trim_trailing_whitespace = true -insert_final_newline = false - -[*.properties] -charset = latin1 \ No newline at end of file diff --git a/.maven-dockerexclude b/.maven-dockerexclude deleted file mode 100644 index 72e8ffc..0000000 --- a/.maven-dockerexclude +++ /dev/null @@ -1 +0,0 @@ -* diff --git a/.maven-dockerinclude b/.maven-dockerinclude deleted file mode 100644 index fd6cecd..0000000 --- a/.maven-dockerinclude +++ /dev/null @@ -1 +0,0 @@ -target/*.jar diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 16ee25e..0000000 --- a/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -FROM openjdk:11-jre -VOLUME /tmp -COPY target/*.jar /opt/app.jar -ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] -CMD [] diff --git a/README.sh b/README.sh index a5a4774..c2454c6 100755 --- a/README.sh +++ b/README.sh @@ -1,27 +1,12 @@ #!/bin/bash -IMAGE=juplo/spring-consumer--json:1.0-SNAPSHOT - if [ "$1" = "cleanup" ] then - docker-compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean + docker-compose down -v exit fi docker-compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker-compose -f docker/docker-compose.yml rm -svf consumer-1 consumer-2 - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose -f docker/docker-compose.yml run --rm cli cub kafka-ready -b kafka:9092 3 60 > /dev/null 2>&1 || exit 1 @@ -34,19 +19,13 @@ while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Wait while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done -# tag::nachrichten[] echo 6 | http -v :8080/peter echo 77 | http -v :8080/klaus -# end::nachrichten[] echo "Writing poison pill..." -# tag::poisonpill[] echo 'BOOM!' | kafkacat -P -b :9092 -t test -# end::poisonpill[] docker-compose -f docker/docker-compose.yml logs -f consumer-1 consumer-2 echo "Restarting consumer-1..." -# tag::restart[] docker-compose -f docker/docker-compose.yml up consumer-1 -# end::restart[] diff --git a/pom.xml b/pom.xml deleted file mode 100644 index 6c06376..0000000 --- a/pom.xml +++ /dev/null @@ -1,102 +0,0 @@ - - - - 4.0.0 - - - org.springframework.boot - spring-boot-starter-parent - 2.7.2 - - - - de.juplo.kafka - spring-consumer--json - Spring Consumer - Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.0-SNAPSHOT - - - 11 - - - - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.boot - spring-boot-starter-validation - - - org.springframework.boot - spring-boot-starter-actuator - - - org.springframework.boot - spring-boot-configuration-processor - true - - - org.springframework.kafka - spring-kafka - - - org.projectlombok - lombok - - - org.springframework.boot - spring-boot-starter-test - test - - - org.springframework.kafka - spring-kafka-test - test - - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - - build-info - - - - - - io.fabric8 - docker-maven-plugin - 0.33.0 - - - - juplo/%a:%v - - - - - - build - package - - build - - - - - - maven-failsafe-plugin - - - - - diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java deleted file mode 100644 index 04dc343..0000000 --- a/src/main/java/de/juplo/kafka/Application.java +++ /dev/null @@ -1,66 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.util.concurrent.ListenableFuture; - -import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutionException; - - -@SpringBootApplication -@Slf4j -public class Application implements ApplicationRunner -{ - @Autowired - ThreadPoolTaskExecutor taskExecutor; - @Autowired - Consumer kafkaConsumer; - @Autowired - SimpleConsumer simpleConsumer; - @Autowired - ConfigurableApplicationContext context; - - ListenableFuture consumerJob; - - @Override - public void run(ApplicationArguments args) throws Exception - { - log.info("Starting SimpleConsumer"); - consumerJob = taskExecutor.submitListenable(simpleConsumer); - consumerJob.addCallback( - exitStatus -> - { - log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus); - SpringApplication.exit(context, () -> exitStatus); - }, - t -> - { - log.error("SimpleConsumer exited abnormally!", t); - SpringApplication.exit(context, () -> 2); - }); - } - - @PreDestroy - public void shutdown() throws ExecutionException, InterruptedException - { - log.info("Signaling SimpleConsumer to quit its work"); - kafkaConsumer.wakeup(); - log.info("Waiting for SimpleConsumer to finish its work"); - consumerJob.get(); - log.info("SimpleConsumer finished its work"); - } - - - public static void main(String[] args) - { - SpringApplication.run(Application.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java deleted file mode 100644 index a8b3e1d..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ /dev/null @@ -1,34 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import org.springframework.kafka.core.ConsumerFactory; - - -@Configuration -@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) -public class ApplicationConfiguration -{ - @Bean - public SimpleConsumer simpleConsumer( - Consumer kafkaConsumer, - KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) - { - return - new SimpleConsumer( - kafkaProperties.getClientId(), - applicationProperties.getTopic(), - kafkaConsumer); - } - - @Bean - public Consumer kafkaConsumer(ConsumerFactory factory) - { - return factory.createConsumer(); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java deleted file mode 100644 index a4cc8b8..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.kafka; - -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.validation.annotation.Validated; - -import javax.validation.constraints.NotEmpty; -import javax.validation.constraints.NotNull; - - -@ConfigurationProperties(prefix = "simple.consumer") -@Validated -@Getter -@Setter -public class ApplicationProperties -{ - @NotNull - @NotEmpty - private String topic; -} diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java deleted file mode 100644 index e4999b7..0000000 --- a/src/main/java/de/juplo/kafka/Message.java +++ /dev/null @@ -1,9 +0,0 @@ -package de.juplo.kafka; - - -public abstract class Message -{ - public enum Type {ADD, CALC} - - public abstract Type getType(); -} diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java deleted file mode 100644 index c024b65..0000000 --- a/src/main/java/de/juplo/kafka/MessageAddNumber.java +++ /dev/null @@ -1,19 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Data; - - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class MessageAddNumber extends Message -{ - private Integer next; - - - @Override - public Type getType() - { - return Type.ADD; - } -} diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java deleted file mode 100644 index afc5a39..0000000 --- a/src/main/java/de/juplo/kafka/MessageCalculateSum.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Data; - - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class MessageCalculateSum extends Message -{ - @Override - public Type getType() - { - return Type.CALC; - } -} diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java deleted file mode 100644 index 45f9b94..0000000 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ /dev/null @@ -1,80 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.WakeupException; - -import java.time.Duration; -import java.util.Arrays; -import java.util.concurrent.Callable; - - -@Slf4j -@RequiredArgsConstructor -public class SimpleConsumer implements Callable -{ - private final String id; - private final String topic; - private final Consumer consumer; - - private long consumed = 0; - - - @Override - public Integer call() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - return 0; - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString()); - consumer.unsubscribe(); - return 1; - } - finally - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - - private void handleRecord( - String topic, - Integer partition, - Long offset, - String key, - Message value) - { - consumed++; - log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); - } -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml deleted file mode 100644 index 07d0625..0000000 --- a/src/main/resources/application.yml +++ /dev/null @@ -1,45 +0,0 @@ -simple: - consumer: - topic: test -management: - endpoint: - shutdown: - enabled: true - endpoints: - web: - exposure: - include: "*" - info: - env: - enabled: true - java: - enabled: true -info: - kafka: - bootstrap-server: ${spring.kafka.bootstrap-servers} - client-id: ${spring.kafka.client-id} - group-id: ${spring.kafka.consumer.group-id} - topic: ${simple.consumer.topic} - auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset} -spring: - kafka: - bootstrap-servers: :9092 - client-id: DEV - consumer: - group-id: my-group - auto-offset-reset: earliest - auto-commit-interval: 5s - key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer - properties: - partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor - metadata.max.age.ms: 1000 - spring.json.type.mapping: > - ADD:de.juplo.kafka.MessageAddNumber, - CALC:de.juplo.kafka.MessageCalculateSum -logging: - level: - root: INFO - de.juplo: DEBUG -server: - port: 8881 diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml deleted file mode 100644 index b8e6780..0000000 --- a/src/main/resources/logback.xml +++ /dev/null @@ -1,17 +0,0 @@ - - - - - - %highlight(%-5level) %m%n - - - - - - - - - - - diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java deleted file mode 100644 index 1baca99..0000000 --- a/src/test/java/de/juplo/kafka/ApplicationIT.java +++ /dev/null @@ -1,40 +0,0 @@ -package de.juplo.kafka; - -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.web.client.TestRestTemplate; -import org.springframework.boot.test.web.server.LocalServerPort; -import org.springframework.kafka.test.context.EmbeddedKafka; - -import static de.juplo.kafka.ApplicationIT.TOPIC; - - -@SpringBootTest( - webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, - properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "simple.consumer.topic=" + TOPIC }) -@EmbeddedKafka(topics = TOPIC) -public class ApplicationIT -{ - public static final String TOPIC = "FOO"; - - @LocalServerPort - private int port; - - @Autowired - private TestRestTemplate restTemplate; - - - - @Test - public void testApplicationStartup() - { - restTemplate.getForObject( - "http://localhost:" + port + "/actuator/health", - String.class - ) - .contains("UP"); - } -} diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java deleted file mode 100644 index 82116f4..0000000 --- a/src/test/java/de/juplo/kafka/MessageTest.java +++ /dev/null @@ -1,29 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - - -public class MessageTest -{ - ObjectMapper mapper = new ObjectMapper(); - - @Test - @DisplayName("Deserialize a MessageAddNumber message") - public void testDeserializeMessageAddNumber() - { - Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class)); - Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class)); - } - - @Test - @DisplayName("Deserialize a MessageCalculateSum message") - public void testDeserializeMessageCalculateSum() throws JsonProcessingException - { - Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class)); - Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class)); - } -}