#!/bin/bash
-IMAGE=juplo/endless-producer:1.0-SNAPSHOT
+IMAGE=juplo/rest-producer:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
docker-compose up setup
-docker-compose up -d producer
-sleep 5
-docker-compose stop producer
+docker-compose up -d
+
+sleep 15
+
+echo foo | http -v :8080/bar
+dd if=/dev/zero bs=1024 count=1024 | http -v :8080/fehler
+http -v :8081/seen
+
+docker-compose stop producer consumer
docker-compose logs producer
command: sleep infinity
producer:
- image: juplo/endless-producer:1.0-SNAPSHOT
+ image: juplo/rest-producer:1.0-SNAPSHOT
ports:
- 8080:8080
environment:
producer.bootstrap-server: kafka:9092
producer.client-id: producer
producer.topic: test
- producer.throttle-ms: 200
+
+ consumer:
+ image: juplo/endless-consumer:1.0-SNAPSHOT
+ ports:
+ - 8081:8081
+ environment:
+ consumer.bootstrap-server: kafka:9092
+ consumer.group-id: my-group
+ consumer.client-id: consumer
+ consumer.topic: test
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>endless-producer</artifactId>
- <name>Endless Producer: a Simple Producer that endlessly writes numbers into a topic</name>
+ <artifactId>rest-producer</artifactId>
+ <name>REST Producer: a Simple Producer that takes messages via POST and confirms success</name>
<version>1.0-SNAPSHOT</version>
<dependencies>
@EnableConfigurationProperties(ApplicationProperties.class)
public class Application
{
- @Autowired
- ApplicationProperties properties;
-
-
- @Bean
- public EndlessProducer producer()
- {
- Assert.hasText(properties.getBootstrapServer(), "producer.bootstrap-server must be set");
- Assert.hasText(properties.getClientId(), "producer.client-id must be set");
- Assert.hasText(properties.getTopic(), "producer.topic must be set");
-
- EndlessProducer producer =
- new EndlessProducer(
- Executors.newFixedThreadPool(1),
- properties.getBootstrapServer(),
- properties.getClientId(),
- properties.getTopic(),
- properties.getAcks(),
- properties.getThrottleMs());
-
- producer.start();
-
- return producer;
- }
-
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
private String clientId;
private String topic;
private String acks;
- private int throttleMs;
+ private Integer batchSize;
+ private Integer lingerMs;
+ private String compressionType;
}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.ResponseStatus;
-import org.springframework.web.bind.annotation.RestController;
-
-import java.util.concurrent.ExecutionException;
-
-
-@RestController
-@RequiredArgsConstructor
-public class DriverController
-{
- private final EndlessProducer producer;
-
-
- @PostMapping("start")
- public void start()
- {
- producer.start();
- }
-
- @PostMapping("stop")
- public void stop() throws ExecutionException, InterruptedException
- {
- producer.stop();
- }
-
- @ExceptionHandler
- @ResponseStatus(HttpStatus.BAD_REQUEST)
- public ErrorResponse illegalStateException(IllegalStateException e)
- {
- return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import javax.annotation.PreDestroy;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-
-@Slf4j
-public class EndlessProducer implements Runnable
-{
- private final ExecutorService executor;
- private final String id;
- private final String topic;
- private final int throttleMs;
- private final KafkaProducer<String, String> producer;
-
- private boolean running = false;
- private long i = 0;
- private long produced = 0;
-
- public EndlessProducer(
- ExecutorService executor,
- String bootstrapServer,
- String clientId,
- String topic,
- String acks,
- int throttleMs)
- {
- this.executor = executor;
- this.id = clientId;
- this.topic = topic;
- this.throttleMs = throttleMs;
-
- Properties props = new Properties();
- props.put("bootstrap.servers", bootstrapServer);
- props.put("client.id", clientId);
- props.put("acks", acks);
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- this.producer = new KafkaProducer<>(props);
- }
-
- @Override
- public void run()
- {
- try
- {
- for (; running; i++)
- {
- send(Long.toString(i%10), Long.toString(i));
-
- if (throttleMs > 0)
- {
- try
- {
- Thread.sleep(throttleMs);
- }
- catch (InterruptedException e)
- {
- log.warn("{} - Interrupted while throttling!", e);
- }
- }
- }
-
- log.info("{} - Done", id);
- }
- catch (Exception e)
- {
- log.error("{} - Unexpected Exception:", id, e);
- }
- finally
- {
- synchronized (this)
- {
- running = false;
- log.info("{} - Stopped - produced {} messages so far", id, produced);
- }
- }
- }
-
- void send(String key, String value)
- {
- final long time = System.currentTimeMillis();
-
- final ProducerRecord<String, String> record = new ProducerRecord<>(
- topic, // Topic
- key, // Key
- value // Value
- );
-
- producer.send(record, (metadata, e) ->
- {
- long now = System.currentTimeMillis();
- if (e == null)
- {
- // HANDLE SUCCESS
- produced++;
- log.debug(
- "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
- id,
- record.key(),
- record.value(),
- metadata.partition(),
- metadata.offset(),
- metadata.timestamp(),
- now - time
- );
- }
- else
- {
- // HANDLE ERROR
- log.error(
- "{} - ERROR key={} timestamp={} latency={}ms: {}",
- id,
- record.key(),
- metadata == null ? -1 : metadata.timestamp(),
- now - time,
- e.toString()
- );
- }
- });
-
- long now = System.currentTimeMillis();
- log.trace(
- "{} - Queued #{} key={} latency={}ms",
- id,
- value,
- record.key(),
- now - time
- );
- }
-
- public synchronized void start()
- {
- if (running)
- throw new IllegalStateException("Producer instance " + id + " is already running!");
-
- log.info("{} - Starting - produced {} messages before", id, produced);
- running = true;
- executor.submit(this);
- }
-
- public synchronized void stop() throws ExecutionException, InterruptedException
- {
- if (!running)
- throw new IllegalStateException("Producer instance " + id + " is not running!");
-
- log.info("{} - Stopping...", id);
- running = false;
- }
-
- @PreDestroy
- public void destroy() throws ExecutionException, InterruptedException
- {
- log.info("{} - Destroy!", id);
- try
- {
- stop();
- }
- catch (IllegalStateException e)
- {
- log.info("{} - Was already stopped", id);
- }
- finally
- {
- log.info("{} - Closing the KafkaProducer", id);
- producer.close();
- log.info("{}: Produced {} messages in total, exiting!", id, produced);
- }
- }
-}
--- /dev/null
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class ProduceFailure implements ProduceResult
+{
+ private final String error;
+ private final String exception;
+ private final Integer status;
+
+
+ public ProduceFailure(Exception e)
+ {
+ status = 500;
+ exception = e.getClass().getSimpleName();
+ error = e.getMessage();
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+
+
+@JsonInclude(NON_NULL)
+public interface ProduceResult
+{
+}
--- /dev/null
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class ProduceSuccess implements ProduceResult
+{
+ Integer partition;
+ Long offset;
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import javax.annotation.PreDestroy;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+
+@Slf4j
+@RestController
+public class RestProducer
+{
+ private final String id;
+ private final String topic;
+ private final KafkaProducer<String, String> producer;
+
+ private long produced = 0;
+
+ public RestProducer(ApplicationProperties properties)
+ {
+ this.id = properties.getClientId();
+ this.topic = properties.getTopic();
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("client.id", properties.getClientId());
+ props.put("acks", properties.getAcks());
+ props.put("batch.size", properties.getBatchSize());
+ props.put("delivery.timeout.ms", 20000); // 20 Sekunden
+ props.put("request.timeout.ms", 10000); // 10 Sekunden
+ props.put("linger.ms", properties.getLingerMs());
+ props.put("compression.type", properties.getCompressionType());
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ this.producer = new KafkaProducer<>(props);
+ }
+
+ @PostMapping(path = "{key}")
+ public DeferredResult<ProduceResult> send(
+ @PathVariable String key,
+ @RequestBody String value)
+ {
+ DeferredResult<ProduceResult> result = new DeferredResult<>();
+
+ final long time = System.currentTimeMillis();
+
+ final ProducerRecord<String, String> record = new ProducerRecord<>(
+ topic, // Topic
+ key, // Key
+ value // Value
+ );
+
+ producer.send(record, (metadata, e) ->
+ {
+ long now = System.currentTimeMillis();
+ if (e == null)
+ {
+ // HANDLE SUCCESS
+ produced++;
+ result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+ log.debug(
+ "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+ id,
+ record.key(),
+ record.value(),
+ metadata.partition(),
+ metadata.offset(),
+ metadata.timestamp(),
+ now - time
+ );
+ }
+ else
+ {
+ // HANDLE ERROR
+ result.setErrorResult(new ProduceFailure(e));
+ log.error(
+ "{} - ERROR key={} timestamp={} latency={}ms: {}",
+ id,
+ record.key(),
+ metadata == null ? -1 : metadata.timestamp(),
+ now - time,
+ e.toString()
+ );
+ }
+ });
+
+ long now = System.currentTimeMillis();
+ log.trace(
+ "{} - Queued #{} key={} latency={}ms",
+ id,
+ value,
+ record.key(),
+ now - time
+ );
+
+ return result;
+ }
+
+ @ExceptionHandler
+ @ResponseStatus(HttpStatus.BAD_REQUEST)
+ public ErrorResponse illegalStateException(IllegalStateException e)
+ {
+ return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+ }
+
+ @PreDestroy
+ public void destroy() throws ExecutionException, InterruptedException
+ {
+ log.info("{} - Destroy!", id);
+ log.info("{} - Closing the KafkaProducer", id);
+ producer.close();
+ log.info("{}: Produced {} messages in total, exiting!", id, produced);
+ }
+}
bootstrap-server: :9092
client-id: DEV
topic: test
- acks: 1
- throttle-ms: 1000
+ acks: -1
+ batch-size: 16384
+ linger-ms: 0
+ compression-type: gzip
management:
endpoint:
shutdown: