Merge branch 'endless-stream-producer' into rest-producer
author Kai Moritz <kai@juplo.de>
Tue, 31 May 2022 03:39:30 +0000 (05:39 +0200)
committer Kai Moritz <kai@juplo.de>
Tue, 31 May 2022 03:39:30 +0000 (05:39 +0200)
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/RestProducer.java
src/main/resources/application.yml

diff --cc docker-compose.yml
@@@ -37,9 -37,9 +37,9 @@@ services
      command: sleep infinity
  
    producer:
 -    image: juplo/endless-producer:1.0-SNAPSHOT
 +    image: juplo/rest-producer:1.0-SNAPSHOT
      ports:
-       - 8080:8080
+       - 8080:8880
      environment:
        producer.bootstrap-server: kafka:9092
        producer.client-id: producer
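
With the service switched to the juplo/rest-producer image, records are no longer pushed by an internal loop but produced on demand through HTTP. A minimal Java sketch of a client call against the mapped host port 8080 follows; the key "foo", the value "bar" and the plain-text content type are made-up example values, not part of this commit:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestProducerClientSketch
{
  public static void main(String[] args) throws Exception
  {
    // POST the value "bar" under the key "foo" to the REST producer,
    // i.e. to http://localhost:8080/{key} as mapped in docker-compose.yml.
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest
        .newBuilder(URI.create("http://localhost:8080/foo"))
        .header("Content-Type", "text/plain")
        .POST(HttpRequest.BodyPublishers.ofString("bar"))
        .build();
    HttpResponse<String> response =
        client.send(request, HttpResponse.BodyHandlers.ofString());
    // On success the controller answers with the partition and offset
    // of the written record (ProduceSuccess).
    System.out.println(response.statusCode() + ": " + response.body());
  }
}
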
diff --cc pom.xml
Simple merge
diff --cc src/main/java/de/juplo/kafka/RestProducer.java
index dea49f0,0000000..7d9bf12
mode 100644,000000..100644
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/RestProducer.java
@@@ -1,116 -1,0 +1,124 @@@
 +package de.juplo.kafka;
 +
 +import lombok.extern.slf4j.Slf4j;
 +import org.apache.kafka.clients.producer.KafkaProducer;
 +import org.apache.kafka.clients.producer.ProducerRecord;
 +import org.apache.kafka.common.serialization.StringSerializer;
++import org.springframework.http.HttpStatus;
 +import org.springframework.http.MediaType;
 +import org.springframework.web.bind.annotation.*;
 +import org.springframework.web.context.request.async.DeferredResult;
 +
 +import javax.annotation.PreDestroy;
 +import java.util.Properties;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +
 +
 +@Slf4j
 +@RestController
 +public class RestProducer
 +{
 +  private final String id;
 +  private final String topic;
 +  private final KafkaProducer<String, String> producer;
 +
 +  private long produced = 0;
 +
 +  public RestProducer(ApplicationProperties properties)
 +  {
 +    this.id = properties.getClientId();
 +    this.topic = properties.getTopic();
 +
 +    Properties props = new Properties();
 +    props.put("bootstrap.servers", properties.getBootstrapServer());
 +    props.put("client.id", properties.getClientId());
 +    props.put("acks", properties.getAcks());
 +    props.put("batch.size", properties.getBatchSize());
 +    props.put("delivery.timeout.ms", 20000); // 20 seconds
 +    props.put("request.timeout.ms",  10000); // 10 seconds
 +    props.put("linger.ms", properties.getLingerMs());
 +    props.put("compression.type", properties.getCompressionType());
 +    props.put("key.serializer", StringSerializer.class.getName());
 +    props.put("value.serializer", StringSerializer.class.getName());
 +
 +    this.producer = new KafkaProducer<>(props);
 +  }
 +
 +  @PostMapping(path = "{key}")
 +  public DeferredResult<ProduceResult> send(
 +      @PathVariable String key,
 +      @RequestBody String value)
 +  {
 +    DeferredResult<ProduceResult> result = new DeferredResult<>();
 +
 +    final long time = System.currentTimeMillis();
 +
 +    final ProducerRecord<String, String> record = new ProducerRecord<>(
 +        topic,  // Topic
 +        key,    // Key
 +        value   // Value
 +    );
 +
 +    producer.send(record, (metadata, e) ->
 +    {
 +      long now = System.currentTimeMillis();
 +      if (e == null)
 +      {
 +        // HANDLE SUCCESS
 +        produced++;
 +        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
 +        log.debug(
 +            "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
 +            id,
 +            record.key(),
 +            record.value(),
 +            metadata.partition(),
 +            metadata.offset(),
 +            metadata.timestamp(),
 +            now - time
 +        );
 +      }
 +      else
 +      {
 +        // HANDLE ERROR
 +        result.setErrorResult(new ProduceFailure(e));
 +        log.error(
 +            "{} - ERROR key={} timestamp={} latency={}ms: {}",
 +            id,
 +            record.key(),
 +            metadata == null ? -1 : metadata.timestamp(),
 +            now - time,
 +            e.toString()
 +        );
 +      }
 +    });
 +
 +    long now = System.currentTimeMillis();
 +    log.trace(
 +        "{} - Queued message key={} value={} latency={}ms",
 +        id,
 +        record.key(),
 +        record.value(),
 +        now - time
 +    );
 +
 +    return result;
 +  }
 +
++  @ExceptionHandler
++  @ResponseStatus(HttpStatus.BAD_REQUEST)
++  public ErrorResponse illegalStateException(IllegalStateException e)
++  {
++    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
++  }
++
 +  @PreDestroy
 +  public void destroy() throws ExecutionException, InterruptedException
 +  {
 +    log.info("{} - Destroy!", id);
 +    log.info("{} - Closing the KafkaProducer", id);
 +    producer.close();
 +    log.info("{}: Produced {} messages in total, exiting!", id, produced);
 +  }
 +}
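
The controller above references ProduceResult, ProduceSuccess, ProduceFailure and ErrorResponse, none of which appear in this diff. Purely as a hypothetical sketch inferred from the constructor calls in RestProducer (not the actual classes of the repository), they could be shaped roughly like this:

package de.juplo.kafka;

// Hypothetical sketches only: the real classes are not part of this diff.
// The shapes are inferred from the calls new ProduceSuccess(partition, offset),
// new ProduceFailure(e) and new ErrorResponse(message, status) above; getters
// are assumed so that Spring can serialize the instances to JSON.

interface ProduceResult {}

class ProduceSuccess implements ProduceResult
{
  private final Integer partition;
  private final Long offset;

  ProduceSuccess(Integer partition, Long offset)
  {
    this.partition = partition;
    this.offset = offset;
  }

  public Integer getPartition() { return partition; }
  public Long getOffset() { return offset; }
}

class ProduceFailure implements ProduceResult
{
  private final String error;

  ProduceFailure(Exception e)
  {
    this.error = e.toString();
  }

  public String getError() { return error; }
}

class ErrorResponse
{
  private final String error;
  private final Integer status;

  ErrorResponse(String error, Integer status)
  {
    this.error = error;
    this.status = status;
  }

  public String getError() { return error; }
  public Integer getStatus() { return status; }
}
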
diff --cc src/main/resources/application.yml
@@@ -1,12 -1,13 +1,15 @@@
  producer:
    bootstrap-server: :9092
-   client-id: peter
+   client-id: DEV
    topic: test
 -  acks: 1
 -  throttle-ms: 1000
 +  acks: -1
 +  batch-size: 16384
 +  linger-ms: 0
 +  compression-type: gzip
  management:
+   endpoint:
+     shutdown:
+       enabled: true
    endpoints:
      web:
        exposure:
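
The new producer.* keys (acks, batch-size, linger-ms, compression-type) are bound into the ApplicationProperties bean that RestProducer receives in its constructor; that class is not part of this diff. A hypothetical sketch, inferred only from the getters used in RestProducer and the keys above (field names and types are assumptions):

package de.juplo.kafka;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

// Hypothetical sketch of the configuration class backing the producer.* keys.
// The real ApplicationProperties is not shown in this diff; the fields are
// inferred from the getters called in RestProducer and the application.yml keys.
@ConfigurationProperties(prefix = "producer")
@Getter
@Setter
public class ApplicationProperties
{
  private String bootstrapServer;
  private String clientId;
  private String topic;
  private String acks;            // passed through to the producer's acks setting
  private Integer batchSize;      // batch.size
  private Integer lingerMs;       // linger.ms
  private String compressionType; // compression.type
}

For the binding to work, the class additionally has to be registered, e.g. via @EnableConfigurationProperties(ApplicationProperties.class) on the application class. Enabling the shutdown actuator endpoint (management.endpoint.shutdown.enabled: true) together with exposing it over the web allows the application to be stopped with a POST to /actuator/shutdown.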