1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
6 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
8 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.admin.AdminClient;
11 import org.apache.kafka.clients.admin.MemberDescription;
12 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
13 import org.apache.kafka.clients.consumer.ConsumerRecord;
14 import org.apache.kafka.clients.consumer.ConsumerRecords;
15 import org.apache.kafka.clients.consumer.KafkaConsumer;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.errors.WakeupException;
18 import org.springframework.context.event.ContextRefreshedEvent;
19 import org.springframework.context.event.EventListener;
20 import org.springframework.web.bind.annotation.PostMapping;
21 import org.springframework.web.bind.annotation.RequestMapping;
22 import org.springframework.web.bind.annotation.ResponseBody;
24 import java.time.Duration;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Future;
29 import java.util.stream.Collectors;
/**
 * Kafka consumer for the transfer topic, mapped under the request path
 * "/consumer".
 *
 * The class is a {@link Runnable} (it hands itself to
 * {@code CompletableFuture.runAsync}) and a {@link ConsumerRebalanceListener}
 * (it hands itself to {@code consumer.subscribe}).
 */
32 @RequestMapping("/consumer")
35 public class TransferConsumer implements Runnable, ConsumerRebalanceListener
// Topic that is subscribed to in start() and whose partition count is verified there.
37 private final String topic;
// Expected number of partitions of the topic; also the length of instanceIdByPartition.
38 private final int numPartitions;
// Consumer used for polling and for subscribing/unsubscribing.
39 private final KafkaConsumer<String, String> consumer;
// NOTE(review): presumably the receiver of describeConsumerGroups() in
// fetchAssignmentsAsync() -- confirm.
40 private final AdminClient adminClient;
// Storage whose per-partition state is reset at the beginning of a restore.
41 private final TransferRepository repository;
// Deserializes record values into event objects in handleRecord().
42 private final ObjectMapper mapper;
// Use cases applied to records during normal polling and during a restore, respectively.
43 private final ConsumerUseCases productionUseCases, restoreUseCases;
// Lifecycle flags; not volatile.
// NOTE(review): presumably only touched by the synchronized lifecycle methods and the polling task -- confirm.
45 private boolean running = false;
46 private boolean shutdown = false;
// Handle of the asynchronous task; assigned in onApplicationEvent() and start(), inspected in stop().
47 private Future<?> future = null;
// Both ids are read from the consumer's group metadata in the constructor.
49 private final String groupId;
50 private final String groupInstanceId;
// Maps a group instance id to the URI used by uriForKey().
51 private final Map<String, String> instanceIdUriMapping;
// Indexed by partition number; holds a group instance id per partition
// (filled from the member descriptions in fetchAssignmentsAsync()).
52 private final String[] instanceIdByPartition;
// Set on revoked/lost partitions, cleared once the group description has been
// processed; uriForKey() waits while it is true.
54 private volatile boolean partitionOwnershipUnknown = true;
/**
 * Creates the consumer and derives the group ids from the given Kafka consumer.
 *
 * @param instanceIdUriMapping group instance id to URI; copied into an internal map
 * @param consumer the Kafka consumer; its group metadata supplies groupId and groupInstanceId
 * @param adminClient stored for later use
 * @param repository stored for later use
 * @param productionUseCases use cases applied to records during normal polling
 * @param restoreUseCases use cases applied to records during a restore
 */
57 public TransferConsumer(
60 Map<String, String> instanceIdUriMapping,
61 KafkaConsumer<String, String> consumer,
62 AdminClient adminClient,
63 TransferRepository repository,
65 ConsumerUseCases productionUseCases,
66 ConsumerUseCases restoreUseCases)
69 this.numPartitions = numPartitions;
70 this.groupId = consumer.groupMetadata().groupId();
// NOTE(review): unchecked Optional.get() -- throws NoSuchElementException if the
// consumer has no group instance id; presumably static membership is required -- confirm.
71 this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
// One slot per partition, initially all null.
72 this.instanceIdByPartition = new String[numPartitions];
// Copy of the mapping, so that the entry for this instance can differ from the input.
73 this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
74 for (String instanceId : instanceIdUriMapping.keySet())
76 // Requests are not redirected for the instance itself
77 String uri = instanceId.equals(groupInstanceId)
79 : instanceIdUriMapping.get(instanceId);
80 this.instanceIdUriMapping.put(instanceId, uri);
82 this.consumer = consumer;
83 this.adminClient = adminClient;
84 this.repository = repository;
86 this.productionUseCases = productionUseCases;
87 this.restoreUseCases = restoreUseCases;
// Polling loop. NOTE(review): presumably the body of run() -- the method header is not
// part of this listing; confirm.
// Each poll waits up to one second for records.
98 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// Empty polls are special-cased (the branch body is not part of this listing).
99 if (records.count() == 0)
102 log.debug("polled {} records", records.count());
// During normal operation every record is handled with the production use cases.
103 records.forEach(record -> handleRecord(record, productionUseCases));
// A WakeupException is treated as a clean interruption and only logged.
105 catch (WakeupException e)
107 log.info("cleanly interrupted while polling");
111 log.info("polling stopped");
/**
 * Decodes a single record according to its event type and passes it to the
 * given use cases.
 *
 * @param record the record to handle; its value is parsed as JSON
 * @param useCases the use cases that receive the decoded event
 */
