From: Kai Moritz
Date: Fri, 10 Jul 2020 12:54:21 +0000 (+0200)
Subject: WIP
X-Git-Tag: wip-initial~20
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a498c7ae070b064dc7ea9a453b389ecff5c190ce;p=demos%2Fkafka%2Foutbox

WIP
---

a498c7ae070b064dc7ea9a453b389ecff5c190ce
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c507849
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+target
+.idea
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..ae3fdf4
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.1.5.RELEASE</version>
+  </parent>
+
+  <groupId>de.trion.kafka</groupId>
+  <artifactId>outbox</artifactId>
+  <version>0.0.1-SNAPSHOT</version>
+  <name>Polling-Outbox-Pattern</name>
+  <description>Implementation of the outbox pattern on top of JDBC</description>
+
+  <properties>
+    <java.version>1.8</java.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-web</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-data-jdbc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-json</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>io.fabric8</groupId>
+        <artifactId>docker-maven-plugin</artifactId>
+        <version>0.33.0</version>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
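Note on scope: this WIP commit only wires up a plain Kafka producer and consumer; the outbox table and the polling relay implied by the project name and description are not part of the diff yet. The following is only an illustrative sketch of that missing piece, assuming a hypothetical OUTBOX table with the columns id, record_key and record_value; the class name, schema, topic wiring and scheduling interval are invented for the example and are not taken from this commit.

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    // Sketch of a polling relay: read pending rows from an (assumed) OUTBOX table
    // and forward them to Kafka, deleting each row once the broker acknowledged it.
    // Requires @EnableScheduling and assumes JdbcTemplate, KafkaProducer and topic beans.
    @Component
    public class OutboxRelay {

        private final JdbcTemplate jdbc;
        private final KafkaProducer<String, String> producer;
        private final String topic;

        public OutboxRelay(JdbcTemplate jdbc, KafkaProducer<String, String> producer, String topic) {
            this.jdbc = jdbc;
            this.producer = producer;
            this.topic = topic;
        }

        @Scheduled(fixedDelay = 500)
        public void poll() {
            // Assumed schema: OUTBOX(id BIGINT, record_key VARCHAR, record_value VARCHAR)
            List<Map<String, Object>> rows =
                jdbc.queryForList("SELECT id, record_key, record_value FROM outbox ORDER BY id");
            for (Map<String, Object> row : rows) {
                Long id = (Long) row.get("id");
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    topic, (String) row.get("record_key"), (String) row.get("record_value"));
                producer.send(record, (metadata, e) -> {
                    if (e == null)
                        jdbc.update("DELETE FROM outbox WHERE id = ?", id);
                });
            }
        }
    }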
diff --git a/src/main/java/de/trion/kafka/outbox/Application.java b/src/main/java/de/trion/kafka/outbox/Application.java
new file mode 100644
index 0000000..3e0a723
--- /dev/null
+++ b/src/main/java/de/trion/kafka/outbox/Application.java
@@ -0,0 +1,49 @@
+package de.trion.kafka.outbox;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.web.servlet.config.annotation.CorsRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application {
+
+    @Autowired
+    ApplicationProperties properties;
+
+
+    @Bean
+    public String bootstrapServers() { return properties.bootstrapServers; }
+
+    @Bean
+    public String topic() {
+        return properties.topic;
+    }
+
+    @Bean
+    public String consumerGroup() {
+        return properties.consumerGroup;
+    }
+
+    @Bean
+    public WebMvcConfigurer corsConfigurer() {
+        return new WebMvcConfigurer() {
+            @Override
+            public void addCorsMappings(CorsRegistry registry) {
+                registry
+                    .addMapping("/**")
+                    .allowedOrigins("http://localhost:4200");
+            }
+        };
+    }
+
+
+    public static void main(String[] args) {
+        SpringApplication.run(Application.class, args);
+    }
+
+}
diff --git a/src/main/java/de/trion/kafka/outbox/ApplicationProperties.java b/src/main/java/de/trion/kafka/outbox/ApplicationProperties.java
new file mode 100644
index 0000000..7ba4c06
--- /dev/null
+++ b/src/main/java/de/trion/kafka/outbox/ApplicationProperties.java
@@ -0,0 +1,35 @@
+package de.trion.kafka.outbox;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("outbox.polling")
+public class ApplicationProperties {
+    public String bootstrapServers = "localhost:9092";
+    public String topic = "outbox-polling";
+    public String consumerGroup = "polling";
+
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+}
diff --git a/src/main/java/de/trion/kafka/outbox/Event.java b/src/main/java/de/trion/kafka/outbox/Event.java
new file mode 100644
index 0000000..ca228e0
--- /dev/null
+++ b/src/main/java/de/trion/kafka/outbox/Event.java
@@ -0,0 +1,9 @@
+package de.trion.kafka.outbox;
+
+public class Event {
+    public enum Type { CREATED, LOGIN, LOGOUT, DELETED }
+
+    Long id;
+    Type type;
+    String user;
+}
diff --git a/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java b/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java
new file mode 100644
index 0000000..7bb6d4f
--- /dev/null
+++ b/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java
@@ -0,0 +1,174 @@
+package de.trion.kafka.outbox;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.lvm.tx.Event;
+import de.lvm.tx.Command;
+import de.lvm.tx.Command.Action;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+
+import static de.lvm.tx.Event.Type.*;
+
+// WIP: this consumer is still written against the external de.lvm.tx Command/Event
+// types and the Vorgang domain; it is not yet adapted to this project's own Event
+// class (see reply(), which hands a de.lvm.tx.Event to OutboxProducer.send()).
+@Component
+public class OutboxConsumer implements ApplicationRunner, Runnable {
+
+    private final static Logger LOG = LoggerFactory.getLogger(OutboxConsumer.class);
+
+    private final OutboxService service;
+    private final OutboxProducer sender;
+    private final ObjectMapper mapper;
+    private final String topic;
+    private final KafkaConsumer<Long, String> consumer;
+    private final Thread thread;
+
+    private long internalState = 1;
+
+
+    public OutboxConsumer(
+            OutboxService service,
+            OutboxProducer sender,
+            ObjectMapper mapper,
+            String bootstrapServers,
+            String consumerGroup,
+            String topic) {
+
+        this.service = service;
+        this.sender = sender;
+        this.mapper = mapper;
+        this.topic = topic;
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", bootstrapServers);
+        props.put("group.id", consumerGroup);
+        props.put("auto.commit.interval.ms", 15000);
+        props.put("metadata.max.age.ms", 1000);
+        props.put("key.deserializer", LongDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+        consumer = new KafkaConsumer<>(props);
+
+        thread = new Thread(this);
+    }
+
+
+    @Override
+    public void run() {
+        try
+        {
+            LOG.info("Subscribing to topic " + topic);
+            consumer.subscribe(Arrays.asList(topic));
+
+            while (true)
+            {
+                ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
+                for (ConsumerRecord<Long, String> record : records) {
+                    // First dispatch attempt: interpret the first byte of the
+                    // "messageType" header as a Command.Action code.
+                    byte code = record.headers().lastHeader("messageType").value()[0];
+                    Action action = Action.from(code);
+
+                    if (action == null)
+                    {
+                        LOG.debug("Ignoring unknown action {} for {}", code, record.value());
+                        continue;
+                    }
+
+                    switch(action) {
+                        case SAVE_DLZ:
+                            dlzSaveReceived(toCommand(record.value()));
+                            continue;
+                        default:
+                            LOG.debug("Ignoring message {}", record.value());
+                    }
+
+                    // Second, alternative dispatch still left over from the WIP:
+                    // interpret the whole "messageType" header as a type name.
+                    byte[] bytes = record.headers().lastHeader("messageType").value();
+                    String type = new String(bytes, StandardCharsets.UTF_8);
+
+                    if (type.endsWith("DlzAction")) {
+                        dlzSaveReceived(toCommand(record.value()));
+                        continue;
+                    }
+
+                    LOG.debug("Ignoring command {}", record.value());
+                }
+            }
+        }
+        catch (WakeupException e) {}
+        catch (Exception e) {
+            LOG.error("Unexpected exception!", e);
+        }
+        finally
+        {
+            LOG.info("Closing the KafkaConsumer...");
+            try {
+                consumer.close(Duration.ofSeconds(5));
+                LOG.debug("Successfully closed the KafkaConsumer");
+            }
+            catch (Exception e) {
+                LOG.warn("Exception while closing the KafkaConsumer!", e);
+            }
+        }
+    }
+
+    public Command toCommand(String message) throws IOException {
+        Command command = mapper.readValue(message, Command.class);
+        LOG.info("{}: {}", command.getAction(), command.getVorgangId());
+        return command;
+    }
+
+    public void dlzSaveReceived(Command command) throws InterruptedException {
+        try
+        {
+            String result =
+                service.bearbeiteVorgang(
+                    command.getVorgangId(),
+                    command.getVbId(),
+                    command.getData());
+            reply(command, result);
+        }
+        catch (Exception e) {
+            LOG.error("Exception during processing!", e);
+        }
+    }
+
+    public void reply(Command command, String message) {
+        String vorgangId = command.getVorgangId();
+        String vbId = command.getVbId();
+        Event event = new Event(DLZ_SAVED, vorgangId, vbId);
+        event.getZustand().put(Event.DLZ, message);
+        sender.send(event);
+    }
+
+
+    @Override
+    public void run(ApplicationArguments args) {
+        thread.start();
+        try {
+            thread.join();
+            LOG.info("Successfully joined the consumer-thread");
+        }
+        catch (InterruptedException e) {
+            LOG.info("Main-thread was interrupted while joining the consumer-thread");
+        }
+    }
+
+    @PreDestroy
+    public void stop()
+    {
+        LOG.info("Stopping the KafkaConsumer...");
+        consumer.wakeup();
+    }
+}
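For reference, OutboxConsumer above expects records with a Long key, a JSON string as value and a one-byte "messageType" header. A minimal, purely illustrative fragment showing how such a record could be assembled on the sending side (topic name, key, payload and action code are made up):

    import org.apache.kafka.clients.producer.ProducerRecord;

    byte actionCode = 1; // stands in for a Command.Action code from de.lvm.tx
    ProducerRecord<Long, String> record =
            new ProducerRecord<>("outbox-polling", 1L, "{\"vorgangId\":\"4711\"}");
    record.headers().add("messageType", new byte[] { actionCode });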
diff --git a/src/main/java/de/trion/kafka/outbox/OutboxController.java b/src/main/java/de/trion/kafka/outbox/OutboxController.java
new file mode 100644
index 0000000..934ca1f
--- /dev/null
+++ b/src/main/java/de/trion/kafka/outbox/OutboxController.java
@@ -0,0 +1,27 @@
+package de.trion.kafka.outbox;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
+
+@RestController
+public class OutboxController {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OutboxController.class);
+
+
+    private final OutboxService service;
+
+
+    public OutboxController(OutboxService service) {
+        this.service = service;
+    }
+
+
+    @PostMapping("/create")
+    public ResponseEntity<String> getVorgang(@RequestBody String user) {
+        // WIP placeholder: the actual create/outbox logic is not implemented yet.
+        LOG.info("create requested for user {}", user);
+        return ResponseEntity.status(HttpStatus.NOT_IMPLEMENTED).build();
+    }
+}
diff --git a/src/main/java/de/trion/kafka/outbox/OutboxProducer.java b/src/main/java/de/trion/kafka/outbox/OutboxProducer.java
new file mode 100644
index 0000000..99adae9
--- /dev/null
+++ b/src/main/java/de/trion/kafka/outbox/OutboxProducer.java
@@ -0,0 +1,72 @@
+package de.trion.kafka.outbox;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class OutboxProducer {
+
+    private final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
+
+    private final ObjectMapper mapper;
+    private final String topic;
+    // Note: keys are serialized as String here, while OutboxConsumer is configured
+    // with a LongDeserializer for keys (still a WIP inconsistency).
+    private final KafkaProducer<String, String> producer;
+
+
+    public OutboxProducer(ObjectMapper mapper, String bootstrapServers, String topic) {
+        this.mapper = mapper;
+        this.topic = topic;
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", bootstrapServers);
+        props.put("key.serializer", StringSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        producer = new KafkaProducer<>(props);
+    }
+
+
+    public void send(Event event) {
+        try {
+            String json = mapper.writeValueAsString(event);
+            ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.user, json);
+            producer.send(record, (metadata, e) -> {
+                if (e != null) {
+                    LOG.error("Could not send event {}!", json, e);
+                }
+                else {
+                    LOG.debug(
+                        "{}: sent event {} with offset {} to partition {}",
+                        event.user,
+                        event.id,
+                        metadata.offset(),
+                        metadata.partition());
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Error while sending event " + event.id, e);
+        }
+    }
+
+
+    @PreDestroy
+    public void stop() {
+        LOG.info("Closing the KafkaProducer...");
+        try {
+            producer.close(5, TimeUnit.SECONDS);
+            LOG.debug("Successfully closed the KafkaProducer");
+        }
+        catch (Exception e) {
+            LOG.warn("Exception while closing the KafkaProducer!", e);
+        }
+    }
+}
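A minimal usage sketch for OutboxProducer.send(Event) above, illustrative only and with made-up values. Note that Event currently has package-private fields and no getters, so Jackson's writeValueAsString would additionally need getters or field-visibility configuration to actually produce JSON:

    // Given an injected OutboxProducer outboxProducer, from a collaborator
    // in the same package (the Event fields are package-private):
    Event event = new Event();
    event.id = 1L;
    event.type = Event.Type.CREATED;
    event.user = "alice";
    outboxProducer.send(event); // serialized to JSON, event.user becomes the record key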
diff --git a/src/main/java/de/trion/kafka/outbox/OutboxService.java b/src/main/java/de/trion/kafka/outbox/OutboxService.java
new file mode 100644
index 0000000..e003839
--- /dev/null
+++ b/src/main/java/de/trion/kafka/outbox/OutboxService.java
@@ -0,0 +1,65 @@
+package de.trion.kafka.outbox;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Component;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+@Component
+public class OutboxService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OutboxService.class);
+
+
+    private final Map<String, String> state = new HashMap<>();
+    private final Map<String, DeferredResult<ResponseEntity<String>>> requests = new HashMap<>();
+
+    private long counter = 1;
+
+
+    public OutboxService() {}
+
+
+    public String bearbeiteVorgang(String vorgangId, String vbId, String data) {
+        if (vorgangId == null)
+            throw new IllegalArgumentException("vorgangId must not be null!");
+
+        // Simulate an error while saving
+        Random r = new Random();
+        int i = r.nextInt(10);
+        if (i == 0)
+            throw new RuntimeException("ERROR!!!!!!");
+
+        String result = vorgangId + "|vbId=" + vbId + "|" + counter++ + ", rand=" + i + ": " + data;
+
+        if (state.containsKey(vorgangId))
+            LOG.info("Processing vorgang {}: old={}, new={}", vorgangId, state.get(vorgangId), data);
+        else
+            LOG.info("Processing vorgang {}: new={}", vorgangId, data);
+
+        process(vorgangId, result);
+        return result;
+    }
+
+    // Answers the request immediately if a result for the vorgangId is already
+    // known; otherwise the DeferredResult is parked until a result arrives.
+    public synchronized void process(String vorgangId, DeferredResult<ResponseEntity<String>> result) {
+        String data = state.get(vorgangId);
+        if (data != null) {
+            result.setResult(ResponseEntity.ok(data));
+        }
+        else {
+            requests.put(vorgangId, result);
+        }
+    }
+
+    // Stores the new result and completes a parked request, if there is one.
+    private synchronized void process(String vorgangId, String result) {
+        state.put(vorgangId, result);
+        DeferredResult<ResponseEntity<String>> request = requests.get(vorgangId);
+        if (request != null)
+            request.setResult(ResponseEntity.ok(result));
+    }
+}
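The two process(...) overloads above implement a simple request/result correlation per vorgangId: a caller parks a DeferredResult, and a later bearbeiteVorgang(...) call completes it. A minimal sketch with made-up values (ignoring the simulated random failure):

    import org.springframework.http.ResponseEntity;
    import org.springframework.web.context.request.async.DeferredResult;

    OutboxService service = new OutboxService();
    DeferredResult<ResponseEntity<String>> result = new DeferredResult<>();
    service.process("4711", result);                       // no result known yet, request is parked

    service.bearbeiteVorgang("4711", "vb-1", "some data"); // completes the parked request
    // result.getResult() now holds ResponseEntity.ok(...) with the processed data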
diff --git a/src/main/java/de/trion/kafka/outbox/User.java b/src/main/java/de/trion/kafka/outbox/User.java
new file mode 100644
index 0000000..ffb29bf
--- /dev/null
+++ b/src/main/java/de/trion/kafka/outbox/User.java
@@ -0,0 +1,19 @@
+package de.trion.kafka.outbox;
+
+import org.springframework.data.annotation.Id;
+
+import java.time.LocalDateTime;
+
+public class User {
+    @Id
+    Long id;
+    String username;
+    LocalDateTime created;
+    boolean loggedIn;
+
+    public User(String username, LocalDateTime created, boolean loggedIn) {
+        this.username = username;
+        this.created = created;
+        this.loggedIn = loggedIn;
+    }
+}
diff --git a/src/main/java/de/trion/kafka/outbox/UserRepository.java b/src/main/java/de/trion/kafka/outbox/UserRepository.java
new file mode 100644
index 0000000..a12c2e7
--- /dev/null
+++ b/src/main/java/de/trion/kafka/outbox/UserRepository.java
@@ -0,0 +1,11 @@
+package de.trion.kafka.outbox;
+
+import org.springframework.data.jdbc.repository.query.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+public interface UserRepository extends CrudRepository<User, Long> {
+    @Query("select * from User u where u.username = :username")
+    User findByEmailAddress(@Param("username") String username);
+}
diff --git a/src/test/java/de/trion/kafka/outbox/ApplicationTests.java b/src/test/java/de/trion/kafka/outbox/ApplicationTests.java
new file mode 100644
index 0000000..0962964
--- /dev/null
+++ b/src/test/java/de/trion/kafka/outbox/ApplicationTests.java
@@ -0,0 +1,16 @@
+package de.trion.kafka.outbox;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ApplicationTests {
+
+    @Test
+    public void contextLoads() {
+    }
+
+}