1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
6 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
8 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.admin.AdminClient;
11 import org.apache.kafka.clients.admin.MemberDescription;
12 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
13 import org.apache.kafka.clients.consumer.ConsumerRecord;
14 import org.apache.kafka.clients.consumer.ConsumerRecords;
15 import org.apache.kafka.clients.consumer.KafkaConsumer;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.errors.WakeupException;
18 import org.springframework.context.event.ContextRefreshedEvent;
19 import org.springframework.context.event.EventListener;
20 import org.springframework.web.bind.annotation.PostMapping;
21 import org.springframework.web.bind.annotation.RequestMapping;
22 import org.springframework.web.bind.annotation.ResponseBody;
24 import java.time.Duration;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Future;
29 import java.util.stream.Collectors;
32 @RequestMapping("/consumer")
35 public class TransferConsumer implements Runnable, ConsumerRebalanceListener
37 private final String topic;
38 private final int numPartitions;
39 private final KafkaConsumer<String, String> consumer;
40 private final AdminClient adminClient;
41 private final TransferRepository repository;
42 private final ObjectMapper mapper;
43 private final ConsumerUseCases productionUseCases, restoreUseCases;
45 private boolean running = false;
46 private boolean shutdown = false;
47 private Future<?> future = null;
49 private final String groupId;
50 private final String groupInstanceId;
51 private final Map<String, String> instanceIdUriMapping;
52 private final String[] instanceIdByPartition;
54 private volatile boolean partitionOwnershipUnknown = true;
57 public TransferConsumer(
60 Map<String, String> instanceIdUriMapping,
61 KafkaConsumer<String, String> consumer,
62 AdminClient adminClient,
63 TransferRepository repository,
65 ConsumerUseCases productionUseCases,
66 ConsumerUseCases restoreUseCases)
69 this.numPartitions = numPartitions;
70 this.groupId = consumer.groupMetadata().groupId();
71 this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
72 this.instanceIdByPartition = new String[numPartitions];
73 this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
74 for (String instanceId : instanceIdUriMapping.keySet())
76 // Requests are not redirected for the instance itself
77 String uri = instanceId.equals(groupInstanceId)
79 : instanceIdUriMapping.get(instanceId);
80 this.instanceIdUriMapping.put(instanceId, uri);
82 this.consumer = consumer;
83 this.adminClient = adminClient;
84 this.repository = repository;
86 this.productionUseCases = productionUseCases;
87 this.restoreUseCases = restoreUseCases;
98 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
99 if (records.count() == 0)
102 log.debug("polled {} records", records.count());
103 records.forEach(record -> handleRecord(record, productionUseCases));
105 catch (WakeupException e)
107 log.info("cleanly interrupted while polling");
111 log.info("polling stopped");
114 private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
118 byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
122 case EventType.NEW_TRANSFER:
124 NewTransferEvent newTransferEvent =
125 mapper.readValue(record.value(), NewTransferEvent.class);
128 newTransferEvent.getId(),
129 newTransferEvent.getPayer(),
130 newTransferEvent.getPayee(),
131 newTransferEvent.getAmount());
134 case EventType.TRANSFER_STATE_CHANGED:
136 TransferStateChangedEvent stateChangedEvent =
137 mapper.readValue(record.value(), TransferStateChangedEvent.class);
138 useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
142 catch (JsonProcessingException e)
145 "ignoring invalid json in message #{} on {}/{}: {}",
151 catch (IllegalArgumentException e)
154 "ignoring invalid message #{} on {}/{}: {}, message={}",
164 public Optional<String> uriForKey(String key)
168 while (partitionOwnershipUnknown)
170 try { wait(); } catch (InterruptedException e) {}
173 int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
176 .ofNullable(instanceIdByPartition[partition])
177 .map(id -> instanceIdUriMapping.get(id));
182 public synchronized void onApplicationEvent(ContextRefreshedEvent event)
184 // "Needed", because this method is called synchronously during the
185 // initialization pahse of Spring. If the subscription happens
186 // in the same thread, it would block the completion of the initialization.
187 // Hence, the app would not react to any signal (CTRL-C, for example) except
188 // a KILL until the restoring is finished.
189 future = CompletableFuture.runAsync(() -> start());
190 log.info("start of application completed");
195 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
197 partitionOwnershipUnknown = true;
198 log.info("partitions revoked: {}", partitions);
202 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
204 log.info("partitions assigned: {}", partitions);
205 fetchAssignmentsAsync();
206 if (partitions.size() > 0)
210 private void fetchAssignmentsAsync()
213 .describeConsumerGroups(List.of(groupId))
216 .whenComplete((descriptions, e) ->
220 log.error("could not fetch group data: {}", e.getMessage());
226 for (MemberDescription description : descriptions.members())
231 .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
233 partitionOwnershipUnknown = false;
241 public void onPartitionsLost(Collection<TopicPartition> partitions)
243 partitionOwnershipUnknown = true;
244 log.info("partiotions lost: {}", partitions);
248 private void restore(Collection<TopicPartition> partitions)
250 log.info("--> starting restore...");
254 .map(topicPartition -> topicPartition.partition())
255 .forEach(partition -> repository.resetStorageForPartition(partition));
257 Map<Integer, Long> lastSeen =
259 .endOffsets(partitions)
262 .collect(Collectors.toMap(
263 entry -> entry.getKey().partition(),
264 entry -> entry.getValue() - 1));
266 Map<Integer, Long> positions =
270 .collect(Collectors.toMap(
271 partition -> partition,
278 .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
279 .reduce(false, (a, b) -> a || b))
283 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
284 if (records.count() == 0)
287 log.debug("polled {} records", records.count());
288 records.forEach(record ->
290 handleRecord(record, restoreUseCases);
291 positions.put(record.partition(), record.offset());
294 catch(WakeupException e)
296 log.info("--> cleanly interrupted while restoring");
300 log.info("--> restore completed!");
303 @PostMapping("start")
304 public synchronized String start()
308 log.info("consumer already running!");
309 return "Already running!";
312 int foundNumPartitions = consumer.partitionsFor(topic).size();
313 if (foundNumPartitions != numPartitions)
316 "unexpected number of partitions for topic {}: expected={}, found={}",
321 return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
324 consumer.subscribe(List.of(topic), this);
327 future = CompletableFuture.runAsync(this);
329 log.info("consumer started");
334 public synchronized String stop()
338 log.info("consumer not running!");
339 return "Not running";
344 if (!future.isDone())
347 log.info("waiting for the consumer...");
352 catch (InterruptedException|ExecutionException e)
354 log.error("Exception while joining polling task!", e);
355 return e.getMessage();
360 consumer.unsubscribe();
363 log.info("consumer stopped");
367 public synchronized void shutdown()
369 log.info("shutdown initiated!");
372 log.info("closing consumer");
378 public interface ConsumerUseCases
381 CreateTransferUseCase,
382 HandleStateChangeUseCase {};