1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
8 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
9 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.admin.AdminClient;
12 import org.apache.kafka.clients.admin.MemberDescription;
13 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
14 import org.apache.kafka.clients.consumer.ConsumerRecord;
15 import org.apache.kafka.clients.consumer.ConsumerRecords;
16 import org.apache.kafka.clients.consumer.KafkaConsumer;
17 import org.apache.kafka.common.TopicPartition;
18 import org.apache.kafka.common.errors.WakeupException;
19 import org.springframework.context.event.ContextRefreshedEvent;
20 import org.springframework.context.event.EventListener;
21 import org.springframework.web.bind.annotation.PostMapping;
22 import org.springframework.web.bind.annotation.RequestMapping;
23 import org.springframework.web.bind.annotation.ResponseBody;
25 import java.time.Clock;
26 import java.time.Duration;
27 import java.time.Instant;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.Future;
32 import java.util.function.Consumer;
/**
 * Drives the Kafka consumer for the transfer topic and exposes REST endpoints
 * (under {@code /consumer}) to start and stop the polling loop.
 * <p>
 * Implements {@link Runnable} (the poll loop) and
 * {@link ConsumerRebalanceListener} (partition activation/deactivation and
 * instance-to-partition bookkeeping for request redirection).
 * <p>
 * NOTE(review): class-level annotations appear to be partially missing from
 * this view (the {@code log} field implies Lombok's {@code @Slf4j}, and the
 * request mappings imply a controller stereotype) — confirm against the full
 * source.
 */
@RequestMapping("/consumer")
public class TransferConsumer implements Runnable, ConsumerRebalanceListener
// Topic holding the transfer events and its expected partition count
// (the partition count also sizes the per-partition arrays below).
private final String topic;
private final int numPartitions;
// Kafka clients and the local state store used by this instance.
private final KafkaConsumer<String, String> consumer;
private final AdminClient adminClient;
private final TransferRepository repository;
// JSON (de-)serialization of the event payloads.
private final ObjectMapper mapper;
// Use-cases applied while restoring state from already-processed records.
private final ConsumerUseCases restoreUseCases;
// Lifecycle flags and the handle of the asynchronously running poll loop.
private boolean running = false;
private boolean shutdown = false;
private Future<?> future = null;
// Static group membership data: this instance's ids and the mapping from
// group-instance-id to the URI under which that instance is reachable.
private final String groupId;
private final String groupInstanceId;
private final Map<String, String> instanceIdUriMapping;
// For each partition: the group-instance-id that currently owns it.
private final String[] instanceIdByPartition;
// Interval (seconds) between local state snapshots; <= 0 disables snapshots.
private int stateStoreInterval;
// Handler used during normal processing, and the per-partition handler table
// (entries switch between restore- and production-handlers per partition).
private final Consumer<ConsumerRecord<String, String>> productionRecordHandler;
private final Consumer<ConsumerRecord<String, String>> recordHandlers[];
// True while the partition-to-instance mapping is stale (during rebalances);
// uriForKey() blocks on this flag. Volatile: written from callback threads.
private volatile boolean partitionOwnershipUnknown = true;
/**
 * Wires the consumer-driver.
 * <p>
 * NOTE(review): several constructor lines appear to be missing from this
 * view — assignments for {@code topic} and {@code mapper}, and the
 * {@code topic}/{@code numPartitions}/{@code clock} parameters are used but
 * not visible here. Confirm against the full source.
 */
