1 package de.juplo.kafka;
4 import lombok.extern.slf4j.Slf4j;
5 import org.springframework.stereotype.Component;
6 import reactor.core.publisher.Mono;
8 import javax.annotation.PostConstruct;
9 import javax.annotation.PreDestroy;
11 import java.util.concurrent.*;
12 import java.util.concurrent.locks.Condition;
13 import java.util.concurrent.locks.Lock;
14 import java.util.concurrent.locks.ReentrantLock;
15 import java.util.function.Supplier;
20 public class RestClient implements Callable<Long>
22 private final ExecutorService executor;
23 private final RestService service;
25 private final String username;
26 private final int throttleMs;
29 private boolean running = false;
30 private Future<Long> job;
32 private final Lock lock = new ReentrantLock();
33 private final Condition condition = lock.newCondition();
34 private final Set<Long> pending = new TreeSet<>();
35 private final Map<Integer, Long> offsets = new TreeMap<>();
36 private final List<RestFailure> failures = new LinkedList<>();
39 ExecutorService executor,
41 ApplicationProperties properties)
43 this.executor = executor;
44 this.service = service;
45 this.username = properties.getUsername();
46 this.throttleMs = properties.getThrottleMs();
56 log.debug("{} - Sending message #{}", username, message);
57 guarded(() -> pending.add(message));
60 .doOnSubscribe(subscription -> guarded(() -> pending.add(message)))
61 .doOnTerminate(() -> guarded(() ->
63 pending.remove(message);
70 .error("client-error")
71 .exception(e.getMessage())
75 switch (result.getType())
78 RestSuccess success = (RestSuccess)result;
80 "{} - Successfully sent message #{}: partition={}, offset={} ",
87 Long offset = offsets.get(success.partition);
88 if (offset == null || offset < success.offset)
89 offsets.put(success.partition, success.offset);
94 RestFailure failure = (RestFailure)result;
96 "{} - Failure while sending message #{}: error={}, exception={}",
101 guarded(() -> failures.add(failure));
110 Thread.sleep(throttleMs);
112 catch (InterruptedException e)
114 log.warn("{} - Interrupted while throttling!", username, e);
123 public synchronized Status getStatus()
125 return new Status(running, pending, offsets, failures);
134 Map<Integer, Long> offsets;
135 List<RestFailure> failures;
140 Map<Integer, Long> offsets,
141 List<RestFailure> failures)
143 this.running = running;
146 this.pending = new LinkedHashSet<>(pending);
147 this.offsets = new LinkedHashMap<>(offsets);
148 this.failures = new ArrayList<>(failures);
155 public synchronized void start()
158 throw new IllegalStateException("REST-client " + username + " is already running!");
160 log.info("{} - Starting - {} messages sent before", username, i);
162 job = executor.submit(this);
165 public synchronized void stop() throws ExecutionException, InterruptedException
168 throw new IllegalStateException("REST-client " + username + " is not running!");
170 log.info("{} - Stopping...", username);
172 Long sent = job.get();
173 log.info("{} - Stopped - sent {} messages so far", username, sent);
177 public synchronized void shutdown()
179 log.info("{} - Shutting down...", username);
186 log.warn("{} - Exception while stopping", username, e);
191 while (!pending.isEmpty())
193 log.debug("{} - Waiting for {} outstanding responses...", username, pending.size());
198 catch (InterruptedException e)
200 log.warn("{} - Interrupted wail awaiting condtion!", username, e);
204 log.info("{} - Bye Bye", username);
207 private void guarded(Runnable function)
220 private <T> T guarded(Supplier<T> function)
225 return function.get();