689d672727ea9ef4b4e2048574b06f4af988f654
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestClient.java
1 package de.juplo.kafka;
2
3 import lombok.Getter;
4 import lombok.extern.slf4j.Slf4j;
5 import org.springframework.stereotype.Component;
6 import reactor.core.publisher.Mono;
7
8 import javax.annotation.PostConstruct;
9 import javax.annotation.PreDestroy;
10 import java.util.*;
11 import java.util.concurrent.*;
12 import java.util.concurrent.locks.Condition;
13 import java.util.concurrent.locks.Lock;
14 import java.util.concurrent.locks.ReentrantLock;
15 import java.util.function.Supplier;
16
17
18 @Component
19 @Slf4j
20 public class RestClient implements Callable<Long>
21 {
22   private final ExecutorService executor;
23   private final RestService service;
24
25   private final String username;
26   private final int throttleMs;
27   private long i = 0;
28
29   private boolean running = false;
30   private Future<Long> job;
31
32   private final Lock lock = new ReentrantLock();
33   private final Condition condition = lock.newCondition();
34   private final Set<Long> pending = new TreeSet<>();
35   private final Map<Integer, Long> offsets = new TreeMap<>();
36   private final List<RestFailure> failures = new LinkedList<>();
37
38   public RestClient(
39       ExecutorService executor,
40       RestService service,
41       ApplicationProperties properties)
42   {
43     this.executor = executor;
44     this.service = service;
45     this.username = properties.getUsername();
46     this.throttleMs = properties.getThrottleMs();
47   }
48
49
50   @Override
51   public Long call()
52   {
53     for(; running; i++)
54     {
55       Long message = i;
56       log.debug("{} - Sending message #{}", username, message);
57       guarded(() -> pending.add(message));
58       service
59           .send(message)
60           .doOnSubscribe(subscription -> guarded(() -> pending.add(message)))
61           .doOnTerminate(() -> guarded(() ->
62           {
63             pending.remove(message);
64             condition.signal();
65           }))
66           .onErrorResume(e ->
67               Mono.just(
68                   RestFailure
69                       .builder()
70                       .error("client-error")
71                       .exception(e.getMessage())
72                       .build()))
73           .subscribe(result ->
74           {
75             switch (result.getType())
76             {
77               case SUCCESS:
78                 RestSuccess success = (RestSuccess)result;
79                 log.info(
80                     "{} - Successfully sent message #{}: partition={}, offset={} ",
81                     username,
82                     message,
83                     success.partition,
84                     success.offset);
85                 guarded(() ->
86                 {
87                   Long offset = offsets.get(success.partition);
88                   if (offset == null || offset < success.offset)
89                     offsets.put(success.partition, success.offset);
90                 });
91                 break;
92
93               case FAILURE:
94                 RestFailure failure = (RestFailure)result;
95                 log.warn(
96                     "{} - Failure while sending message #{}: error={}, exception={}",
97                     username,
98                     message,
99                     failure.error,
100                     failure.exception);
101                 guarded(() -> failures.add(failure));
102                 break;
103             }
104           });
105
106       if (throttleMs > 0)
107       {
108         try
109         {
110           Thread.sleep(throttleMs);
111         }
112         catch (InterruptedException e)
113         {
114           log.warn("{} - Interrupted while throttling!", username, e);
115         }
116       }
117     }
118
119     return i;
120   }
121
122
123   public synchronized Status getStatus()
124   {
125     return new Status(running, pending, offsets, failures);
126   }
127
128
129   @Getter
130   public class Status
131   {
132     boolean running;
133     Set<Long> pending;
134     Map<Integer, Long> offsets;
135     List<RestFailure> failures;
136
137     private Status(
138         boolean running,
139         Set<Long> pending,
140         Map<Integer, Long> offsets,
141         List<RestFailure> failures)
142     {
143       this.running = running;
144       guarded(() ->
145       {
146         this.pending = new LinkedHashSet<>(pending);
147         this.offsets = new LinkedHashMap<>(offsets);
148         this.failures = new ArrayList<>(failures);
149       });
150     }
151   }
152
153
154   @PostConstruct
155   public synchronized void start()
156   {
157     if (running)
158       throw new IllegalStateException("REST-client " + username + " is already running!");
159
160     log.info("{} - Starting - {} messages sent before", username, i);
161     running = true;
162     job = executor.submit(this);
163   }
164
165   public synchronized void stop() throws ExecutionException, InterruptedException
166   {
167     if (!running)
168       throw new IllegalStateException("REST-client " + username + " is not running!");
169
170     log.info("{} - Stopping...", username);
171     running = false;
172     Long sent = job.get();
173     log.info("{} - Stopped - sent {} messages so far", username, sent);
174   }
175
176   @PreDestroy
177   public synchronized void shutdown()
178   {
179     log.info("{} - Shutting down...", username);
180     try
181     {
182       stop();
183     }
184     catch (Exception e)
185     {
186       log.warn("{} - Exception while stopping", username, e);
187     }
188
189     guarded(() ->
190     {
191       while (!pending.isEmpty())
192       {
193         log.debug("{} - Waiting for {} outstanding responses...", username, pending.size());
194         try
195         {
196           condition.await();
197         }
198         catch (InterruptedException e)
199         {
200           log.warn("{} - Interrupted wail awaiting condtion!", username, e);
201         }
202       }
203     });
204     log.info("{} - Bye Bye", username);
205   }
206
207   private void guarded(Runnable function)
208   {
209     lock.lock();
210     try
211     {
212       function.run();
213     }
214     finally
215     {
216       lock.unlock();
217     }
218   }
219
220   private <T> T guarded(Supplier<T> function)
221   {
222     lock.lock();
223     try
224     {
225       return function.get();
226     }
227     finally
228     {
229       lock.unlock();
230     }
231   }
232 }