From a7e838d24e71a23fe2ee6611e6581d4ccce119bd Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 29 Jan 2023 17:08:46 +0100 Subject: [PATCH] =?utf8?q?Vorlage=20f=C3=BCr=20die=20=C3=9Cbung=20`rest-pr?= =?utf8?q?oducer--partitioning`?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .dockerignore | 2 - .editorconfig | 13 --- .gitignore | 3 - .maven-dockerexclude | 1 - .maven-dockerinclude | 1 - Dockerfile | 5 - README.sh | 6 - pom.xml | 109 ------------------ src/main/java/de/juplo/kafka/Application.java | 14 --- .../juplo/kafka/ApplicationConfiguration.java | 46 -------- .../de/juplo/kafka/ApplicationProperties.java | 36 ------ .../java/de/juplo/kafka/ErrorResponse.java | 11 -- .../java/de/juplo/kafka/ProduceFailure.java | 21 ---- .../java/de/juplo/kafka/ProduceResult.java | 11 -- .../java/de/juplo/kafka/ProduceSuccess.java | 12 -- .../java/de/juplo/kafka/RestProducer.java | 101 ---------------- src/main/resources/application.yml | 36 ------ src/main/resources/logback.xml | 17 --- .../java/de/juplo/kafka/ApplicationTests.java | 85 -------------- 19 files changed, 530 deletions(-) delete mode 100644 .dockerignore delete mode 100644 .editorconfig delete mode 100644 .gitignore delete mode 100644 .maven-dockerexclude delete mode 100644 .maven-dockerinclude delete mode 100644 Dockerfile delete mode 100644 pom.xml delete mode 100644 src/main/java/de/juplo/kafka/Application.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationProperties.java delete mode 100644 src/main/java/de/juplo/kafka/ErrorResponse.java delete mode 100644 src/main/java/de/juplo/kafka/ProduceFailure.java delete mode 100644 src/main/java/de/juplo/kafka/ProduceResult.java delete mode 100644 src/main/java/de/juplo/kafka/ProduceSuccess.java delete mode 100644 src/main/java/de/juplo/kafka/RestProducer.java delete mode 100644 src/main/resources/application.yml delete mode 100644 src/main/resources/logback.xml delete mode 100644 src/test/java/de/juplo/kafka/ApplicationTests.java diff --git a/.dockerignore b/.dockerignore deleted file mode 100644 index 1ad9963..0000000 --- a/.dockerignore +++ /dev/null @@ -1,2 +0,0 @@ -* -!target/*.jar diff --git a/.editorconfig b/.editorconfig deleted file mode 100644 index 633c98a..0000000 --- a/.editorconfig +++ /dev/null @@ -1,13 +0,0 @@ -root = true - -[*] -indent_style = space -indent_size = tab -tab_width = 2 -charset = utf-8 -end_of_line = lf -trim_trailing_whitespace = true -insert_final_newline = false - -[*.properties] -charset = latin1 \ No newline at end of file diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 6240411..0000000 --- a/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -*.iml -.idea -target diff --git a/.maven-dockerexclude b/.maven-dockerexclude deleted file mode 100644 index 72e8ffc..0000000 --- a/.maven-dockerexclude +++ /dev/null @@ -1 +0,0 @@ -* diff --git a/.maven-dockerinclude b/.maven-dockerinclude deleted file mode 100644 index fd6cecd..0000000 --- a/.maven-dockerinclude +++ /dev/null @@ -1 +0,0 @@ -target/*.jar diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 16ee25e..0000000 --- a/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -FROM openjdk:11-jre -VOLUME /tmp -COPY target/*.jar /opt/app.jar -ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] -CMD [] diff --git a/README.sh b/README.sh index 76ed9c3..1b4d217 100755 --- a/README.sh +++ b/README.sh @@ -29,10 +29,8 @@ while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Wait while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done docker-compose up -d consumer -# tag::http[] echo -n Nachricht 1 an Producer 0 | http -v :8000/foo echo -n Nachricht 1 an Producer 1 | http -v :8001/foo -# end::http[] echo -n Nachricht 2 an Producer 0 | http -v :8000/bar echo -n Nachricht 2 an Producer 1 | http -v :8001/bar @@ -41,9 +39,7 @@ docker-compose logs consumer docker-compose exec -T cli bash << 'EOF' echo "Altering number of partitions from 3 to 7..." kafka-topics --bootstrap-server kafka:9092 --describe --topic test -# tag::repartitioning[] kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7 -# end::repartitioning[] kafka-topics --bootstrap-server kafka:9092 --describe --topic test EOF @@ -58,7 +54,5 @@ echo -n Nachricht 4 an Producer 1 | http -v :8001/bar docker-compose logs consumer -# tag::kafkacat[] kafkacat -b :9092 -t test -o 0 -p0 -f'p=%p|o=%o|k=%k|v=%s\n' -qe kafkacat -b :9092 -t test -o 0 -p1 -f'p=%p|o=%o|k=%k|v=%s\n' -qe -# end::kafkacat[] diff --git a/pom.xml b/pom.xml deleted file mode 100644 index e7ea677..0000000 --- a/pom.xml +++ /dev/null @@ -1,109 +0,0 @@ - - - - 4.0.0 - - - org.springframework.boot - spring-boot-starter-parent - 2.7.2 - - - - de.juplo.kafka - rest-producer - REST Producer - A Simple Producer that takes messages via POST and confirms successs - 1.0-SNAPSHOT - - - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.boot - spring-boot-starter-actuator - - - org.springframework.boot - spring-boot-configuration-processor - true - - - org.springframework.boot - spring-boot-starter-validation - - - org.apache.kafka - kafka-clients - - - org.projectlombok - lombok - - - org.springframework.boot - spring-boot-starter-test - test - - - org.springframework.kafka - spring-kafka - test - - - org.springframework.kafka - spring-kafka-test - test - - - org.awaitility - awaitility - test - - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - - build-info - - - - - - pl.project13.maven - git-commit-id-plugin - - - io.fabric8 - docker-maven-plugin - 0.33.0 - - - - juplo/%a:%v - - - - - - build - package - - build - - - - - - - - diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java deleted file mode 100644 index 0069257..0000000 --- a/src/main/java/de/juplo/kafka/Application.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - - -@SpringBootApplication -public class Application -{ - public static void main(String[] args) - { - SpringApplication.run(Application.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java deleted file mode 100644 index 0642aa4..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ /dev/null @@ -1,46 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.Properties; - - -@Configuration -@EnableConfigurationProperties(ApplicationProperties.class) -public class ApplicationConfiguration -{ - @Bean - public RestProducer restProducer( - ApplicationProperties properties, - KafkaProducer kafkaProducer) - { - return - new RestProducer( - properties.getClientId(), - properties.getTopic(), - properties.getPartition(), - kafkaProducer); - } - - @Bean(destroyMethod = "close") - public KafkaProducer kafkaProducer(ApplicationProperties properties) - { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("acks", properties.getAcks()); - props.put("batch.size", properties.getBatchSize()); - props.put("delivery.timeout.ms", 20000); // 20 Sekunden - props.put("request.timeout.ms", 10000); // 10 Sekunden - props.put("linger.ms", properties.getLingerMs()); - props.put("compression.type", properties.getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - return new KafkaProducer<>(props); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java deleted file mode 100644 index 673613a..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ /dev/null @@ -1,36 +0,0 @@ -package de.juplo.kafka; - -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; - -import javax.validation.constraints.NotEmpty; -import javax.validation.constraints.NotNull; - - -@ConfigurationProperties(prefix = "producer") -@Getter -@Setter -public class ApplicationProperties -{ - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String clientId; - @NotNull - @NotEmpty - private String topic; - private Integer partition; - @NotNull - @NotEmpty - private String acks; - @NotNull - private Integer batchSize; - @NotNull - private Integer lingerMs; - @NotNull - @NotEmpty - private String compressionType; -} diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java deleted file mode 100644 index 5ca206d..0000000 --- a/src/main/java/de/juplo/kafka/ErrorResponse.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import lombok.Value; - - -@Value -public class ErrorResponse -{ - private final String error; - private final Integer status; -} diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java deleted file mode 100644 index 873a67b..0000000 --- a/src/main/java/de/juplo/kafka/ProduceFailure.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.kafka; - - -import lombok.Value; - - -@Value -public class ProduceFailure implements ProduceResult -{ - private final String error; - private final String exception; - private final Integer status; - - - public ProduceFailure(Exception e) - { - status = 500; - exception = e.getClass().getSimpleName(); - error = e.getMessage(); - } -} diff --git a/src/main/java/de/juplo/kafka/ProduceResult.java b/src/main/java/de/juplo/kafka/ProduceResult.java deleted file mode 100644 index ceff329..0000000 --- a/src/main/java/de/juplo/kafka/ProduceResult.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.annotation.JsonInclude; - -import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; - - -@JsonInclude(NON_NULL) -public interface ProduceResult -{ -} diff --git a/src/main/java/de/juplo/kafka/ProduceSuccess.java b/src/main/java/de/juplo/kafka/ProduceSuccess.java deleted file mode 100644 index 9c79e8b..0000000 --- a/src/main/java/de/juplo/kafka/ProduceSuccess.java +++ /dev/null @@ -1,12 +0,0 @@ -package de.juplo.kafka; - - -import lombok.Value; - - -@Value -public class ProduceSuccess implements ProduceResult -{ - Integer partition; - Long offset; -} diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java deleted file mode 100644 index debe366..0000000 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ /dev/null @@ -1,101 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.*; -import org.springframework.web.context.request.async.DeferredResult; - -import java.math.BigInteger; - - -@Slf4j -@RequestMapping -@ResponseBody -@RequiredArgsConstructor -public class RestProducer -{ - private final String id; - private final String topic; - private final Integer partition; - private final KafkaProducer producer; - - private long produced = 0; - - @PostMapping(path = "{key}") - public DeferredResult send( - @PathVariable String key, - @RequestHeader(name = "X-id", required = false) Long correlationId, - @RequestBody String value) - { - DeferredResult result = new DeferredResult<>(); - - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - partition, // Partition - key, // Key - value // Value - ); - - record.headers().add("source", id.getBytes()); - if (correlationId != null) - { - record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray()); - } - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - produced++; - result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); - log.debug( - "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", - id, - record.key(), - record.value(), - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - result.setErrorResult(new ProduceFailure(e)); - log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", - id, - record.key(), - metadata == null ? -1 : metadata.timestamp(), - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - log.trace( - "{} - Queued message with key={} latency={}ms", - id, - record.key(), - now - time - ); - - return result; - } - - @ExceptionHandler - @ResponseStatus(HttpStatus.BAD_REQUEST) - public ErrorResponse illegalStateException(IllegalStateException e) - { - return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value()); - } -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml deleted file mode 100644 index 0d5752c..0000000 --- a/src/main/resources/application.yml +++ /dev/null @@ -1,36 +0,0 @@ -producer: - bootstrap-server: :9092 - client-id: DEV - topic: test - acks: -1 - batch-size: 16384 - linger-ms: 0 - compression-type: gzip -management: - endpoint: - shutdown: - enabled: true - endpoints: - web: - exposure: - include: "*" - info: - env: - enabled: true - java: - enabled: true -info: - kafka: - bootstrap-server: ${producer.bootstrap-server} - client-id: ${producer.client-id} - topic: ${producer.topic} - acks: ${producer.acks} - batch-size: ${producer.batch-size} - linger-ms: ${producer.linger-ms} - compression-type: ${producer.compression-type} -logging: - level: - root: INFO - de.juplo: TRACE -server: - port: 8880 diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml deleted file mode 100644 index b8e6780..0000000 --- a/src/main/resources/logback.xml +++ /dev/null @@ -1,17 +0,0 @@ - - - - - - %highlight(%-5level) %m%n - - - - - - - - - - - diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java deleted file mode 100644 index 646a335..0000000 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ /dev/null @@ -1,85 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.*; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.test.web.servlet.MockMvc; - -import java.time.Duration; -import java.util.LinkedList; -import java.util.List; - -import static de.juplo.kafka.ApplicationTests.PARTITIONS; -import static de.juplo.kafka.ApplicationTests.TOPIC; -import static org.awaitility.Awaitility.*; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; - - -@SpringBootTest( - properties = { - "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}", - "producer.bootstrap-server=${spring.embedded.kafka.brokers}", - "producer.topic=" + TOPIC}) -@AutoConfigureMockMvc -@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) -@Slf4j -public class ApplicationTests -{ - static final String TOPIC = "FOO"; - static final int PARTITIONS = 10; - - @Autowired - MockMvc mockMvc; - @Autowired - Consumer consumer; - - - @BeforeEach - public void clear() - { - consumer.received.clear(); - } - - - @Test - void testSendMessage() throws Exception - { - mockMvc - .perform(post("/peter").content("Hallo Welt!")) - .andExpect(status().isOk()); - await("Message was send") - .atMost(Duration.ofSeconds(5)) - .until(() -> consumer.received.size() == 1); - } - - - static class Consumer - { - final List> received = new LinkedList<>(); - - @KafkaListener(groupId = "TEST", topics = TOPIC) - public void receive(ConsumerRecord record) - { - log.debug("Received message: {}", record); - received.add(record); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - Consumer consumer() - { - return new Consumer(); - } - } -} -- 2.20.1