import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
+import de.juplo.kafka.payment.transfer.adapter.NoOpMessageService;
import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
+import de.juplo.kafka.payment.transfer.adapter.TransferController;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
import de.juplo.kafka.payment.transfer.domain.TransferService;
-import de.juplo.kafka.payment.transfer.ports.MessagingService;
import de.juplo.kafka.payment.transfer.ports.TransferRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer,
ExecutorService executorService,
ObjectMapper mapper,
- TransferService transferService)
+ TransferService productionTransferService,
+ TransferService restoreTransferService)
{
- TransferConsumer transferConsumer =
+ return
new TransferConsumer(
properties.topic,
consumer,
executorService,
mapper,
- transferService,
- transferService,
- transferService);
- transferConsumer.start();
- return transferConsumer;
+ new TransferConsumer.ConsumerUseCases() {
+ @Override
+ public void create(Transfer transfer)
+ {
+ productionTransferService.create(transfer);
+ }
+
+ @Override
+ public Optional<Transfer> get(Long id)
+ {
+ return productionTransferService.get(id);
+ }
+
+ @Override
+ public void handle(Transfer transfer)
+ {
+ productionTransferService.handle(transfer);
+ }
+ },
+ new TransferConsumer.ConsumerUseCases() {
+ @Override
+ public void create(Transfer transfer)
+ {
+ restoreTransferService.create(transfer);
+ }
+
+ @Override
+ public Optional<Transfer> get(Long id)
+ {
+ return restoreTransferService.get(id);
+ }
+
+ @Override
+ public void handle(Transfer transfer)
+ {
+ restoreTransferService.handle(transfer);
+ }
+ });
}
@Bean
- MessagingService kafkaMessagingService(
+ KafkaMessagingService kafkaMessagingService(
KafkaProducer<String, String> producer,
ObjectMapper mapper,
TransferServiceProperties properties)
}
@Bean
-  TransferService transferService(
+  // Production TransferService: persists transfers and emits state-change
+  // events through the real Kafka messaging adapter.
+  TransferService productionTransferService(
      TransferRepository repository,
-      MessagingService messagingService)
+      KafkaMessagingService kafkaMessagingService)
+  {
+    return new TransferService(repository, kafkaMessagingService);
+  }
+
+  // Restore TransferService: used while replaying the topic on startup.
+  // The no-op messaging adapter guarantees that restoring does NOT publish
+  // any new events.
+  @Bean
+  TransferService restoreTransferService(
+      TransferRepository repository,
+      NoOpMessageService noOpMessageService)
+  {
+    return new TransferService(repository, noOpMessageService);
+  }
+
+  // REST adapter; wired against the production service so incoming HTTP
+  // requests go through the Kafka-emitting path.
+  @Bean
+  TransferController transferController(
+      TransferService productionTransferService,
+      KafkaMessagingService kafkaMessagingService)
  {
-    return new TransferService(repository, messagingService);
+    return new TransferController(productionTransferService, kafkaMessagingService);
  }