114 private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
// The event type is the first byte of the last header named EventType.HEADER.
// NOTE(review): lastHeader() yields null when the header is absent, which would
// raise a NullPointerException here -- confirm every producer sets the header.
118 byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
122 case EventType.NEW_TRANSFER:
124 NewTransferEvent newTransferEvent =
125 mapper.readValue(record.value(), NewTransferEvent.class);
// Arguments of a call whose target is not part of this listing.
128 newTransferEvent.getId(),
129 newTransferEvent.getPayer(),
130 newTransferEvent.getPayee(),
131 newTransferEvent.getAmount());
134 case EventType.TRANSFER_STATE_CHANGED:
136 TransferStateChangedEvent stateChangedEvent =
137 mapper.readValue(record.value(), TransferStateChangedEvent.class);
138 useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
// Records with unparsable JSON are reported and ignored.
142 catch (JsonProcessingException e)
145 "ignoring invalid json in message #{} on {}/{}: {}",
// Records rejected with an IllegalArgumentException are reported and ignored as well.
151 catch (IllegalArgumentException e)
154 "ignoring invalid message #{} on {}/{}: {}, message={}",
/**
 * Looks up the URI registered for the instance that is recorded for the
 * partition of the given key.
 *
 * Blocks while the partition ownership is unknown.
 *
 * @param key the key whose partition is computed via TransferPartitioner
 * @return the URI, or an empty Optional if no instance id is recorded for the
 *         partition or the mapping holds no (non-null) URI for that instance id
 */
164 public Optional<String> uriForKey(String key)
// Re-checks the flag after every wake-up.
168 while (partitionOwnershipUnknown)
// An InterruptedException is swallowed; the loop simply checks the flag again.
// NOTE(review): wait() requires the caller to hold this object's monitor; the
// acquisition is not part of this listing -- confirm.
170 try { wait(); } catch (InterruptedException e) {}
173 int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
176 .ofNullable(instanceIdByPartition[partition])
177 .map(id -> instanceIdUriMapping.get(id));
/**
 * Triggers start() asynchronously when the application context was refreshed.
 *
 * @param event the refresh event (not inspected)
 */
182 public synchronized void onApplicationEvent(ContextRefreshedEvent event)
184 // "Needed", because this method is called synchronously during the
185 // initialization phase of Spring. If the subscription happened
186 // in the same thread, it would block the completion of the initialization.
187 // Hence, the app would not react to any signal (CTRL-C, for example) except
188 // a KILL until the restoring is finished.
189 future = CompletableFuture.runAsync(() -> start());
/**
 * Rebalance callback: marks the partition ownership as unknown and logs the
 * revoked partitions.
 *
 * @param partitions the partitions that were revoked
 */
194 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
// From now on uriForKey() blocks until the flag is cleared again.
196 partitionOwnershipUnknown = true;
197 log.info("partitions revoked: {}", partitions);
/**
 * Rebalance callback: logs the assigned partitions and triggers the
 * asynchronous refresh of the partition-to-instance table.
 *
 * @param partitions the partitions that were assigned to this instance
 */
201 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
203 log.info("partitions assigned: {}", partitions);
204 fetchAssignmentsAsync();
// NOTE(review): the guarded statement is not part of this listing; presumably the
// restore of the newly assigned partitions -- confirm.
205 if (partitions.size() > 0)
/**
 * Requests the description of the own consumer group and, once it is
 * available, records a group instance id per partition in instanceIdByPartition.
 */
