Merge branch 'endless-stream-producer' into rest-producer
author Kai Moritz <kai@juplo.de>
Tue, 31 May 2022 03:39:30 +0000 (05:39 +0200)
committer Kai Moritz <kai@juplo.de>
Tue, 31 May 2022 03:39:30 +0000 (05:39 +0200)
1  2 
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/RestProducer.java
src/main/resources/application.yml

diff --combined docker-compose.yml
@@@ -37,20 -37,11 +37,20 @@@ services
      command: sleep infinity
  
    producer:
 -    image: juplo/endless-producer:1.0-SNAPSHOT
 +    image: juplo/rest-producer:1.0-SNAPSHOT
      ports:
-       - 8080:8080
+       - 8080:8880
      environment:
        producer.bootstrap-server: kafka:9092
        producer.client-id: producer
        producer.topic: test
 -      producer.throttle-ms: 200
 +
 +  consumer:
 +    image: juplo/endless-consumer:1.0-SNAPSHOT
 +    ports:
 +      - 8081:8081
 +    environment:
 +      consumer.bootstrap-server: kafka:9092
 +      consumer.group-id: my-group
 +      consumer.client-id: consumer
 +      consumer.topic: test
diff --combined pom.xml
+++ b/pom.xml
@@@ -12,8 -12,8 +12,8 @@@
    </parent>
  
    <groupId>de.juplo.kafka</groupId>
 -  <artifactId>endless-producer</artifactId>
 -  <name>Endless Producer: a Simple Producer that endlessly writes numbers into a topic</name>
 +  <artifactId>rest-producer</artifactId>
 +  <name>REST Producer: a Simple Producer that takes messages via POST and confirms success</name>
    <version>1.0-SNAPSHOT</version>
  
    <dependencies>
        <plugin>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-maven-plugin</artifactId>
+         <executions>
+           <execution>
+             <goals>
+               <goal>build-info</goal>
+             </goals>
+           </execution>
+         </executions>
+       </plugin>
+       <plugin>
+         <groupId>pl.project13.maven</groupId>
+         <artifactId>git-commit-id-plugin</artifactId>
        </plugin>
        <plugin>
          <groupId>io.fabric8</groupId>
index dea49f0,0000000..7d9bf12
mode 100644,000000..100644
--- /dev/null
@@@ -1,116 -1,0 +1,124 @@@
 +package de.juplo.kafka;
 +
 +import lombok.extern.slf4j.Slf4j;
 +import org.apache.kafka.clients.producer.KafkaProducer;
 +import org.apache.kafka.clients.producer.ProducerRecord;
 +import org.apache.kafka.common.serialization.StringSerializer;
++import org.springframework.http.HttpStatus;
 +import org.springframework.http.MediaType;
 +import org.springframework.web.bind.annotation.*;
 +import org.springframework.web.context.request.async.DeferredResult;
 +
 +import javax.annotation.PreDestroy;
 +import java.util.Properties;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +
 +
 +@Slf4j
 +@RestController
 +public class RestProducer
 +{
 +  private final String id;
 +  private final String topic;
 +  private final KafkaProducer<String, String> producer;
 +
 +  private long produced = 0;
 +
 +  public RestProducer(ApplicationProperties properties)
 +  {
 +    this.id = properties.getClientId();
 +    this.topic = properties.getTopic();
 +
 +    Properties props = new Properties();
 +    props.put("bootstrap.servers", properties.getBootstrapServer());
 +    props.put("client.id", properties.getClientId());
 +    props.put("acks", properties.getAcks());
 +    props.put("batch.size", properties.getBatchSize());
 +    props.put("delivery.timeout.ms", 20000); // 20 Sekunden
 +    props.put("request.timeout.ms",  10000); // 10 Sekunden
 +    props.put("linger.ms", properties.getLingerMs());
 +    props.put("compression.type", properties.getCompressionType());
 +    props.put("key.serializer", StringSerializer.class.getName());
 +    props.put("value.serializer", StringSerializer.class.getName());
 +
 +    this.producer = new KafkaProducer<>(props);
 +  }
 +
 +  @PostMapping(path = "{key}")
 +  public DeferredResult<ProduceResult> send(
 +      @PathVariable String key,
 +      @RequestBody String value)
 +  {
 +    DeferredResult<ProduceResult> result = new DeferredResult<>();
 +
 +    final long time = System.currentTimeMillis();
 +
 +    final ProducerRecord<String, String> record = new ProducerRecord<>(
 +        topic,  // Topic
 +        key,    // Key
 +        value   // Value
 +    );
 +
 +    producer.send(record, (metadata, e) ->
 +    {
 +      long now = System.currentTimeMillis();
 +      if (e == null)
 +      {
 +        // HANDLE SUCCESS
 +        produced++;
 +        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
 +        log.debug(
 +            "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
 +            id,
 +            record.key(),
 +            record.value(),
 +            metadata.partition(),
 +            metadata.offset(),
 +            metadata.timestamp(),
 +            now - time
 +        );
 +      }
 +      else
 +      {
 +        // HANDLE ERROR
 +        result.setErrorResult(new ProduceFailure(e));
 +        log.error(
 +            "{} - ERROR key={} timestamp={} latency={}ms: {}",
 +            id,
 +            record.key(),
 +            metadata == null ? -1 : metadata.timestamp(),
 +            now - time,
 +            e.toString()
 +        );
 +      }
 +    });
 +
 +    long now = System.currentTimeMillis();
 +    log.trace(
 +        "{} - Queued #{} key={} latency={}ms",
 +        id,
 +        value,
 +        record.key(),
 +        now - time
 +    );
 +
 +    return result;
 +  }
 +
++  @ExceptionHandler
++  @ResponseStatus(HttpStatus.BAD_REQUEST)
++  public ErrorResponse illegalStateException(IllegalStateException e)
++  {
++    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
++  }
++
 +  @PreDestroy
 +  public void destroy() throws ExecutionException, InterruptedException
 +  {
 +    log.info("{} - Destroy!", id);
 +    log.info("{} - Closing the KafkaProducer", id);
 +    producer.close();
 +    log.info("{}: Produced {} messages in total, exiting!", id, produced);
 +  }
 +}
@@@ -1,17 -1,32 +1,34 @@@
  producer:
    bootstrap-server: :9092
-   client-id: peter
+   client-id: DEV
    topic: test
 -  acks: 1
 -  throttle-ms: 1000
 +  acks: -1
 +  batch-size: 16384
 +  linger-ms: 0
 +  compression-type: gzip
  management:
+   endpoint:
+     shutdown:
+       enabled: true
    endpoints:
      web:
        exposure:
          include: "*"
+   info:
+     env:
+       enabled: true
+     java:
+       enabled: true
+ info:
+   kafka:
+     bootstrap-server: ${producer.bootstrap-server}
+     client-id: ${producer.client-id}
+     topic: ${producer.topic}
+     acks: ${producer.acks}
+     batch-size: ${producer.batch-size}
+     linger-ms: ${producer.linger-ms}
+     compression-type: ${producer.compression-type}
  logging:
    level:
      root: INFO
      de.juplo: DEBUG
+ server:
+   port: 8880