import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.time.Duration;
-import java.util.Set;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.stream.Collectors;
import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
private final KafkaConsumer<String, String> consumer;
private final ExecutorService executorService;
private final ObjectMapper mapper;
- private final GetTransferUseCase getTransferUseCase;
- private final CreateTransferUseCase createTransferUseCase;
- private final HandleStateChangeUseCase handleStateChangeUseCase;
+ private final ConsumerUseCases productionUseCases, restoreUseCases;
+ private boolean restoring = true;
private boolean running = false;
+ private boolean shutdown = false;
private Future<?> future = null;
try
{
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
- if (records.count() > 0)
- log.debug("polled {} records", records.count());
+ if (records.count() == 0)
+ continue;
- records.forEach(record ->
- {
- try
- {
- byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
-
- switch (eventType)
- {
- case EventType.NEW_TRANSFER:
-
- NewTransferEvent newTransferEvent =
- mapper.readValue(record.value(), NewTransferEvent.class);
- createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED));
- break;
-
- case EventType.TRANSFER_STATE_CHANGED:
-
- TransferStateChangedEvent stateChangedEvent =
- mapper.readValue(record.value(), TransferStateChangedEvent.class);
- getTransferUseCase
- .get(stateChangedEvent.getId())
- .ifPresentOrElse(
- transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())),
- () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
- break;
- }
- }
- catch (JsonProcessingException e)
- {
- log.error(
- "ignoring invalid json in message #{} on {}/{}: {}",
- record.offset(),
- record.topic(),
- record.partition(),
- record.value());
- }
- });
+ log.debug("polled {} records", records.count());
+ records.forEach(record -> handleRecord(record, productionUseCases));
}
catch (WakeupException e)
{
- log.info("polling aborted!");
+ log.info("cleanly interrupted while polling");
}
}
log.info("polling stopped");
}
+  /**
+   * Deserializes one polled record and routes it to the given use-cases.
+   *
+   * The first byte of the {@code EventType.HEADER} header selects the payload
+   * type: NEW_TRANSFER creates the transfer in state CREATED;
+   * TRANSFER_STATE_CHANGED looks the transfer up and applies the new state
+   * (logging an error if the transfer is unknown).
+   *
+   * @param record   the polled Kafka record with a JSON payload
+   * @param useCases production or restore implementation of the use-cases
+   */
+  private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
+  {
+    try
+    {
+      // The event-type is encoded as a single byte in a record header.
+      byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
+
+      switch (eventType)
+      {
+        case EventType.NEW_TRANSFER:
+
+          NewTransferEvent newTransferEvent =
+              mapper.readValue(record.value(), NewTransferEvent.class);
+          useCases.create(newTransferEvent.toTransfer().setState(CREATED));
+          break;
+
+        case EventType.TRANSFER_STATE_CHANGED:
+
+          TransferStateChangedEvent stateChangedEvent =
+              mapper.readValue(record.value(), TransferStateChangedEvent.class);
+          useCases
+              .get(stateChangedEvent.getId())
+              .ifPresentOrElse(
+                  transfer -> useCases.handle(transfer.setState(stateChangedEvent.getState())),
+                  () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
+          break;
+        // NOTE(review): records with an unknown event-type byte are silently
+        // ignored — confirm this is intended.
+      }
+    }
+    catch (JsonProcessingException e)
+    {
+      // Poison-pill handling: log and skip the record instead of blocking
+      // the partition on a payload that can never be parsed.
+      log.error(
+          "ignoring invalid json in message #{} on {}/{}: {}",
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          record.value());
+    }
+  }
+
+  /**
+   * Kicks off the restore once the Spring context is fully refreshed.
+   *
+   * @param event the context-refreshed event (unused, only triggers the hook)
+   */
+  @EventListener
+  public synchronized void onApplicationEvent(ContextRefreshedEvent event)
+  {
+    // Needed, because this method is called synchronously during the
+    // initialization phase of Spring. If the restoring were processed
+    // in the same thread, it would block the completion of the initialization.
+    // Hence, the app would not react to any signal (CTRL-C, for example) except
+    // a KILL until the restoring is finished.
+    future = executorService.submit(() -> restore());
+  }
+
+  /**
+   * Replays every event already present in the topic to rebuild the local
+   * state, using the restore use-cases (which do not emit new events).
+   *
+   * Partitions are assigned manually (no consumer-group subscription); the
+   * loop ends once every partition has been read up to the offset that
+   * existed when the restore started, or when {@code restoring} is cleared.
+   * On completion the normal polling loop is started via {@link #start()}.
+   */
+  private void restore()
+  {
+    log.info("--> starting restore...");
+
+    List<TopicPartition> partitions =
+        consumer
+            .partitionsFor(topic)
+            .stream()
+            .map(info -> new TopicPartition(topic, info.partition()))
+            .collect(Collectors.toList());
+
+    // endOffsets() yields the offset of the *next* record to be written,
+    // so "- 1" is the offset of the last record that currently exists
+    // (-1 for an empty partition, which terminates the loop immediately).
+    Map<Integer, Long> lastSeen =
+        consumer
+            .endOffsets(partitions)
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                entry -> entry.getKey().partition(),
+                entry -> entry.getValue() - 1));
+
+    // Track the restore progress per partition, starting at offset 0.
+    Map<Integer, Long> positions =
+        lastSeen
+            .keySet()
+            .stream()
+            .collect(Collectors.toMap(
+                partition -> partition,
+                partition -> 0L));
+
+    log.info("assigning {}", partitions);
+    consumer.assign(partitions);
+
+    // Keep polling while any partition is still behind its last-seen offset.
+    while (
+        restoring &&
+        positions
+            .entrySet()
+            .stream()
+            .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
+            .reduce(false, (a, b) -> a || b))
+    {
+      try
+      {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+        if (records.count() == 0)
+          continue;
+
+        log.debug("polled {} records", records.count());
+        records.forEach(record ->
+        {
+          handleRecord(record, restoreUseCases);
+          positions.put(record.partition(), record.offset());
+        });
+      }
+      catch(WakeupException e)
+      {
+        log.info("--> cleanly interrupted while restoring");
+        return;
+      }
+    }
+
+    log.info("--> restore completed!");
+    restoring = false;
+
+    // We are intentionally _not_ unsubscribing here, since that would
+    // reset the offset to _earliest_, because we disabled offset-commits.
+
+    start();
+  }
@PostMapping("start")
public synchronized String start()
{
+ if (restoring)
+ {
+ log.error("cannot start while restoring");
+ return "Denied: Restoring!";
+ }
+
String result = "Started";
if (running)
result = "Restarted";
}
- log.info("subscribing to topic {}", topic);
- consumer.subscribe(Set.of(topic));
running = true;
future = executorService.submit(this);
+ log.info("started");
return result;
}
@PostMapping("stop")
public synchronized String stop()
{
- if (!running)
+ if (!(running || restoring))
{
log.info("not running!");
return "Not running";
}
running = false;
+
if (!future.isDone())
consumer.wakeup();
- log.info("waiting for the polling-loop to finish...");
+
+ log.info("waiting for the consumer...");
try
{
future.get();
finally
{
future = null;
- log.info("unsubscribing");
- consumer.unsubscribe();
}
- return "Stoped";
+ log.info("stopped");
+ return "Stopped";
}
  public synchronized void shutdown()
  {
    log.info("shutdown initiated!")1;
    // NOTE(review): the 'shutdown' flag is set here, but none of the code
    // visible in this hunk reads it — confirm start()/stop() honour it.
+    shutdown = true;
    stop();
    log.info("closing consumer");
    consumer.close();
  }
+
+
+  /**
+   * Bundles the use-cases the consumer depends on into a single type, so the
+   * production and restore variants can each be passed as one dependency.
+   */
+  public interface ConsumerUseCases
+      extends
+        GetTransferUseCase,
+        CreateTransferUseCase,
+        HandleStateChangeUseCase {};
}