209 private void fetchAssignmentsAsync()
// NOTE(review): the receiver is not part of this listing; presumably adminClient -- confirm.
212 .describeConsumerGroups(List.of(groupId))
215 .whenComplete((descriptions, e) ->
// Failure path: only the message of the exception is logged.
219 log.error("could not fetch group data: {}", e.getMessage());
225 for (MemberDescription description : descriptions.members())
// NOTE(review): unchecked Optional.get() -- throws NoSuchElementException for a
// member without a group instance id; confirm all members use static membership.
230 .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
// Ownership is considered known again after the table was written.
232 partitionOwnershipUnknown = false;
/**
 * Rebalance callback: marks the partition ownership as unknown and logs the
 * lost partitions.
 *
 * @param partitions the partitions that were lost
 */
240 public void onPartitionsLost(Collection<TopicPartition> partitions)
// From now on uriForKey() blocks until the flag is cleared again.
242 partitionOwnershipUnknown = true;
// Spelling of the log message corrected ("partitions"), in line with the
// messages of the other rebalance callbacks.
243 log.info("partitions lost: {}", partitions);
/**
 * Rebuilds the stored state for the given partitions by re-reading their
 * records and applying them with the restore use cases.
 *
 * @param partitions the partitions to restore
 */
247 private void restore(Collection<TopicPartition> partitions)
249 log.info("--> starting restore...");
// The storage of every affected partition is reset first.
253 .map(topicPartition -> topicPartition.partition())
254 .forEach(partition -> repository.resetStorageForPartition(partition));
// Per partition: end offset minus one, i.e. the offset of the last existing record.
256 Map<Integer, Long> lastSeen =
258 .endOffsets(partitions)
261 .collect(Collectors.toMap(
262 entry -> entry.getKey().partition(),
263 entry -> entry.getValue() - 1));
// Per partition: offset of the last record handled so far (initial values are
// not part of this listing).
265 Map<Integer, Long> positions =
269 .collect(Collectors.toMap(
270 partition -> partition,
// True as long as at least one partition has not reached its last seen offset.
277 .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
278 .reduce(false, (a, b) -> a || b))
282 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
283 if (records.count() == 0)
286 log.debug("polled {} records", records.count());
287 records.forEach(record ->
// Restored records are applied with the restore use cases; afterwards the progress
// of the record's partition is updated.
289 handleRecord(record, restoreUseCases);
290 positions.put(record.partition(), record.offset());
// A WakeupException is treated as a clean interruption and only logged.
293 catch(WakeupException e)
295 log.info("--> cleanly interrupted while restoring");
299 log.info("--> restore completed!");
/**
 * Subscribes to the topic and launches the polling task asynchronously;
 * exposed as POST "start".
 *
 * @return a human readable status message
 */
302 @PostMapping("start")
303 public synchronized String start()
// Early exit when the consumer is already running (the condition is not part of this listing).
307 log.info("already running!");
308 return "Already running!";
// Refuses to start if the topic does not have the expected number of partitions.
311 int foundNumPartitions = consumer.partitionsFor(topic).size();
312 if (foundNumPartitions != numPartitions)
315 "unexpected number of partitions for topic {}: expected={}, found={}",
320 return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
// This instance is registered as rebalance listener for the subscription.
323 consumer.subscribe(List.of(topic), this);
// The polling task (this instance as Runnable) is started asynchronously.
326 future = CompletableFuture.runAsync(this);
/**
 * Stops the polling task, waits for it if necessary and unsubscribes the consumer.
 *
 * @return a human readable status message; the exception message if joining the
 *         polling task failed
 */
333 public synchronized String stop()
// Early exit when the consumer is not running (the condition is not part of this listing).
337 log.info("not running!");
338 return "Not running";
// Only waits if the asynchronous task has not finished yet.
343 if (!future.isDone())
346 log.info("waiting for the consumer...");
// A failed join is logged with its stack trace and reported to the caller by message.
351 catch (InterruptedException|ExecutionException e)
353 log.error("Exception while joining polling task!", e);
354 return e.getMessage();
359 consumer.unsubscribe();
/**
 * Shuts the consumer down; logs the start of the shutdown and the closing of
 * the consumer.
 */
366 public synchronized void shutdown()
368 log.info("shutdown initiated!");
// NOTE(review): the statements around this message are not part of this listing;
// presumably the Kafka consumer is closed here -- confirm.
371 log.info("closing consumer");
/**
 * Bundles the use case ports that the consumer applies to records, so that
 * different implementations can be supplied for normal polling and for restoring.
 */
377 public interface ConsumerUseCases
380 CreateTransferUseCase,
381 HandleStateChangeUseCase {};