public TransferConsumer(
    Map<String, String> instanceIdUriMapping,
    KafkaConsumer<String, String> consumer,
    AdminClient adminClient,
    TransferRepository repository,
    int stateStoreInterval,
    ConsumerUseCases productionUseCases,
    ConsumerUseCases restoreUseCases)
  this.numPartitions = numPartitions;
  // group.id / group.instance.id are read from the already-configured consumer.
  this.groupId = consumer.groupMetadata().groupId();
  // NOTE(review): Optional.get() without a presence check — this requires the
  // consumer to be configured with a static group.instance.id.
  this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
  this.instanceIdByPartition = new String[numPartitions];
  this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
  for (String instanceId : instanceIdUriMapping.keySet())
    // Requests are not redirected for the instance itself
    // (NOTE(review): the branch assigning the local/no-redirect value is not
    // visible in this view).
    String uri = instanceId.equals(groupInstanceId)
        : instanceIdUriMapping.get(instanceId);
    this.instanceIdUriMapping.put(instanceId, uri);
  this.consumer = consumer;
  this.adminClient = adminClient;
  this.repository = repository;
  this.stateStoreInterval = stateStoreInterval;
  this.restoreUseCases = restoreUseCases;
  // Normal processing delegates every record to the production use-cases.
  productionRecordHandler = (record) -> handleRecord(record, productionUseCases);
  // Unchecked: generic array creation is not possible in Java.
  this.recordHandlers = new Consumer[numPartitions];
// Poll loop body (NOTE(review): the enclosing run() signature and the
// surrounding while/try scaffolding are not visible in this view).
// Remember when the local state was last snapshotted.
Instant stateStored = clock.instant();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
if (records.count() == 0)
log.debug("polled {} records", records.count());
// Dispatch each record to the handler currently installed for its partition
// (restore-handler while catching up, production-handler afterwards).
records.forEach(record -> recordHandlers[record.partition()].accept(record));
Instant now = clock.instant();
  // Periodically snapshot the current positions locally, if enabled.
  stateStoreInterval > 0 &&
  Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
Map<Integer, Long> offsets = new HashMap<>();
for (TopicPartition topicPartition : consumer.assignment())
  Integer partition = topicPartition.partition();
  Long offset = consumer.position(topicPartition);
  log.info("storing state locally for {}/{}: {}", topic, partition, offset);
  offsets.put(partition, offset);
repository.storeState(offsets);
// consumer.wakeup() is the intended, clean way to leave the poll loop.
catch (WakeupException e)
log.info("cleanly interrupted while polling");
log.info("polling stopped");
/**
 * Deserializes a single record and routes it to the given use-cases,
 * depending on the event-type header.
 * <p>
 * Invalid JSON and illegal arguments are logged and skipped, so that a
 * poison-pill message cannot stall the partition.
 * <p>
 * NOTE(review): the switch/try scaffolding and parts of the log statements
 * are not visible in this view.
 *
 * @param record   the raw Kafka record (JSON payload, event-type header)
 * @param useCases production- or restore-variant of the domain use-cases
 */
private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
  // First byte of the EventType header selects the payload type.
  byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
  case EventType.NEW_TRANSFER:
    NewTransferEvent newTransferEvent =
        mapper.readValue(record.value(), NewTransferEvent.class);
      newTransferEvent.getId(),
      newTransferEvent.getPayer(),
      newTransferEvent.getPayee(),
      newTransferEvent.getAmount());
  case EventType.TRANSFER_STATE_CHANGED:
    TransferStateChangedEvent stateChangedEvent =
        mapper.readValue(record.value(), TransferStateChangedEvent.class);
    useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
// Unparsable payload: log and skip (best-effort, do not stall the partition).
catch (JsonProcessingException e)
  "ignoring invalid json in message #{} on {}/{}: {}",
// Semantically invalid event: log and skip as well.
catch (IllegalArgumentException e)
  "ignoring invalid message #{} on {}/{}: {}, message={}",
 * Identifies the URI at which the group-instance can be reached
 * that holds the state for a specific {@link Transfer}.
 * The {@link Transfer#getId() ID} of the {@link Transfer} is named
 * {@code key} here and is of type {@code String}, because this example
 * project stores the key as a String in Kafka to simplify the listing
 * and manual manipulation of the according topic.
 * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
 * @return An {@link Optional}, that holds the URI at which the group-instance
 *         can be reached, that holds the state for the {@link Transfer}, that
 *         is identified by the key (if present), or is empty, if the {@link Transfer}
 *         would be handled by the local instance.
public Optional<String> uriForKey(String key)
  // Block until the partition-to-instance mapping has been (re-)fetched.
  // NOTE(review): InterruptedException is swallowed without re-interrupting
  // the thread; the surrounding synchronization for wait() is not visible in
  // this view — confirm against the full source.
  while (partitionOwnershipUnknown)
    try { wait(); } catch (InterruptedException e) {}
  // Same hash as the producer-side partitioner, so the key maps to the
  // partition that actually holds the transfer's state.
  int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
    .ofNullable(instanceIdByPartition[partition])
    .map(id -> instanceIdUriMapping.get(id));
/**
 * Starts the consumer asynchronously once the Spring context is refreshed.
 *
 * @param event the context-refreshed event (unused)
 */
public synchronized void onApplicationEvent(ContextRefreshedEvent event)
  // "Needed", because this method is called synchronously during the
  // initialization phase of Spring. If the subscription happens
  // in the same thread, it would block the completion of the initialization.
  // Hence, the app would not react to any signal (CTRL-C, for example) except
  // a KILL until the restoring is finished.
  future = CompletableFuture.runAsync(() -> start());
  log.info("start of application completed");
/**
 * Rebalance callback: marks the partition-to-instance mapping as stale and
 * deactivates the revoked partitions in the local repository, remembering
 * the current position of each, so it can be resumed on re-assignment.
 *
 * @param partitions the partitions being revoked from this instance
 */
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
  // uriForKey() must block until fresh assignment data is available.
  partitionOwnershipUnknown = true;
  log.info("partitions revoked: {}", partitions);
  for (TopicPartition topicPartition : partitions)
    int partition = topicPartition.partition();
    long offset = consumer.position(topicPartition);
    log.info("deactivating partition {}, offset: {}", partition, offset);
    repository.deactivatePartition(partition, offset);
/**
 * Rebalance callback: re-fetches the group's partition-to-instance mapping
 * and (re-)activates each newly assigned partition, seeking back to the
 * locally stored offset and installing a restore-handler until the partition
 * has caught up with its end-offset.
 *
 * @param partitions the partitions newly assigned to this instance
 */
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
  log.info("partitions assigned: {}", partitions);
  // Refresh instanceIdByPartition in the background; clears
  // partitionOwnershipUnknown when done.
  fetchAssignmentsAsync();
  if (partitions.size() > 0)
    for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(partitions).entrySet())
      TopicPartition topicPartition = entry.getKey();
      Integer partition = topicPartition.partition();
      // Resume from the offset recorded in the local state store.
      long offset = repository.activatePartition(partition);
      log.info("activated partition {}, seeking to offset {}", partition, offset);
      consumer.seek(topicPartition, offset);
      Long endOffset = entry.getValue();
      if (offset < endOffset)
        // Behind the log end: replay records through the restore use-cases.
        log.info("--> starting restore of partition {}: {} -> {}", partition, offset, endOffset);
        recordHandlers[partition] = new RestoreRecordHandler(endOffset);
        // Already up-to-date: process records normally right away.
        log.info("--> partition {} is up-to-date, offset: {}", partition, offset);
        recordHandlers[partition] = productionRecordHandler;
/**
 * Asynchronously queries the group coordinator (via the admin client) for
 * the current member assignments and rebuilds {@code instanceIdByPartition}.
 * Clears {@code partitionOwnershipUnknown} on success, which unblocks
 * {@code uriForKey(String)}.
 * <p>
 * NOTE(review): the receiver of the call chain (presumably
 * {@code adminClient}) and the wait/notify handling are not visible in this
 * view — confirm against the full source.
 */
private void fetchAssignmentsAsync()
    .describeConsumerGroups(List.of(groupId))
    .whenComplete((descriptions, e) ->
      // Failure: keep the mapping marked as unknown and only log the cause.
      log.error("could not fetch group data: {}", e.getMessage());
      // Success: record, for every member, which partitions it owns.
      for (MemberDescription description : descriptions.members())
          .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
      partitionOwnershipUnknown = false;
320 public void onPartitionsLost(Collection<TopicPartition> partitions)
322 partitionOwnershipUnknown = true;
323 log.info("partiotions lost: {}", partitions);
/**
 * Record handler that is installed per partition while it is catching up:
 * replays records through the restore use-cases until the end-offset that
 * was captured at assignment time is reached, then swaps in the
 * production handler for that partition.
 * <p>
 * NOTE(review): the declaration of the {@code seen} field is not visible in
 * this view — confirm against the full source.
 */
class RestoreRecordHandler implements Consumer<ConsumerRecord<String, String>>
  RestoreRecordHandler(Long endOffset)
    // Offset of the last record that still belongs to the restore phase.
    this.seen = endOffset - 1;
  public void accept(ConsumerRecord<String, String> record)
    if (seen < record.offset())
      // The record is already beyond the captured end-offset: restore is
      // complete, switch to production handling and process it normally.
      int partition = record.partition();
      "--> restore of partition {} completed: needed={}, seen={}!",
      recordHandlers[partition] = productionRecordHandler;
      productionRecordHandler.accept(record);
    // Still restoring: replay through the restore use-cases.
    handleRecord(record, restoreUseCases);
    if (seen == record.offset())
      // Exactly the last restore record: switch to production handling.
      int partition = record.partition();
      log.info( "--> restore of partition {} completed!", partition);
      recordHandlers[partition] = productionRecordHandler;
/**
 * Starts the poll loop ({@code POST /consumer/start}).
 * <p>
 * Verifies that the topic has the expected number of partitions before
 * subscribing, then runs {@code this} asynchronously.
 * <p>
 * NOTE(review): the guard checking/setting the {@code running} flag and the
 * final return statement are not visible in this view.
 *
 * @return a human-readable status message
 */
@PostMapping("start")
public synchronized String start()
  log.info("consumer already running!");
  return "Already running!";
  // Refuse to start if the broker reports an unexpected partition count:
  // the per-partition arrays are sized for numPartitions.
  int foundNumPartitions = consumer.partitionsFor(topic).size();
  if (foundNumPartitions != numPartitions)
    "unexpected number of partitions for topic {}: expected={}, found={}",
    return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
  // Register this instance as the rebalance listener as well.
  consumer.subscribe(List.of(topic), this);
  future = CompletableFuture.runAsync(this);
  log.info("consumer started");
/**
 * Stops the poll loop ({@code POST /consumer/stop}).
 * <p>
 * Interrupts the poll loop (via {@code consumer.wakeup()}, visible in the
 * run-loop's {@link WakeupException} handling), waits for the polling task
 * to finish and unsubscribes the consumer.
 * <p>
 * NOTE(review): the running-flag guard, the wakeup call and the final return
 * statement are not visible in this view.
 *
 * @return a human-readable status message
 */
public synchronized String stop()
  log.info("consumer not running!");
  return "Not running";
  if (!future.isDone())
    log.info("waiting for the consumer...");
  catch (InterruptedException|ExecutionException e)
    log.error("Exception while joining polling task!", e);
    return e.getMessage();
  consumer.unsubscribe();
  log.info("consumer stopped");
/**
 * Shuts the consumer down for good: stops the poll loop and closes the
 * underlying Kafka consumer.
 * <p>
 * NOTE(review): the lines setting the {@code shutdown} flag, calling
 * {@code stop()} and closing the consumer are not visible in this view.
 */
public synchronized void shutdown()
  log.info("shutdown initiated!");
  log.info("closing consumer");
/**
 * Bundles the domain use-cases a record handler needs, so that production
 * and restore processing can be wired with different implementations.
 */
public interface ConsumerUseCases
    CreateTransferUseCase,
    HandleStateChangeUseCase {};