--- /dev/null
+target
+.idea
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.1.5.RELEASE</version>
+ <relativePath/> <!-- lookup parent from repository -->
+ </parent>
+ <groupId>de.trion.kafka</groupId>
+ <artifactId>outbox</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>Polling-Outbox-Pattern</name>
+ <description>Implementierung des Outbox-Patterns auf Basis von JDBC</description>
+
+ <properties>
+ <java.version>1.8</java.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jdbc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-json</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.33.0</version>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.trion.kafka.outbox;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.web.servlet.config.annotation.CorsRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application {
+
+ @Autowired
+ ApplicationProperties properties;
+
+
+ @Bean
+ public String bootstrapServers() { return properties.bootstrapServers; }
+
+ @Bean
+ public String topic() {
+ return properties.topic;
+ }
+
+ @Bean
+ public String consumerGroup() {
+ return properties.consumerGroup;
+ }
+
+ @Bean
+ public WebMvcConfigurer corsConfigurer() {
+ return new WebMvcConfigurer() {
+ @Override
+ public void addCorsMappings(CorsRegistry registry) {
+ registry
+ .addMapping("/**")
+ .allowedOrigins("http://localhost:4200");
+ }
+ };
+ }
+
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+
+}
--- /dev/null
+package de.trion.kafka.outbox;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("outbox.polling")
+public class ApplicationProperties {
+ public String bootstrapServers = "localhost:9092";
+ public String topic = "outbox-polling";
+ public String consumerGroup = "polling";
+
+
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+}
--- /dev/null
+package de.trion.kafka.outbox;
+
+public class Event {
+ public enum Type { CREATED, LOGIN, LOGOUT, DELETED }
+
+ Long id;
+ Type type;
+ String user;
+}
--- /dev/null
+package de.trion.kafka.outbox;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.lvm.tx.Event;
+import de.lvm.tx.Command;
+import de.lvm.tx.Command.Action;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import javax.swing.*;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+
+import static de.lvm.tx.Event.Type.*;
+
+@Component
+public class OutboxConsumer implements ApplicationRunner, Runnable {
+
+ private final static Logger LOG = LoggerFactory.getLogger(OutboxConsumer.class);
+
+ private final OutboxService service;
+ private final OutboxProducer sender;
+ private final ObjectMapper mapper;
+ private final String topic;
+ private final KafkaConsumer<Long, String> consumer;
+ private final Thread thread;
+
+ private long internalState = 1;
+
+
+ public OutboxConsumer(
+ OutboxService service,
+ OutboxProducer sender,
+ ObjectMapper mapper,
+ String bootstrapServers,
+ String consumerGroup,
+ String topic) {
+
+ this.service = service;
+ this.sender = sender;
+ this.mapper = mapper;
+ this.topic = topic;
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", bootstrapServers);
+ props.put("group.id", consumerGroup);
+ props.put("auto.commit.interval.ms", 15000);
+ props.put("metadata.max.age.ms", 1000);
+ props.put("key.deserializer", LongDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+ consumer = new KafkaConsumer<>(props);
+
+ thread = new Thread(this);
+ }
+
+
+ @Override
+ public void run() {
+ try
+ {
+ LOG.info("Subscribing to topic " + topic);
+ consumer.subscribe(Arrays.asList(topic));
+
+ while (true)
+ {
+ ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
+ for (ConsumerRecord<Long, String> record : records) {
+ byte code = record.headers().lastHeader("messageType").value()[0];
+ Action action = Action.from(code);
+
+ if (action == null)
+ {
+ LOG.debug("Ignoring unknown action {} for {}", code, record.value());
+ continue;
+ }
+
+ switch(action) {
+ case SAVE_DLZ:
+ dlzSaveReceived(toCommand(record.value()));
+ continue;
+ default:
+ LOG.debug("Ignoring message {}", record.value());
+ }
+ byte[] bytes = record.headers().lastHeader("messageType").value();
+ String type = new String(bytes, StandardCharsets.UTF_8);
+
+ if (type.endsWith("DlzAction")) {
+ dlzSaveReceived(toCommand(record.value()));
+ continue;
+ }
+
+ LOG.debug("Ignoring command {}", record.value());
+ }
+ }
+ }
+ catch (WakeupException e) {}
+ catch (Exception e) {
+ LOG.error("Unexpected exception!", e);
+ }
+ finally
+ {
+ LOG.info("Closing the KafkaConsumer...");
+ try {
+ consumer.close(Duration.ofSeconds(5));
+ LOG.debug("Successfully closed the KafkaConsumer");
+ }
+ catch (Exception e) {
+ LOG.warn("Exception while closing the KafkaConsumer!", e);
+ }
+ }
+ }
+
+ public Command toCommand(String message) throws IOException {
+ Command command = mapper.readValue(message, Command.class);
+ LOG.info("{}: {}", command.getAction(), command.getVorgangId());
+ return command;
+ }
+
+ public void dlzSaveReceived(Command command) throws InterruptedException {
+ try
+ {
+ String result =
+ service.bearbeiteVorgang(
+ command.getVorgangId(),
+ command.getVbId(),
+ command.getData());
+ reply(command, result);
+ }
+ catch (Exception e) {
+ LOG.error("Exception during processing!", e);
+ }
+ }
+
+ public void reply(Command command, String message) {
+ String vorgangId = command.getVorgangId();
+ String vbId = command.getVbId();
+ Event event = new Event(DLZ_SAVED, vorgangId, vbId);
+ event.getZustand().put(Event.DLZ, message);
+ sender.send(event);
+ }
+
+
+ @Override
+ public void run(ApplicationArguments args) {
+ thread.start();
+ try {
+ thread.join();
+ LOG.info("Successfully joined the consumer-thread");
+ }
+ catch (InterruptedException e) {
+ LOG.info("Main-thread was interrupted while joining the consumer-thread");
+ }
+ }
+
+ @PreDestroy
+ public void stop()
+ {
+ LOG.info("Stopping the KafkaConsumer...");
+ consumer.wakeup();
+ }
+}
--- /dev/null
+package de.trion.kafka.outbox;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
+
+@RestController
+public class OutboxController {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OutboxController.class);
+
+
+ private final OutboxService service;
+
+
+ public OutboxController(OutboxService service) {
+ this.service = service;
+ }
+
+
+ @PostMapping("/create")
+ public ResponseEntity<Void> getVorgang(@RequestBody String user) {
+ }
+}
--- /dev/null
+package de.trion.kafka.outbox;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class OutboxProducer {
+
+ private final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
+
+ private final ObjectMapper mapper;
+ private final String topic;
+ private final KafkaProducer<String, String> producer;
+
+
+ public OutboxProducer(ObjectMapper mapper, String bootstrapServers, String topic) {
+ this.mapper = mapper;
+ this.topic = topic;
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", bootstrapServers);
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+ producer = new KafkaProducer<>(props);
+ }
+
+
+ public void send(Event event) {
+ try {
+ String json = mapper.writeValueAsString(event);
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.user, json);
+ producer.send(record, (metadata, e) -> {
+ if (e != null) {
+ LOG.error("Could not send event {}!", json, e);
+ }
+ else {
+ LOG.debug(
+ "{}: send event {} with offset {} to partition {}",
+ event.user,
+ event.id,
+ metadata.offset(),
+ metadata.partition());
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Fehler beim Senden des Events " + event.id, e);
+ }
+ }
+
+
+ @PreDestroy
+ public void stop(){
+ LOG.info("Closing the KafkaProducer...");
+ try {
+ producer.close(5, TimeUnit.SECONDS);
+ LOG.debug("Successfully closed the KafkaProducer");
+ }
+ catch (Exception e) {
+ LOG.warn("Exception while closing the KafkaProducer!", e);
+ }
+ }
+}
--- /dev/null
+package de.trion.kafka.outbox;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Component;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+@Component
+public class OutboxService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OutboxService.class);
+
+
+ private final Map<String, String> state = new HashMap<>();
+ private final Map<String, DeferredResult> requests = new HashMap<>();
+
+ private long counter = 1;
+
+
+ public OutboxService() {}
+
+
+ public String bearbeiteVorgang(String vorgangId, String vbId, String data) {
+ if (vorgangId == null)
+ throw new IllegalArgumentException("vorgangId must not be null!");
+
+ // Fehler beim Sichern simulieren
+ Random r = new Random();
+ int i = r.nextInt(10);
+ if (i == 0)
+ throw new RuntimeException("FEHLER!!!!!!");
+
+ String result = vorgangId + "|vbId=" + vbId + "|" + counter++ + ", rand=" + i + ": " + data;
+
+ if (state.containsKey(vorgangId))
+ LOG.info("Bearbeite Vorgang {}: alt={}, neu={}", vorgangId, state.get(vorgangId), data);
+ else
+ LOG.info("Bearbeite Vorgang {}: neu={}", vorgangId, data);
+
+ process(vorgangId, result);
+ return result;
+ }
+
+ public synchronized void process(String vorgangId, DeferredResult result) {
+ String data = state.get(vorgangId);
+ if (data != null) {
+ result.setResult(ResponseEntity.ok(data));
+ }
+ else {
+ requests.put(vorgangId, result);
+ }
+ }
+
+ private synchronized void process(String vorgangId, String result) {
+ state.put(vorgangId, result);
+ DeferredResult request = requests.get(vorgangId);
+ if (request != null)
+ request.setResult(ResponseEntity.ok(result));
+ }
+}
--- /dev/null
+package de.trion.kafka.outbox;
+
+import org.springframework.data.annotation.Id;
+
+import java.time.LocalDateTime;
+
+public class User {
+ @Id
+ Long id;
+ String username;
+ LocalDateTime created;
+ boolean loggedIn;
+
+ public User(String username, LocalDateTime created, boolean loggedIn) {
+ this.username = username;
+ this.created = created;
+ this.loggedIn = loggedIn;
+ }
+}
--- /dev/null
+package de.trion.kafka.outbox;
+
+import org.springframework.data.jdbc.repository.query.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+public interface UserRepository extends CrudRepository<User, Long> {
+ @Query("select * from User u where u.username = :username")
+ User findByEmailAddress(@Param("email") String username);
+}
--- /dev/null
+package de.trion.kafka.outbox;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ApplicationTests {
+
+ @Test
+ public void contextLoads() {
+ }
+
+}