1 package de.juplo.kafka;
4 import lombok.extern.slf4j.Slf4j;
5 import org.springframework.stereotype.Component;
6 import reactor.core.publisher.Mono;
8 import javax.annotation.PostConstruct;
9 import javax.annotation.PreDestroy;
11 import java.util.concurrent.*;
12 import java.util.concurrent.locks.Condition;
13 import java.util.concurrent.locks.Lock;
14 import java.util.concurrent.locks.ReentrantLock;
15 import java.util.function.Supplier;
20 public class RestClient implements Callable<Long>
22 private final ExecutorService executor;
23 private final RestService service;
25 private final String username;
26 private final int throttleMs;
29 private boolean running = false;
30 private Future<Long> job;
32 private final Lock lock = new ReentrantLock();
33 private final Condition condition = lock.newCondition();
34 private final Set<Long> pending = new TreeSet<>();
35 private final Map<Integer, Long> offsets = new TreeMap<>();
36 private final List<RestFailure> failures = new LinkedList<>();
39 ExecutorService executor,
41 ApplicationProperties properties)
43 this.executor = executor;
44 this.service = service;
45 this.username = properties.getUsername();
46 this.throttleMs = properties.getThrottleMs();
56 log.debug("{} - Sending message #{}", username, message);
57 guarded(() -> pending.add(message));
60 .doOnSubscribe(subscription -> guarded(() -> pending.add(message)))
61 .doOnTerminate(() -> guarded(() ->
63 pending.remove(message);
70 .error("client-error")
71 .exception(e.getMessage())
75 switch (result.getType())
78 RestSuccess success = (RestSuccess)result;
80 "{} - Successfully sent message #{}: partition={}, offset={} ",
87 Long offset = offsets.get(success.partition);
88 if (offset == null || offset < success.offset)
89 offsets.put(success.partition, success.offset);
94 RestFailure failure = (RestFailure)result;
96 "{} - Failure while sending message #{}: error={}, exception={}",
101 guarded(() -> failures.add(failure));
110 Thread.sleep(throttleMs);
112 catch (InterruptedException e)
114 log.warn("{} - Interrupted while throttling!", username, e);
123 public synchronized Status getStatus()
125 return new Status(username, running, pending, offsets, failures);
135 Map<Integer, Long> offsets;
136 List<RestFailure> failures;
142 Map<Integer, Long> offsets,
143 List<RestFailure> failures)
145 this.username = username;
146 this.running = running;
149 this.pending = new LinkedHashSet<>(pending);
150 this.offsets = new LinkedHashMap<>(offsets);
151 this.failures = new ArrayList<>(failures);
158 public synchronized void start()
161 throw new IllegalStateException("REST-client " + username + " is already running!");
163 log.info("{} - Starting - {} messages sent before", username, i);
165 job = executor.submit(this);
168 public synchronized void stop() throws ExecutionException, InterruptedException
171 throw new IllegalStateException("REST-client " + username + " is not running!");
173 log.info("{} - Stopping...", username);
175 Long sent = job.get();
176 log.info("{} - Stopped - sent {} messages so far", username, sent);
180 public synchronized void shutdown()
182 log.info("{} - Shutting down...", username);
189 log.warn("{} - Exception while stopping", username, e);
194 while (!pending.isEmpty())
196 log.debug("{} - Waiting for {} outstanding responses...", username, pending.size());
201 catch (InterruptedException e)
203 log.warn("{} - Interrupted wail awaiting condtion!", username, e);
207 log.info("{} - Bye Bye", username);
210 private void guarded(Runnable function)
223 private <T> T guarded(Supplier<T> function)
228 return function.get();