Merge der Upgrades für Confluent/Spring-Boot (Branch 'customized')
authorKai Moritz <kai@juplo.de>
Fri, 22 Jul 2022 19:53:48 +0000 (21:53 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 22 Jul 2022 19:53:48 +0000 (21:53 +0200)
1  2 
docker-compose.yml
src/main/java/de/juplo/kafka/RestProducer.java

diff --combined docker-compose.yml
@@@ -1,14 -1,14 +1,14 @@@
  version: '3.2'
  services:
    zookeeper:
-     image: confluentinc/cp-zookeeper:7.0.2
+     image: confluentinc/cp-zookeeper:7.1.3
      environment:
        ZOOKEEPER_CLIENT_PORT: 2181
      ports:
        - 2181:2181
  
    kafka:
-     image: confluentinc/cp-kafka:7.0.2
+     image: confluentinc/cp-kafka:7.1.3
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      depends_on:
        - zookeeper
  
 -  kafka-ui:
 -    image: provectuslabs/kafka-ui:0.3.3
 -    ports:
 -      - 8080:8080
 -    environment:
 -      KAFKA_CLUSTERS_0_NAME: local
 -      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
 +  setup:
 +    image: juplo/toolbox
 +    command: >
 +      bash -c "
 +        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
 +        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
 +      "
  
    cli:
      image: juplo/toolbox
      command: sleep infinity
  
 -  producer-0:
 +  producer:
      image: juplo/rest-producer:1.0-SNAPSHOT
      ports:
 -      - 8000:8080
 -    environment:
 -      producer.bootstrap-server: kafka:9092
 -      producer.client-id: producer
 -      producer.topic: test
 -      producer.partition: 0
 -
 -  producer-1:
 -    image: juplo/rest-producer:1.0-SNAPSHOT
 -    ports:
 -      - 8001:8080
 +      - 8080:8080
      environment:
        producer.bootstrap-server: kafka:9092
        producer.client-id: producer
        producer.topic: test
 -      producer.partition: 1
 -
 -  peter:
 -    image: juplo/rest-client:1.0-SNAPSHOT
 -    environment:
 -      rest-client.baseUrl: http://producer-1:8080
 -      rest-client.username: peter
 -      rest-client.throttle-ms: 1000
 -
 -  klaus:
 -    image: juplo/rest-client:1.0-SNAPSHOT
 -    environment:
 -      rest-client.baseUrl: http://producer-1:8080
 -      rest-client.username: klaus
 -      rest-client.throttle-ms: 1100
 -
 -  beate:
 -    image: juplo/rest-client:1.0-SNAPSHOT
 -    environment:
 -      rest-client.baseUrl: http://producer-0:8080
 -      rest-client.username: beate
 -      rest-client.throttle-ms: 900
 -
 -  franz:
 -    image: juplo/rest-client:1.0-SNAPSHOT
 -    environment:
 -      rest-client.baseUrl: http://producer-1:8080
 -      rest-client.username: franz
 -      rest-client.throttle-ms: 800
 -
 -  uschi:
 -    image: juplo/rest-client:1.0-SNAPSHOT
 -    environment:
 -      rest-client.baseUrl: http://producer-0:8080
 -      rest-client.username: uschi
 -      rest-client.throttle-ms: 1200
 -
 -  consumer:
 -    image: juplo/endless-consumer:1.0-SNAPSHOT
 -    ports:
 -      - 8081:8081
 -    environment:
 -      consumer.bootstrap-server: kafka:9092
 -      consumer.client-id: my-group
 -      consumer.client-id: consumer
 -      consumer.topic: test
@@@ -4,11 -4,11 +4,12 @@@ import lombok.extern.slf4j.Slf4j
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.serialization.StringSerializer;
+ import org.springframework.http.HttpStatus;
  import org.springframework.web.bind.annotation.*;
  import org.springframework.web.context.request.async.DeferredResult;
  
  import javax.annotation.PreDestroy;
 +import java.math.BigInteger;
  import java.util.Properties;
  import java.util.concurrent.ExecutionException;
  
@@@ -48,7 -48,6 +49,7 @@@ public class RestProduce
    @PostMapping(path = "{key}")
    public DeferredResult<ProduceResult> send(
        @PathVariable String key,
 +      @RequestHeader(name = "X-id", required = false) Long correlationId,
        @RequestBody String value)
    {
      DeferredResult<ProduceResult> result = new DeferredResult<>();
          value   // Value
      );
  
 +    record.headers().add("source", id.getBytes());
 +    if (correlationId != null)
 +    {
 +      record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
 +    }
 +
      producer.send(record, (metadata, e) ->
      {
        long now = System.currentTimeMillis();
      return result;
    }
  
+   @ExceptionHandler
+   @ResponseStatus(HttpStatus.BAD_REQUEST)
+   public ErrorResponse illegalStateException(IllegalStateException e)
+   {
+     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+   }
    @PreDestroy
    public void destroy() throws ExecutionException, InterruptedException
    {