From: Kai Moritz Date: Sun, 3 Apr 2022 11:45:31 +0000 (+0200) Subject: Basis-Setup für Übungen zur Producer-Konfiguration X-Git-Tag: acks-KAPUTT-DEPRECATED~3 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=03ea4a9c21a1188c8a546a1fb9c15d7fe8b98267;p=demos%2Fkafka%2Ftraining Basis-Setup für Übungen zur Producer-Konfiguration --- diff --git a/.dockerignore b/.dockerignore deleted file mode 100644 index 1ad9963..0000000 --- a/.dockerignore +++ /dev/null @@ -1,2 +0,0 @@ -* -!target/*.jar diff --git a/.maven-dockerexclude b/.maven-dockerexclude deleted file mode 100644 index 72e8ffc..0000000 --- a/.maven-dockerexclude +++ /dev/null @@ -1 +0,0 @@ -* diff --git a/.maven-dockerinclude b/.maven-dockerinclude deleted file mode 100644 index fd6cecd..0000000 --- a/.maven-dockerinclude +++ /dev/null @@ -1 +0,0 @@ -target/*.jar diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 16ee25e..0000000 --- a/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -FROM openjdk:11-jre -VOLUME /tmp -COPY target/*.jar /opt/app.jar -ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] -CMD [] diff --git a/README.sh b/README.sh index 9bca150..f651155 100755 --- a/README.sh +++ b/README.sh @@ -29,9 +29,5 @@ docker-compose up -d sleep 15 -echo foo | http :8080/bar -dd if=/dev/zero bs=1024 count=1024 | http :8080/fehler -http :8081/seen docker-compose stop producer consumer -docker-compose logs producer diff --git a/pom.xml b/pom.xml deleted file mode 100644 index 129ea94..0000000 --- a/pom.xml +++ /dev/null @@ -1,78 +0,0 @@ - - - - 4.0.0 - - - org.springframework.boot - spring-boot-starter-parent - 2.6.5 - - - - de.juplo.kafka - rest-producer - REST Producer: a Simple Producer that takes messages via POST and confirms successs - 1.0-SNAPSHOT - - - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.boot - spring-boot-starter-actuator - - - org.springframework.boot - spring-boot-configuration-processor - true - - - org.apache.kafka - kafka-clients - - - org.projectlombok - lombok - - - org.springframework.boot - spring-boot-starter-test - test - - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - io.fabric8 - docker-maven-plugin - 0.33.0 - - - - juplo/%a:%v - - - - - - build - package - - build - - - - - - - - diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java deleted file mode 100644 index 9f3e3ed..0000000 --- a/src/main/java/de/juplo/kafka/Application.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.util.Assert; - -import java.util.concurrent.Executors; - - -@SpringBootApplication -@EnableConfigurationProperties(ApplicationProperties.class) -public class Application -{ - public static void main(String[] args) - { - SpringApplication.run(Application.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java deleted file mode 100644 index 1f30262..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ /dev/null @@ -1,19 +0,0 @@ -package de.juplo.kafka; - -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; - -@ConfigurationProperties(prefix = "producer") -@Getter -@Setter -public class ApplicationProperties -{ - private String bootstrapServer; - private String clientId; - private String topic; - private String acks; - private Integer batchSize; - private Integer lingerMs; - private String compressionType; -} diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java deleted file mode 100644 index 5ca206d..0000000 --- a/src/main/java/de/juplo/kafka/ErrorResponse.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import lombok.Value; - - -@Value -public class ErrorResponse -{ - private final String error; - private final Integer status; -} diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java deleted file mode 100644 index 873a67b..0000000 --- a/src/main/java/de/juplo/kafka/ProduceFailure.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.kafka; - - -import lombok.Value; - - -@Value -public class ProduceFailure implements ProduceResult -{ - private final String error; - private final String exception; - private final Integer status; - - - public ProduceFailure(Exception e) - { - status = 500; - exception = e.getClass().getSimpleName(); - error = e.getMessage(); - } -} diff --git a/src/main/java/de/juplo/kafka/ProduceResult.java b/src/main/java/de/juplo/kafka/ProduceResult.java deleted file mode 100644 index ceff329..0000000 --- a/src/main/java/de/juplo/kafka/ProduceResult.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.annotation.JsonInclude; - -import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; - - -@JsonInclude(NON_NULL) -public interface ProduceResult -{ -} diff --git a/src/main/java/de/juplo/kafka/ProduceSuccess.java b/src/main/java/de/juplo/kafka/ProduceSuccess.java deleted file mode 100644 index 9c79e8b..0000000 --- a/src/main/java/de/juplo/kafka/ProduceSuccess.java +++ /dev/null @@ -1,12 +0,0 @@ -package de.juplo.kafka; - - -import lombok.Value; - - -@Value -public class ProduceSuccess implements ProduceResult -{ - Integer partition; - Long offset; -} diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java deleted file mode 100644 index dea49f0..0000000 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ /dev/null @@ -1,116 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.http.MediaType; -import org.springframework.web.bind.annotation.*; -import org.springframework.web.context.request.async.DeferredResult; - -import javax.annotation.PreDestroy; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; - - -@Slf4j -@RestController -public class RestProducer -{ - private final String id; - private final String topic; - private final KafkaProducer producer; - - private long produced = 0; - - public RestProducer(ApplicationProperties properties) - { - this.id = properties.getClientId(); - this.topic = properties.getTopic(); - - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("acks", properties.getAcks()); - props.put("batch.size", properties.getBatchSize()); - props.put("delivery.timeout.ms", 20000); // 20 Sekunden - props.put("request.timeout.ms", 10000); // 10 Sekunden - props.put("linger.ms", properties.getLingerMs()); - props.put("compression.type", properties.getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - this.producer = new KafkaProducer<>(props); - } - - @PostMapping(path = "{key}") - public DeferredResult send( - @PathVariable String key, - @RequestBody String value) - { - DeferredResult result = new DeferredResult<>(); - - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - produced++; - result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); - log.debug( - "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", - id, - record.key(), - record.value(), - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - result.setErrorResult(new ProduceFailure(e)); - log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", - id, - record.key(), - metadata == null ? -1 : metadata.timestamp(), - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - log.trace( - "{} - Queued #{} key={} latency={}ms", - id, - value, - record.key(), - now - time - ); - - return result; - } - - @PreDestroy - public void destroy() throws ExecutionException, InterruptedException - { - log.info("{} - Destroy!", id); - log.info("{} - Closing the KafkaProducer", id); - producer.close(); - log.info("{}: Produced {} messages in total, exiting!", id, produced); - } -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml deleted file mode 100644 index fcc0f3c..0000000 --- a/src/main/resources/application.yml +++ /dev/null @@ -1,17 +0,0 @@ -producer: - bootstrap-server: :9092 - client-id: peter - topic: test - acks: -1 - batch-size: 16384 - linger-ms: 0 - compression-type: gzip -management: - endpoints: - web: - exposure: - include: "*" -logging: - level: - root: INFO - de.juplo: DEBUG diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml deleted file mode 100644 index b8e6780..0000000 --- a/src/main/resources/logback.xml +++ /dev/null @@ -1,17 +0,0 @@ - - - - - - %highlight(%-5level) %m%n - - - - - - - - - - -