Merge des Upgrades der Confluent-Images auf 7.0.2
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestClient.java
1 package de.juplo.kafka;
2
3 import lombok.Getter;
4 import lombok.extern.slf4j.Slf4j;
5 import org.springframework.stereotype.Component;
6 import reactor.core.publisher.Mono;
7
8 import javax.annotation.PostConstruct;
9 import javax.annotation.PreDestroy;
10 import java.util.*;
11 import java.util.concurrent.*;
12 import java.util.concurrent.locks.Condition;
13 import java.util.concurrent.locks.Lock;
14 import java.util.concurrent.locks.ReentrantLock;
15 import java.util.function.Supplier;
16
17
18 @Component
19 @Slf4j
20 public class RestClient implements Callable<Long>
21 {
22   private final ExecutorService executor;
23   private final RestService service;
24
25   private final String username;
26   private final int throttleMs;
27   private long i = 0;
28
29   private boolean running = false;
30   private Future<Long> job;
31
32   private final Lock lock = new ReentrantLock();
33   private final Condition condition = lock.newCondition();
34   private final Set<Long> pending = new TreeSet<>();
35   private final Map<Integer, Long> offsets = new TreeMap<>();
36   private final List<RestFailure> failures = new LinkedList<>();
37
38   public RestClient(
39       ExecutorService executor,
40       RestService service,
41       ApplicationProperties properties)
42   {
43     this.executor = executor;
44     this.service = service;
45     this.username = properties.getUsername();
46     this.throttleMs = properties.getThrottleMs();
47   }
48
49
50   @Override
51   public Long call()
52   {
53     for(; running; i++)
54     {
55       Long message = i;
56       log.debug("{} - Sending message #{}", username, message);
57       guarded(() -> pending.add(message));
58       service
59           .send(message)
60           .doOnSubscribe(subscription -> guarded(() -> pending.add(message)))
61           .doOnTerminate(() -> guarded(() ->
62           {
63             pending.remove(message);
64             condition.signal();
65           }))
66           .onErrorResume(e ->
67               Mono.just(
68                   RestFailure
69                       .builder()
70                       .error("client-error")
71                       .exception(e.getMessage())
72                       .build()))
73           .subscribe(result ->
74           {
75             switch (result.getType())
76             {
77               case SUCCESS:
78                 RestSuccess success = (RestSuccess)result;
79                 log.info(
80                     "{} - Successfully sent message #{}: partition={}, offset={} ",
81                     username,
82                     message,
83                     success.partition,
84                     success.offset);
85                 guarded(() ->
86                 {
87                   Long offset = offsets.get(success.partition);
88                   if (offset == null || offset < success.offset)
89                     offsets.put(success.partition, success.offset);
90                 });
91                 break;
92
93               case FAILURE:
94                 RestFailure failure = (RestFailure)result;
95                 log.warn(
96                     "{} - Failure while sending message #{}: error={}, exception={}",
97                     username,
98                     message,
99                     failure.error,
100                     failure.exception);
101                 guarded(() -> failures.add(failure));
102                 break;
103             }
104           });
105
106       if (throttleMs > 0)
107       {
108         try
109         {
110           Thread.sleep(throttleMs);
111         }
112         catch (InterruptedException e)
113         {
114           log.warn("{} - Interrupted while throttling!", username, e);
115         }
116       }
117     }
118
119     return i;
120   }
121
122
123   public synchronized Status getStatus()
124   {
125     return new Status(username, running, pending, offsets, failures);
126   }
127
128
129   @Getter
130   public class Status
131   {
132     String username;
133     boolean running;
134     Set<Long> pending;
135     Map<Integer, Long> offsets;
136     List<RestFailure> failures;
137
138     private Status(
139         String username,
140         boolean running,
141         Set<Long> pending,
142         Map<Integer, Long> offsets,
143         List<RestFailure> failures)
144     {
145       this.username = username;
146       this.running = running;
147       guarded(() ->
148       {
149         this.pending = new LinkedHashSet<>(pending);
150         this.offsets = new LinkedHashMap<>(offsets);
151         this.failures = new ArrayList<>(failures);
152       });
153     }
154   }
155
156
157   @PostConstruct
158   public synchronized void start()
159   {
160     if (running)
161       throw new IllegalStateException("REST-client " + username + " is already running!");
162
163     log.info("{} - Starting - {} messages sent before", username, i);
164     running = true;
165     job = executor.submit(this);
166   }
167
168   public synchronized void stop() throws ExecutionException, InterruptedException
169   {
170     if (!running)
171       throw new IllegalStateException("REST-client " + username + " is not running!");
172
173     log.info("{} - Stopping...", username);
174     running = false;
175     Long sent = job.get();
176     log.info("{} - Stopped - sent {} messages so far", username, sent);
177   }
178
179   @PreDestroy
180   public synchronized void shutdown()
181   {
182     log.info("{} - Shutting down...", username);
183     try
184     {
185       stop();
186     }
187     catch (Exception e)
188     {
189       log.warn("{} - Exception while stopping", username, e);
190     }
191
192     guarded(() ->
193     {
194       while (!pending.isEmpty())
195       {
196         log.debug("{} - Waiting for {} outstanding responses...", username, pending.size());
197         try
198         {
199           condition.await();
200         }
201         catch (InterruptedException e)
202         {
203           log.warn("{} - Interrupted wail awaiting condtion!", username, e);
204         }
205       }
206     });
207     log.info("{} - Bye Bye", username);
208   }
209
210   private void guarded(Runnable function)
211   {
212     lock.lock();
213     try
214     {
215       function.run();
216     }
217     finally
218     {
219       lock.unlock();
220     }
221   }
222
223   private <T> T guarded(Supplier<T> function)
224   {
225     lock.lock();
226     try
227     {
228       return function.get();
229     }
230     finally
231     {
232       lock.unlock();
233     }
234   }
235 }