+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+
+@Component
+@Slf4j
+public class RestClient implements Callable<Long>
+{
+ private final ExecutorService executor;
+ private final RestService service;
+
+ private final String username;
+ private final int throttleMs;
+ private long i = 0;
+
+ private boolean running = false;
+ private Future<Long> job;
+
+ private final Lock lock = new ReentrantLock();
+ private final Condition condition = lock.newCondition();
+ private final Set<Long> pending = new TreeSet<>();
+ private final Map<Integer, Long> offsets = new TreeMap<>();
+ private final List<RestFailure> failures = new LinkedList<>();
+
+ public RestClient(
+ ExecutorService executor,
+ RestService service,
+ ApplicationProperties properties)
+ {
+ this.executor = executor;
+ this.service = service;
+ this.username = properties.getUsername();
+ this.throttleMs = properties.getThrottleMs();
+ }
+
+
+ @Override
+ public Long call()
+ {
+ for(; running; i++)
+ {
+ Long message = i;
+ log.debug("{} - Sending message #{}", username, message);
+ guarded(() -> pending.add(message));
+ service
+ .send(message)
+ .doOnSubscribe(subscription -> guarded(() -> pending.add(message)))
+ .doOnTerminate(() -> guarded(() ->
+ {
+ pending.remove(message);
+ condition.signal();
+ }))
+ .onErrorResume(e ->
+ Mono.just(
+ RestFailure
+ .builder()
+ .error("client-error")
+ .exception(e.getMessage())
+ .build()))
+ .subscribe(result ->
+ {
+ switch (result.getType())
+ {
+ case SUCCESS:
+ RestSuccess success = (RestSuccess)result;
+ log.info(
+ "{} - Successfully sent message #{}: partition={}, offset={} ",
+ username,
+ message,
+ success.partition,
+ success.offset);
+ guarded(() ->
+ {
+ Long offset = offsets.get(success.partition);
+ if (offset == null || offset < success.offset)
+ offsets.put(success.partition, success.offset);
+ });
+ break;
+
+ case FAILURE:
+ RestFailure failure = (RestFailure)result;
+ log.warn(
+ "{} - Failure while sending message #{}: error={}, exception={}",
+ username,
+ message,
+ failure.error,
+ failure.exception);
+ guarded(() -> failures.add(failure));
+ break;
+ }
+ });
+
+ if (throttleMs > 0)
+ {
+ try
+ {
+ Thread.sleep(throttleMs);
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("{} - Interrupted while throttling!", username, e);
+ }
+ }
+ }
+
+ return i;
+ }
+
+
+ public synchronized Status getStatus()
+ {
+ return new Status(running, pending, offsets, failures);
+ }
+
+
+ @Getter
+ public class Status
+ {
+ boolean running;
+ Set<Long> pending;
+ Map<Integer, Long> offsets;
+ List<RestFailure> failures;
+
+ private Status(
+ boolean running,
+ Set<Long> pending,
+ Map<Integer, Long> offsets,
+ List<RestFailure> failures)
+ {
+ this.running = running;
+ guarded(() ->
+ {
+ this.pending = new LinkedHashSet<>(pending);
+ this.offsets = new LinkedHashMap<>(offsets);
+ this.failures = new ArrayList<>(failures);
+ });
+ }
+ }
+
+
+ @PostConstruct
+ public synchronized void start()
+ {
+ if (running)
+ throw new IllegalStateException("REST-client " + username + " is already running!");
+
+ log.info("{} - Starting - {} messages sent before", username, i);
+ running = true;
+ job = executor.submit(this);
+ }
+
+ public synchronized void stop() throws ExecutionException, InterruptedException
+ {
+ if (!running)
+ throw new IllegalStateException("REST-client " + username + " is not running!");
+
+ log.info("{} - Stopping...", username);
+ running = false;
+ Long sent = job.get();
+ log.info("{} - Stopped - sent {} messages so far", username, sent);
+ }
+
+ @PreDestroy
+ public synchronized void shutdown()
+ {
+ log.info("{} - Shutting down...", username);
+ try
+ {
+ stop();
+ }
+ catch (Exception e)
+ {
+ log.warn("{} - Exception while stopping", username, e);
+ }
+
+ guarded(() ->
+ {
+ while (!pending.isEmpty())
+ {
+ log.debug("{} - Waiting for {} outstanding responses...", username, pending.size());
+ try
+ {
+ condition.await();
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("{} - Interrupted wail awaiting condtion!", username, e);
+ }
+ }
+ });
+ log.info("{} - Bye Bye", username);
+ }
+
+ private void guarded(Runnable function)
+ {
+ lock.lock();
+ try
+ {
+ function.run();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ private <T> T guarded(Supplier<T> function)
+ {
+ lock.lock();
+ try
+ {
+ return function.get();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+}