import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
+import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
import de.juplo.kafka.payment.transfer.domain.TransferService;
import de.juplo.kafka.payment.transfer.ports.MessagingService;
import de.juplo.kafka.payment.transfer.ports.TransferRepository;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
@SpringBootApplication
return new KafkaProducer<>(props);
}
+ @Bean
+ KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
+ {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ return new KafkaConsumer<>(props);
+ }
+
+ @Bean(destroyMethod = "shutdown")
+ ExecutorService executorService()
+ {
+ return Executors.newFixedThreadPool(1);
+ }
+
+ @Bean(destroyMethod = "shutdown")
+ TransferConsumer transferConsumer(
+ TransferServiceProperties properties,
+ KafkaConsumer<String, String> consumer,
+ ExecutorService executorService,
+ ObjectMapper mapper,
+ TransferService transferService)
+ {
+ TransferConsumer transferConsumer =
+ new TransferConsumer(properties.topic, consumer, executorService, mapper, transferService);
+ transferConsumer.start();
+ return transferConsumer;
+ }
+
@Bean
MessagingService kafkaMessagingService(
KafkaProducer<String, String> producer,
{
String bootstrapServers = "localhost:9092";
String topic = "transfers";
+ String groupId = "transfers";
}
--- /dev/null
+package de.juplo.kafka.payment.transfer.adapter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+
+@RequestMapping("/consumer")
+@ResponseBody
+@RequiredArgsConstructor
+@Slf4j
+public class TransferConsumer implements Runnable
+{
+ private final String topic;
+ private final KafkaConsumer<String, String> consumer;
+ private final ExecutorService executorService;
+ private final ObjectMapper mapper;
+ private final HandleTransferUseCase handleTransferUseCase;
+
+ private boolean running = false;
+ private Future<?> future = null;
+
+
+ @Override
+ public void run()
+ {
+ while (running)
+ {
+ try
+ {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+ log.debug("polled {} records", records.count());
+
+ records.forEach(record ->
+ {
+ try
+ {
+ Transfer transfer = mapper.readValue(record.value(), Transfer.class);
+ handleTransferUseCase.handle(transfer);
+ }
+ catch (JsonProcessingException e)
+ {
+ log.error(
+ "ignoring invalid json in message #{} on {}/{}: {}",
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ record.value());
+ }
+ });
+ }
+ catch (WakeupException e)
+ {
+ log.info("polling aborted!");
+ }
+ }
+
+ log.info("polling stopped");
+ }
+
+
+ @PostMapping("start")
+ public synchronized String start()
+ {
+ String result = "Started";
+
+ if (running)
+ {
+ stop();
+ result = "Restarted";
+ }
+
+ log.info("subscribing to topic {}", topic);
+ consumer.subscribe(Set.of(topic));
+ running = true;
+ future = executorService.submit(this);
+
+ return result;
+ }
+
+ @PostMapping("stop")
+ public synchronized String stop()
+ {
+ if (!running)
+ {
+ log.info("not running!");
+ return "Not running";
+ }
+
+ running = false;
+ if (!future.isDone())
+ consumer.wakeup();
+ log.info("waiting for the polling-loop to finish...");
+ try
+ {
+ future.get();
+ }
+ catch (InterruptedException|ExecutionException e)
+ {
+ log.error("Exception while joining polling task!", e);
+ return e.getMessage();
+ }
+ finally
+ {
+ future = null;
+ log.info("unsubscribing");
+ consumer.unsubscribe();
+ }
+
+ return "Stoped";
+ }
+
+ public synchronized void shutdown()
+ {
+ log.info("shutdown initiated!");
+ stop();
+ log.info("closing consumer");
+ consumer.close();
+ }
+}
import de.juplo.kafka.payment.transfer.domain.Transfer;
import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.ReceiveTransferUseCase;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletRequest;
import javax.validation.Valid;
{
public final static String PATH = "/transfers";
- private final InitiateTransferUseCase initiateTransferUseCase;
+ private final ReceiveTransferUseCase receiveTransferUseCase;
private final GetTransferUseCase getTransferUseCase;
path = PATH,
consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.APPLICATION_JSON_VALUE)
- public ResponseEntity<?> transfer(@Valid @RequestBody TransferDTO transferDTO)
+ public DeferredResult<ResponseEntity<?>> transfer(
+ HttpServletRequest request,
+ @Valid @RequestBody TransferDTO transferDTO)
{
Transfer transfer =
Transfer
.amount(transferDTO.getAmount())
.build();
- initiateTransferUseCase.initiate(transfer);
+ DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
+
+ receiveTransferUseCase
+ .receive(transfer)
+ .thenApply(
+ $ ->
+ ResponseEntity
+ .created(URI.create(PATH + "/" + transferDTO.getId()))
+ .build())
+ .thenAccept(
+ responseEntity -> result.setResult(responseEntity))
+ .exceptionally(
+ e ->
+ {
+ result.setErrorResult(e);
+ return null;
+ });
- return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build();
+ return result;
}
@GetMapping(
import lombok.Data;
import lombok.EqualsAndHashCode;
+import java.util.LinkedList;
+import java.util.List;
+
@Data
@Builder
-@EqualsAndHashCode(exclude = "state")
+@EqualsAndHashCode(exclude = { "state", "messages" })
public class Transfer
{
public enum State
{
- SENT,
- FAILED,
- PENDING,
- APPROVED,
- REJECTED
+ RECEIVED(false),
+ INVALID(false),
+ CHECKED(false),
+ APPROVED(true),
+ REJECTED(true);
+
+ public final boolean foreign;
+
+ State(boolean foreign)
+ {
+ this.foreign = foreign;
+ }
}
private final long id;
private final int amount;
private State state;
+
+ private final List<String> messages = new LinkedList<>();
+
+
+ public Transfer setState(State state)
+ {
+ this.state = state;
+ return this;
+ }
+
+ public Transfer addMessage(String message)
+ {
+ messages.add(message);
+ return this;
+ }
}
package de.juplo.kafka.payment.transfer.domain;
-import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.MessagingService;
-import de.juplo.kafka.payment.transfer.ports.TransferRepository;
+import de.juplo.kafka.payment.transfer.ports.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
@Slf4j
@RequiredArgsConstructor
-public class TransferService implements InitiateTransferUseCase, GetTransferUseCase
+public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase
{
private final TransferRepository repository;
private final MessagingService messagingService;
- public synchronized void initiate(Transfer transfer)
+ public CompletableFuture<TopicPartition> receive(Transfer transfer)
+ {
+ transfer.setState(RECEIVED);
+ return messagingService.send(transfer);
+ }
+
+ @Override
+ public void handle(Transfer transfer)
+ {
+ Transfer.State state = transfer.getState();
+ switch (state)
+ {
+ case RECEIVED:
+ repository.store(transfer);
+ check(transfer);
+ break;
+
+ case CHECKED:
+ repository.store(transfer);
+ // TODO: What's next...?
+ break;
+
+ default:
+ log.warn("TODO: handle {} state {}", state.foreign ? "foreign" : "domain", state);
+ }
+ }
+
+ private void check(Transfer transfer)
{
repository
.get(transfer.getId())
stored ->
{
if (!transfer.equals(stored))
- throw new IllegalArgumentException(
- "Re-Initiation of transfer with different data: old=" +
- stored +
- ", new=" +
- transfer);
-
- if (stored.getState() == FAILED)
- {
- repository.update(transfer.getId(), FAILED, SENT);
- log.info("Resending faild transfer: " + stored);
- send(transfer);
- }
+ log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer);
},
() ->
{
- send(transfer);
- transfer.setState(SENT);
repository.store(transfer);
- });
- }
-
- private void send(Transfer transfer)
- {
- messagingService
- .send(transfer)
- .thenApply(
- $ ->
- {
- repository.update(transfer.getId(), SENT, PENDING);
- return null;
- })
- .exceptionally(
- e ->
- {
- repository.update(transfer.getId(), SENT, FAILED);
- return null;
+ // TODO: Do some time consuming checks...
+ transfer.setState(CHECKED);
+ messagingService.send(transfer);
});
}
@Override
- public synchronized void store(Transfer transfer)
- {
- Optional
- .ofNullable(map.get(transfer.getId()))
- .ifPresentOrElse(
- json ->
- {
- throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
- },
- () -> put(transfer));
- }
-
- private void put(Transfer transfer)
+ public void store(Transfer transfer)
{
try
{
}
catch (JsonProcessingException e)
{
- log.error("Could not convert Transfer.class: {}", transfer, e);
+ throw new RuntimeException(e);
}
}
@Override
- public synchronized Optional<Transfer> get(Long id)
+ public Optional<Transfer> get(Long id)
{
return
Optional
});
}
- @Override
- public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
- {
- Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
-
- if (transfer.getState() != oldState)
- throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
-
- transfer.setState(newState);
- put(transfer);
- }
-
@Override
public void remove(Long id)
{
--- /dev/null
+package de.juplo.kafka.payment.transfer.ports;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+
+
+public interface HandleTransferUseCase
+{
+ void handle(Transfer transfer);
+}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.ports;
-
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-
-
-public interface InitiateTransferUseCase
-{
- void initiate(Transfer transfer);
-}
--- /dev/null
+package de.juplo.kafka.payment.transfer.ports;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.concurrent.CompletableFuture;
+
+
+public interface ReceiveTransferUseCase
+{
+ CompletableFuture<TopicPartition> receive(Transfer transfer);
+}
Optional<Transfer> get(Long id);
- void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException;
-
void remove(Long id);
}
import org.junit.jupiter.api.Test;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.PENDING;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.SENT;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
import static org.assertj.core.api.Assertions.assertThat;
@Test
public void testEqualsIgnoresState()
{
- Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(SENT).build();
- Transfer b = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(PENDING).build();
+ Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(RECEIVED).build();
+ Transfer b = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(CHECKED).build();
assertThat(a).isEqualTo(b);
}