From: Kai Moritz
Date: Fri, 22 Jul 2022 18:38:42 +0000 (+0200)
Subject: Merge of the upgrades for Confluent/Spring-Boot (branch 'rest-producer')
X-Git-Tag: acks---lvm-2-tage~9^2^2
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=8a00eec9b10c293024423516663c755400124dc9;hp=96998bebe231ac0e61ef6521a2c23694e81cfd86;p=demos%2Fkafka%2Ftraining

Merge of the upgrades for Confluent/Spring-Boot (branch 'rest-producer')

* Actually, the branch 'first-contact' would be easier to merge here.
* But since there were improvements to the setup in the branch 'rest-producer', that branch was merged instead.
---
diff --git a/.dockerignore b/.dockerignore
deleted file mode 100644
index 1ad9963..0000000
--- a/.dockerignore
+++ /dev/null
@@ -1,2 +0,0 @@
-*
-!target/*.jar
diff --git a/.maven-dockerexclude b/.maven-dockerexclude
deleted file mode 100644
index 72e8ffc..0000000
--- a/.maven-dockerexclude
+++ /dev/null
@@ -1 +0,0 @@
-*
diff --git a/.maven-dockerinclude b/.maven-dockerinclude
deleted file mode 100644
index fd6cecd..0000000
--- a/.maven-dockerinclude
+++ /dev/null
@@ -1 +0,0 @@
-target/*.jar
diff --git a/Dockerfile b/Dockerfile
deleted file mode 100644
index 16ee25e..0000000
--- a/Dockerfile
+++ /dev/null
@@ -1,5 +0,0 @@
-FROM openjdk:11-jre
-VOLUME /tmp
-COPY target/*.jar /opt/app.jar
-ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
-CMD []
diff --git a/README.sh b/README.sh
index 698d6dd..f651155 100755
--- a/README.sh
+++ b/README.sh
@@ -29,9 +29,5 @@ docker-compose up -d
 
 sleep 15
 
-echo foo | http -v :8080/bar
-dd if=/dev/zero bs=1024 count=1024 | http -v :8080/fehler
-http -v :8081/seen
 docker-compose stop producer consumer
-docker-compose logs producer
 
diff --git a/pom.xml b/pom.xml
deleted file mode 100644
index e4d24bb..0000000
--- a/pom.xml
+++ /dev/null
@@ -1,105 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.springframework.boot</groupId>
-    <artifactId>spring-boot-starter-parent</artifactId>
-    <version>2.7.2</version>
-  </parent>
-
-  <groupId>de.juplo.kafka</groupId>
-  <artifactId>rest-producer</artifactId>
-  <name>REST Producer</name>
-  <description>A Simple Producer that takes messages via POST and confirms success</description>
-  <version>1.0-SNAPSHOT</version>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-web</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-actuator</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-configuration-processor</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.projectlombok</groupId>
-      <artifactId>lombok</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-test</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.kafka</groupId>
-      <artifactId>spring-kafka</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.kafka</groupId>
-      <artifactId>spring-kafka-test</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.awaitility</groupId>
-      <artifactId>awaitility</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.springframework.boot</groupId>
-        <artifactId>spring-boot-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <goals>
-              <goal>build-info</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>pl.project13.maven</groupId>
-        <artifactId>git-commit-id-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>io.fabric8</groupId>
-        <artifactId>docker-maven-plugin</artifactId>
-        <version>0.33.0</version>
-        <configuration>
-          <images>
-            <image>
-              <name>juplo/%a:%v</name>
-            </image>
-          </images>
-        </configuration>
-        <executions>
-          <execution>
-            <id>build</id>
-            <phase>package</phase>
-            <goals>
-              <goal>build</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>
diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
deleted file mode 100644
index 9f3e3ed..0000000
--- a/src/main/java/de/juplo/kafka/Application.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package de.juplo.kafka;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
-
-import java.util.concurrent.Executors;
-
-
-@SpringBootApplication
-@EnableConfigurationProperties(ApplicationProperties.class)
-public class Application
-{
-  public static void main(String[] args)
-  {
-    SpringApplication.run(Application.class, args);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
deleted file mode 100644
index 1f30262..0000000
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-@ConfigurationProperties(prefix = "producer")
-@Getter
-@Setter
-public class ApplicationProperties
-{
-  private String bootstrapServer;
-  private String clientId;
-  private String topic;
-  private String acks;
-  private Integer batchSize;
-  private Integer lingerMs;
-  private String compressionType;
-}
diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java
deleted file mode 100644
index 5ca206d..0000000
--- a/src/main/java/de/juplo/kafka/ErrorResponse.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.Value;
-
-
-@Value
-public class ErrorResponse
-{
-  private final String error;
-  private final Integer status;
-}
diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java
deleted file mode 100644
index 873a67b..0000000
--- a/src/main/java/de/juplo/kafka/ProduceFailure.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package de.juplo.kafka;
-
-
-import lombok.Value;
-
-
-@Value
-public class ProduceFailure implements ProduceResult
-{
-  private final String error;
-  private final String exception;
-  private final Integer status;
-
-
-  public ProduceFailure(Exception e)
-  {
-    status = 500;
-    exception = e.getClass().getSimpleName();
-    error = e.getMessage();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ProduceResult.java b/src/main/java/de/juplo/kafka/ProduceResult.java
deleted file mode 100644
index ceff329..0000000
--- a/src/main/java/de/juplo/kafka/ProduceResult.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-
-import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
-
-
-@JsonInclude(NON_NULL)
-public interface ProduceResult
-{
-}
diff --git a/src/main/java/de/juplo/kafka/ProduceSuccess.java b/src/main/java/de/juplo/kafka/ProduceSuccess.java
deleted file mode 100644
index 9c79e8b..0000000
--- a/src/main/java/de/juplo/kafka/ProduceSuccess.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package de.juplo.kafka;
-
-
-import lombok.Value;
-
-
-@Value
-public class ProduceSuccess implements ProduceResult
-{
-  Integer partition;
-  Long offset;
-}
diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java
deleted file mode 100644
index 7d9bf12..0000000
--- a/src/main/java/de/juplo/kafka/RestProducer.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.web.bind.annotation.*;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import javax.annotation.PreDestroy;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-
-@Slf4j
-@RestController
-public class RestProducer
-{
-  private final String id;
-  private final String topic;
-  private final KafkaProducer<String, String> producer;
-
-  private long produced = 0;
-
-  public RestProducer(ApplicationProperties properties)
-  {
-    this.id = properties.getClientId();
-    this.topic = properties.getTopic();
-
-    Properties props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("client.id", properties.getClientId());
-    props.put("acks", properties.getAcks());
-    props.put("batch.size", properties.getBatchSize());
-    props.put("delivery.timeout.ms", 20000); // 20 seconds
-    props.put("request.timeout.ms", 10000); // 10 seconds
-    props.put("linger.ms", properties.getLingerMs());
-    props.put("compression.type", properties.getCompressionType());
-    props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
-
-    this.producer = new KafkaProducer<>(props);
-  }
-
-  @PostMapping(path = "{key}")
-  public DeferredResult<ProduceResult> send(
-      @PathVariable String key,
-      @RequestBody String value)
-  {
-    DeferredResult<ProduceResult> result = new DeferredResult<>();
-
-    final long time = System.currentTimeMillis();
-
-    final ProducerRecord<String, String> record = new ProducerRecord<>(
-        topic,  // Topic
-        key,    // Key
-        value   // Value
-    );
-
-    producer.send(record, (metadata, e) ->
-    {
-      long now = System.currentTimeMillis();
-      if (e == null)
-      {
-        // HANDLE SUCCESS
-        produced++;
-        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
-        log.debug(
-            "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
-            id,
-            record.key(),
-            record.value(),
-            metadata.partition(),
-            metadata.offset(),
-            metadata.timestamp(),
-            now - time
-        );
-      }
-      else
-      {
-        // HANDLE ERROR
-        result.setErrorResult(new ProduceFailure(e));
-        log.error(
-            "{} - ERROR key={} timestamp={} latency={}ms: {}",
-            id,
-            record.key(),
-            metadata == null ? -1 : metadata.timestamp(),
-            now - time,
-            e.toString()
-        );
-      }
-    });
-
-    long now = System.currentTimeMillis();
-    log.trace(
-        "{} - Queued #{} key={} latency={}ms",
-        id,
-        value,
-        record.key(),
-        now - time
-    );
-
-    return result;
-  }
-
-  @ExceptionHandler
-  @ResponseStatus(HttpStatus.BAD_REQUEST)
-  public ErrorResponse illegalStateException(IllegalStateException e)
-  {
-    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
-  }
-
-  @PreDestroy
-  public void destroy() throws ExecutionException, InterruptedException
-  {
-    log.info("{} - Destroy!", id);
-    log.info("{} - Closing the KafkaProducer", id);
-    producer.close();
-    log.info("{}: Produced {} messages in total, exiting!", id, produced);
-  }
-}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
deleted file mode 100644
index 726204e..0000000
--- a/src/main/resources/application.yml
+++ /dev/null
@@ -1,36 +0,0 @@
-producer:
-  bootstrap-server: :9092
-  client-id: DEV
-  topic: test
-  acks: -1
-  batch-size: 16384
-  linger-ms: 0
-  compression-type: gzip
-management:
-  endpoint:
-    shutdown:
-      enabled: true
-  endpoints:
-    web:
-      exposure:
-        include: "*"
-  info:
-    env:
-      enabled: true
-    java:
-      enabled: true
-info:
-  kafka:
-    bootstrap-server: ${producer.bootstrap-server}
-    client-id: ${producer.client-id}
-    topic: ${producer.topic}
-    acks: ${producer.acks}
-    batch-size: ${producer.batch-size}
-    linger-ms: ${producer.linger-ms}
-    compression-type: ${producer.compression-type}
-logging:
-  level:
-    root: INFO
-    de.juplo: DEBUG
-server:
-  port: 8880
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
deleted file mode 100644
index b8e6780..0000000
--- a/src/main/resources/logback.xml
+++ /dev/null
@@ -1,17 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<configuration>
-
-  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-    <encoder>
-      <pattern>%highlight(%-5level) %m%n</pattern>
-    </encoder>
-  </appender>
-
-  <logger name="de.juplo" level="DEBUG" />
-
-  <root level="INFO">
-    <appender-ref ref="STDOUT" />
-  </root>
-
-</configuration>
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
deleted file mode 100644
index cf70c81..0000000
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.web.servlet.MockMvc;
-
-import java.time.Duration;
-import java.util.LinkedList;
-import java.util.List;
-
-import static de.juplo.kafka.ApplicationTests.PARTITIONS;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
-import static org.awaitility.Awaitility.*;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
-
-@SpringBootTest(
-    properties = {
-        "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "producer.bootstrap-server=${spring.embedded.kafka.brokers}",
-        "producer.topic=" + TOPIC})
-@AutoConfigureMockMvc
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
-@Slf4j
-public class ApplicationTests
-{
-  static final String TOPIC = "FOO";
-  static final int PARTITIONS = 10;
-
-  @Autowired
-  MockMvc mockMvc;
-  @Autowired
-  Consumer consumer;
-
-
-  @BeforeEach
-  public void clear()
-  {
-    consumer.received.clear();
-  }
-
-
-  @Test
-  void testSendMessage() throws Exception
-  {
-    mockMvc
-        .perform(post("/peter").content("Hallo Welt!"))
-        .andExpect(status().isOk());
-    await("Message was sent")
-        .atMost(Duration.ofSeconds(5))
-        .until(() -> consumer.received.size() == 1);
-  }
-
-
-  static class Consumer
-  {
-    final List<ConsumerRecord<String, String>> received = new LinkedList<>();
-
-    @KafkaListener(groupId = "TEST", topics = TOPIC)
-    public void receive(ConsumerRecord<String, String> record)
-    {
-      log.debug("Received message: {}", record);
-      received.add(record);
-    }
-  }
-
-  @TestConfiguration
-  static class Configuration
-  {
-    @Bean
-    Consumer consumer()
-    {
-      return new Consumer();
-    }
-  }
-}