#!/bin/bash
-IMAGE=juplo/endless-producer:1.0-SNAPSHOT
+IMAGE=juplo/rest-client:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
docker-compose up setup
-docker-compose up -d producer
-sleep 5
-docker-compose stop producer
-docker-compose logs producer
+
+docker-compose up -d
+
+sleep 15
+http :8081/status
+http :8000/seen
+
+http post :8081/stop
+http :8081/status
+http :8000/seen
+
+http post :8081/start
+
+sleep 1
+http :8081/status
+http :8000/seen
+sleep 1
+http :8081/status
+http :8000/seen
+sleep 1
+http :8081/status
+http :8000/seen
+sleep 1
+http :8081/status
+http :8000/seen
+sleep 1
+http :8081/status
+http :8000/seen
+
+http post :8081/stop
+http :8081/status
+
+http :8000/seen
+http post :8000/stop
+
+docker-compose logs client
command: sleep infinity
producer:
- image: juplo/endless-producer:1.0-SNAPSHOT
+ image: juplo/rest-producer:1.0-SNAPSHOT
ports:
- 8080:8080
environment:
producer.bootstrap-server: kafka:9092
producer.client-id: producer
producer.topic: test
- producer.throttle-ms: 200
+
+ client:
+ image: juplo/rest-client:1.0-SNAPSHOT
+ ports:
+ - 8081:8081
+ environment:
+ rest-client.base-url: http://producer:8080
+
+ consumer:
+ image: juplo/counting-consumer:1.0-SNAPSHOT
+ ports:
+ - 8000:8081
+ environment:
+ consumer.bootstrap-server: kafka:9092
+ consumer.client-id: my-group
+ consumer.client-id: consumer
+ consumer.topic: test
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>endless-producer</artifactId>
- <name>Endless Producer: a Simple Producer that endlessly writes numbers into a topic</name>
+ <artifactId>rest-client</artifactId>
+ <name>REST Client: a simple client, that endlessly writes numbers via the REST-gateway</name>
<version>1.0-SNAPSHOT</version>
<dependencies>
<optional>true</optional>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
package de.juplo.kafka;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@EnableConfigurationProperties(ApplicationProperties.class)
public class Application
{
- @Autowired
- ApplicationProperties properties;
-
-
@Bean
- public EndlessProducer producer()
+ public ExecutorService executor()
{
- Assert.hasText(properties.getBootstrapServer(), "producer.bootstrap-server must be set");
- Assert.hasText(properties.getClientId(), "producer.client-id must be set");
- Assert.hasText(properties.getTopic(), "producer.topic must be set");
-
- EndlessProducer producer =
- new EndlessProducer(
- Executors.newFixedThreadPool(1),
- properties.getBootstrapServer(),
- properties.getClientId(),
- properties.getTopic(),
- properties.getAcks(),
- properties.getThrottleMs());
-
- producer.start();
-
- return producer;
+ return Executors.newFixedThreadPool(1);
}
+
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
-@ConfigurationProperties(prefix = "producer")
+@ConfigurationProperties(prefix = "rest-client")
@Getter
@Setter
public class ApplicationProperties
{
- private String bootstrapServer;
- private String clientId;
- private String topic;
- private String acks;
+ private String baseUrl;
+ private String username;
private int throttleMs;
}
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
@RequiredArgsConstructor
public class DriverController
{
- private final EndlessProducer producer;
+ private final RestClient client;
@PostMapping("start")
public void start()
{
- producer.start();
+ client.start();
}
@PostMapping("stop")
public void stop() throws ExecutionException, InterruptedException
{
- producer.stop();
+ client.stop();
+ }
+
+ @GetMapping("status")
+ public RestClient.Status getStatus()
+ {
+ return client.getStatus();
}
@ExceptionHandler
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import javax.annotation.PreDestroy;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-
-@Slf4j
-public class EndlessProducer implements Runnable
-{
- private final ExecutorService executor;
- private final String id;
- private final String topic;
- private final int throttleMs;
- private final KafkaProducer<String, String> producer;
-
- private boolean running = false;
- private long i = 0;
- private long produced = 0;
-
- public EndlessProducer(
- ExecutorService executor,
- String bootstrapServer,
- String clientId,
- String topic,
- String acks,
- int throttleMs)
- {
- this.executor = executor;
- this.id = clientId;
- this.topic = topic;
- this.throttleMs = throttleMs;
-
- Properties props = new Properties();
- props.put("bootstrap.servers", bootstrapServer);
- props.put("client.id", clientId);
- props.put("acks", acks);
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- this.producer = new KafkaProducer<>(props);
- }
-
- @Override
- public void run()
- {
- try
- {
- for (; running; i++)
- {
- send(Long.toString(i%10), Long.toString(i));
-
- if (throttleMs > 0)
- {
- try
- {
- Thread.sleep(throttleMs);
- }
- catch (InterruptedException e)
- {
- log.warn("{} - Interrupted while throttling!", e);
- }
- }
- }
-
- log.info("{} - Done", id);
- }
- catch (Exception e)
- {
- log.error("{} - Unexpected Exception:", id, e);
- }
- finally
- {
- synchronized (this)
- {
- running = false;
- log.info("{} - Stopped - produced {} messages so far", id, produced);
- }
- }
- }
-
- void send(String key, String value)
- {
- final long time = System.currentTimeMillis();
-
- final ProducerRecord<String, String> record = new ProducerRecord<>(
- topic, // Topic
- key, // Key
- value // Value
- );
-
- producer.send(record, (metadata, e) ->
- {
- long now = System.currentTimeMillis();
- if (e == null)
- {
- // HANDLE SUCCESS
- produced++;
- log.debug(
- "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
- id,
- record.key(),
- record.value(),
- metadata.partition(),
- metadata.offset(),
- metadata.timestamp(),
- now - time
- );
- }
- else
- {
- // HANDLE ERROR
- log.error(
- "{} - ERROR key={} timestamp={} latency={}ms: {}",
- id,
- record.key(),
- metadata == null ? -1 : metadata.timestamp(),
- now - time,
- e.toString()
- );
- }
- });
-
- long now = System.currentTimeMillis();
- log.trace(
- "{} - Queued #{} key={} latency={}ms",
- id,
- value,
- record.key(),
- now - time
- );
- }
-
- public synchronized void start()
- {
- if (running)
- throw new IllegalStateException("Producer instance " + id + " is already running!");
-
- log.info("{} - Starting - produced {} messages before", id, produced);
- running = true;
- executor.submit(this);
- }
-
- public synchronized void stop() throws ExecutionException, InterruptedException
- {
- if (!running)
- throw new IllegalStateException("Producer instance " + id + " is not running!");
-
- log.info("{} - Stopping...", id);
- running = false;
- }
-
- @PreDestroy
- public void destroy() throws ExecutionException, InterruptedException
- {
- log.info("{} - Destroy!", id);
- try
- {
- stop();
- }
- catch (IllegalStateException e)
- {
- log.info("{} - Was already stopped", id);
- }
- finally
- {
- log.info("{} - Closing the KafkaProducer", id);
- producer.close();
- log.info("{}: Produced {} messages in total, exiting!", id, produced);
- }
- }
-}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+
+@Component
+@Slf4j
+public class RestClient implements Callable<Long>
+{
+ private final ExecutorService executor;
+ private final RestService service;
+
+ private final String username;
+ private final int throttleMs;
+ private long i = 0;
+
+ private boolean running = false;
+ private Future<Long> job;
+
+ private final Lock lock = new ReentrantLock();
+ private final Condition condition = lock.newCondition();
+ private final Set<Long> pending = new TreeSet<>();
+ private final Map<Integer, Long> offsets = new TreeMap<>();
+ private final List<RestFailure> failures = new LinkedList<>();
+
+ public RestClient(
+ ExecutorService executor,
+ RestService service,
+ ApplicationProperties properties)
+ {
+ this.executor = executor;
+ this.service = service;
+ this.username = properties.getUsername();
+ this.throttleMs = properties.getThrottleMs();
+ }
+
+
+ @Override
+ public Long call()
+ {
+ for(; running; i++)
+ {
+ Long message = i;
+ log.debug("{} - Sending message #{}", username, message);
+ guarded(() -> pending.add(message));
+ service
+ .send(message)
+ .doOnSubscribe(subscription -> guarded(() -> pending.add(message)))
+ .doOnTerminate(() -> guarded(() ->
+ {
+ pending.remove(message);
+ condition.signal();
+ }))
+ .onErrorResume(e ->
+ Mono.just(
+ RestFailure
+ .builder()
+ .error("client-error")
+ .exception(e.getMessage())
+ .build()))
+ .subscribe(result ->
+ {
+ switch (result.getType())
+ {
+ case SUCCESS:
+ RestSuccess success = (RestSuccess)result;
+ log.info(
+ "{} - Successfully sent message #{}: partition={}, offset={} ",
+ username,
+ message,
+ success.partition,
+ success.offset);
+ guarded(() ->
+ {
+ Long offset = offsets.get(success.partition);
+ if (offset == null || offset < success.offset)
+ offsets.put(success.partition, success.offset);
+ });
+ break;
+
+ case FAILURE:
+ RestFailure failure = (RestFailure)result;
+ log.warn(
+ "{} - Failure while sending message #{}: error={}, exception={}",
+ username,
+ message,
+ failure.error,
+ failure.exception);
+ guarded(() -> failures.add(failure));
+ break;
+ }
+ });
+
+ if (throttleMs > 0)
+ {
+ try
+ {
+ Thread.sleep(throttleMs);
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("{} - Interrupted while throttling!", username, e);
+ }
+ }
+ }
+
+ return i;
+ }
+
+
+ public synchronized Status getStatus()
+ {
+ return new Status(running, pending, offsets, failures);
+ }
+
+
+ @Getter
+ public class Status
+ {
+ boolean running;
+ Set<Long> pending;
+ Map<Integer, Long> offsets;
+ List<RestFailure> failures;
+
+ private Status(
+ boolean running,
+ Set<Long> pending,
+ Map<Integer, Long> offsets,
+ List<RestFailure> failures)
+ {
+ this.running = running;
+ guarded(() ->
+ {
+ this.pending = new LinkedHashSet<>(pending);
+ this.offsets = new LinkedHashMap<>(offsets);
+ this.failures = new ArrayList<>(failures);
+ });
+ }
+ }
+
+
+ @PostConstruct
+ public synchronized void start()
+ {
+ if (running)
+ throw new IllegalStateException("REST-client " + username + " is already running!");
+
+ log.info("{} - Starting - {} messages sent before", username, i);
+ running = true;
+ job = executor.submit(this);
+ }
+
+ public synchronized void stop() throws ExecutionException, InterruptedException
+ {
+ if (!running)
+ throw new IllegalStateException("REST-client " + username + " is not running!");
+
+ log.info("{} - Stopping...", username);
+ running = false;
+ Long sent = job.get();
+ log.info("{} - Stopped - sent {} messages so far", username, sent);
+ }
+
+ @PreDestroy
+ public synchronized void shutdown()
+ {
+ log.info("{} - Shutting down...", username);
+ try
+ {
+ stop();
+ }
+ catch (Exception e)
+ {
+ log.warn("{} - Exception while stopping", username, e);
+ }
+
+ guarded(() ->
+ {
+ while (!pending.isEmpty())
+ {
+ log.debug("{} - Waiting for {} outstanding responses...", username, pending.size());
+ try
+ {
+ condition.await();
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("{} - Interrupted wail awaiting condtion!", username, e);
+ }
+ }
+ });
+ log.info("{} - Bye Bye", username);
+ }
+
+ private void guarded(Runnable function)
+ {
+ lock.lock();
+ try
+ {
+ function.run();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ private <T> T guarded(Supplier<T> function)
+ {
+ lock.lock();
+ try
+ {
+ return function.get();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Builder;
+import lombok.Data;
+
+
+@Data
+@Builder
+public class RestFailure implements RestResult
+{
+ String error;
+ String exception;
+ Integer status;
+
+
+ public Type getType()
+ {
+ return Type.FAILURE;
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+public interface RestResult
+{
+ public static enum Type { SUCCESS, FAILURE };
+
+ public Type getType();
+}
--- /dev/null
+package de.juplo.kafka;
+
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Service;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+
+
+@Service
+public class RestService
+{
+ private final WebClient client;
+ private final String username;
+
+
+ public RestService(
+ WebClient.Builder builder,
+ ApplicationProperties properties)
+ {
+ this.client = builder
+ .baseUrl(properties.getBaseUrl())
+ .build();
+ this.username = properties.getUsername();
+ }
+
+
+ public Mono<RestResult> send(long number)
+ {
+ return client
+ .post()
+ .uri("/{username}", username)
+ .bodyValue(Long.toString(number))
+ .accept(MediaType.APPLICATION_JSON)
+ .exchangeToMono(response ->
+ response.statusCode().isError()
+ ? response.bodyToMono(RestFailure.class)
+ : response.bodyToMono(RestSuccess.class));
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Data;
+
+
+@Data
+public class RestSuccess implements RestResult
+{
+ Integer partition;
+ Long offset;
+
+
+ public Type getType()
+ {
+ return Type.SUCCESS;
+ }
+}
-producer:
- bootstrap-server: :9092
- client-id: peter
- topic: test
- acks: 1
+server:
+ port: 8081
+rest-client:
+ base-url: http://localhost:8080
+ username: rest-client
throttle-ms: 1000
management:
endpoints: