1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
8 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
9 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.admin.AdminClient;
12 import org.apache.kafka.clients.admin.MemberDescription;
13 import org.apache.kafka.clients.consumer.*;
14 import org.apache.kafka.common.Cluster;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.errors.WakeupException;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.springframework.context.event.ContextRefreshedEvent;
19 import org.springframework.context.event.EventListener;
20 import org.springframework.web.bind.annotation.PostMapping;
21 import org.springframework.web.bind.annotation.RequestMapping;
22 import org.springframework.web.bind.annotation.ResponseBody;
24 import java.nio.ByteBuffer;
25 import java.time.Clock;
26 import java.time.Duration;
27 import java.time.Instant;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.Future;
32 import java.util.stream.Collectors;
35 @RequestMapping("/consumer")
38 public class TransferConsumer implements Runnable, ConsumerRebalanceListener
40 private final String topic;
41 private final int numPartitions;
42 private final KafkaConsumer<String, String> consumer;
43 private final AdminClient adminClient;
44 private final TransferRepository repository;
45 private final ObjectMapper mapper;
46 private final ConsumerUseCases productionUseCases, restoreUseCases;
48 private boolean running = false;
49 private boolean shutdown = false;
50 private Future<?> future = null;
52 private final String groupId;
53 private final String groupInstanceId;
54 private final Map<String, String> instanceIdUriMapping;
55 private final String[] instanceIdByPartition;
58 private int stateStoreInterval;
60 private volatile boolean partitionOwnershipUnknown = true;
63 public TransferConsumer(
64 String bootstrapServers,
66 String groupInstanceId,
69 Map<String, String> instanceIdUriMapping,
70 AdminClient adminClient,
71 TransferRepository repository,
73 int stateStoreInterval,
75 ConsumerUseCases productionUseCases,
76 ConsumerUseCases restoreUseCases)
78 Properties props = new Properties();
79 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
80 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
81 props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
82 props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, new Assignor());
83 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
84 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
85 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
86 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
88 this.consumer = new KafkaConsumer<>(props).;
90 this.groupId = groupId;
91 this.groupInstanceId = groupInstanceId;
93 this.numPartitions = numPartitions;
94 this.instanceIdByPartition = new String[numPartitions];
95 this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
96 for (String instanceId : instanceIdUriMapping.keySet())
98 // Requests are not redirected for the instance itself
99 String uri = instanceId.equals(groupInstanceId)
101 : instanceIdUriMapping.get(instanceId);
102 this.instanceIdUriMapping.put(instanceId, uri);
104 this.adminClient = adminClient;
105 this.repository = repository;
107 this.stateStoreInterval = stateStoreInterval;
108 this.mapper = mapper;
109 this.productionUseCases = productionUseCases;
110 this.restoreUseCases = restoreUseCases;
117 Instant stateStored = clock.instant();
123 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
124 if (records.count() == 0)
127 log.debug("polled {} records", records.count());
128 records.forEach(record -> handleRecord(record, productionUseCases));
130 Instant now = clock.instant();
132 stateStoreInterval > 0 &&
133 Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
135 Map<Integer, Long> offsets = new HashMap<>();
137 for (TopicPartition topicPartition : consumer.assignment())
139 Integer partition = topicPartition.partition();
140 Long offset = consumer.position(topicPartition);
141 log.info("storing state locally for {}/{}: {}", topic, partition, offset);
142 offsets.put(partition, offset);
145 repository.storeState(offsets);
149 catch (WakeupException e)
151 log.info("cleanly interrupted while polling");
155 log.info("polling stopped");
158 private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
162 byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
166 case EventType.NEW_TRANSFER:
168 NewTransferEvent newTransferEvent =
169 mapper.readValue(record.value(), NewTransferEvent.class);
172 newTransferEvent.getId(),
173 newTransferEvent.getPayer(),
174 newTransferEvent.getPayee(),
175 newTransferEvent.getAmount());
178 case EventType.TRANSFER_STATE_CHANGED:
180 TransferStateChangedEvent stateChangedEvent =
181 mapper.readValue(record.value(), TransferStateChangedEvent.class);
182 useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
186 catch (JsonProcessingException e)
189 "ignoring invalid json in message #{} on {}/{}: {}",
195 catch (IllegalArgumentException e)
198 "ignoring invalid message #{} on {}/{}: {}, message={}",
209 * Identifies the URI, at which the Group-Instance can be reached,
210 * that holds the state for a specific {@link Transfer}.
212 * The {@link Transfer#getId() ID} of the {@link Transfer} is named
213 * {@code key} here and of type {@code String}, because this example
214 * project stores the key as a String in Kafka to simplify the listing
215 * and manual manipulation of the according topic.
217 * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
218 * @return An {@link Optional}, that holds the URI at which the Group-Instance
219 * can be reached, that holds the state for the {@link Transfer}, that
220 * is identified by the key (if present), or is empty, if the {@link Transfer}
221 * would be handled by the local instance.
223 public Optional<String> uriForKey(String key)
227 while (partitionOwnershipUnknown)
229 try { wait(); } catch (InterruptedException e) {}
232 int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
235 .ofNullable(instanceIdByPartition[partition])
236 .map(id -> instanceIdUriMapping.get(id));
241 public synchronized void onApplicationEvent(ContextRefreshedEvent event)
243 // "Needed", because this method is called synchronously during the
244 // initialization pahse of Spring. If the subscription happens
245 // in the same thread, it would block the completion of the initialization.
246 // Hence, the app would not react to any signal (CTRL-C, for example) except
247 // a KILL until the restoring is finished.
248 future = CompletableFuture.runAsync(() -> start());
249 log.info("start of application completed");
254 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
256 partitionOwnershipUnknown = true;
257 log.info("partitions revoked: {}", partitions);
258 for (TopicPartition topicPartition : partitions)
260 int partition = topicPartition.partition();
261 long offset = consumer.position(topicPartition);
262 log.info("deactivating partition {}, offset: {}", partition, offset);
263 repository.deactivatePartition(partition, offset);
268 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
270 log.info("partitions assigned: {}", partitions);
271 fetchAssignmentsAsync();
272 if (partitions.size() > 0)
274 for (TopicPartition topicPartition : partitions)
276 int partition = topicPartition.partition();
277 long offset = repository.activatePartition(partition);
278 log.info("activated partition {}, seeking to offset {}", partition, offset);
279 consumer.seek(topicPartition, offset);
286 private void fetchAssignmentsAsync()
289 .describeConsumerGroups(List.of(groupId))
292 .whenComplete((descriptions, e) ->
296 log.error("could not fetch group data: {}", e.getMessage());
302 for (MemberDescription description : descriptions.members())
307 .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
309 partitionOwnershipUnknown = false;
317 public void onPartitionsLost(Collection<TopicPartition> partitions)
319 partitionOwnershipUnknown = true;
320 log.info("partiotions lost: {}", partitions);
324 private void restore(Collection<TopicPartition> partitions)
326 log.info("--> starting restore...");
328 Map<Integer, Long> lastSeen =
330 .endOffsets(partitions)
333 .collect(Collectors.toMap(
334 entry -> entry.getKey().partition(),
335 entry -> entry.getValue() - 1));
337 Map<Integer, Long> positions =
341 .collect(Collectors.toMap(
342 partition -> partition,
343 partition -> repository.storedPosition(partition)));
349 .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
350 .reduce(false, (a, b) -> a || b))
354 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
355 if (records.count() == 0)
358 log.debug("polled {} records", records.count());
359 records.forEach(record ->
361 handleRecord(record, restoreUseCases);
362 positions.put(record.partition(), record.offset());
365 catch(WakeupException e)
367 log.info("--> cleanly interrupted while restoring");
371 log.info("--> restore completed!");
374 @PostMapping("start")
375 public synchronized String start()
379 log.info("consumer already running!");
380 return "Already running!";
383 int foundNumPartitions = consumer.partitionsFor(topic).size();
384 if (foundNumPartitions != numPartitions)
387 "unexpected number of partitions for topic {}: expected={}, found={}",
392 return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
395 consumer.subscribe(List.of(topic), this);
398 future = CompletableFuture.runAsync(this);
400 log.info("consumer started");
405 public synchronized String stop()
409 log.info("consumer not running!");
410 return "Not running";
415 if (!future.isDone())
418 log.info("waiting for the consumer...");
423 catch (InterruptedException|ExecutionException e)
425 log.error("Exception while joining polling task!", e);
426 return e.getMessage();
431 consumer.unsubscribe();
434 log.info("consumer stopped");
438 public synchronized void shutdown()
440 log.info("shutdown initiated!");
443 log.info("closing consumer");
449 public interface ConsumerUseCases
452 CreateTransferUseCase,
453 HandleStateChangeUseCase {};
455 public class Assignor extends CooperativeStickyAssignor
458 public GroupAssignment assign(
460 GroupSubscription groupSubscription)
462 return super.assign(metadata, groupSubscription);
466 public ByteBuffer subscriptionUserData(Set<String> topics)
472 public void onAssignment(
473 Assignment assignment,
474 ConsumerGroupMetadata metadata)
476 log.info("New assignment: {}, {}", assignment, metadata);