WIP
authorKai Moritz <kai@juplo.de>
Fri, 10 Jul 2020 12:54:21 +0000 (14:54 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 10 Jul 2020 12:54:21 +0000 (14:54 +0200)
12 files changed:
.gitignore [new file with mode: 0644]
pom.xml [new file with mode: 0644]
src/main/java/de/trion/kafka/outbox/Application.java [new file with mode: 0644]
src/main/java/de/trion/kafka/outbox/ApplicationProperties.java [new file with mode: 0644]
src/main/java/de/trion/kafka/outbox/Event.java [new file with mode: 0644]
src/main/java/de/trion/kafka/outbox/OutboxConsumer.java [new file with mode: 0644]
src/main/java/de/trion/kafka/outbox/OutboxController.java [new file with mode: 0644]
src/main/java/de/trion/kafka/outbox/OutboxProducer.java [new file with mode: 0644]
src/main/java/de/trion/kafka/outbox/OutboxService.java [new file with mode: 0644]
src/main/java/de/trion/kafka/outbox/User.java [new file with mode: 0644]
src/main/java/de/trion/kafka/outbox/UserRepository.java [new file with mode: 0644]
src/test/java/de/trion/kafka/outbox/ApplicationTests.java [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..c507849
--- /dev/null
@@ -0,0 +1,2 @@
+target
+.idea
diff --git a/pom.xml b/pom.xml
new file mode 100644 (file)
index 0000000..ae3fdf4
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.1.5.RELEASE</version>
+        <relativePath/> <!-- lookup parent from repository -->
+    </parent>
+    <groupId>de.trion.kafka</groupId>
+    <artifactId>outbox</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>Polling-Outbox-Pattern</name>
+    <description>Implementierung des Outbox-Patterns auf Basis von JDBC</description>
+
+    <properties>
+        <java.version>1.8</java.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-jdbc</artifactId>
+        </dependency>
+        <dependency>
+                       <groupId>com.h2database</groupId>
+                       <artifactId>h2</artifactId>
+                       <scope>runtime</scope>
+               </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-json</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+              <groupId>io.fabric8</groupId>
+              <artifactId>docker-maven-plugin</artifactId>
+              <version>0.33.0</version>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/src/main/java/de/trion/kafka/outbox/Application.java b/src/main/java/de/trion/kafka/outbox/Application.java
new file mode 100644 (file)
index 0000000..3e0a723
--- /dev/null
@@ -0,0 +1,49 @@
+package de.trion.kafka.outbox;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.web.servlet.config.annotation.CorsRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application {
+
+    @Autowired
+    ApplicationProperties properties;
+
+
+    @Bean
+    public String bootstrapServers() { return properties.bootstrapServers; }
+
+    @Bean
+    public String topic() {
+        return properties.topic;
+    }
+
+    @Bean
+    public String consumerGroup() {
+        return properties.consumerGroup;
+    }
+
+    @Bean
+    public WebMvcConfigurer corsConfigurer() {
+        return new WebMvcConfigurer() {
+            @Override
+            public void addCorsMappings(CorsRegistry registry) {
+                registry
+                        .addMapping("/**")
+                        .allowedOrigins("http://localhost:4200");
+            }
+        };
+    }
+
+
+    public static void main(String[] args) {
+        SpringApplication.run(Application.class, args);
+    }
+
+}
diff --git a/src/main/java/de/trion/kafka/outbox/ApplicationProperties.java b/src/main/java/de/trion/kafka/outbox/ApplicationProperties.java
new file mode 100644 (file)
index 0000000..7ba4c06
--- /dev/null
@@ -0,0 +1,35 @@
+package de.trion.kafka.outbox;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("outbox.polling")
+public class ApplicationProperties {
+    public String bootstrapServers = "localhost:9092";
+    public String topic = "outbox-polling";
+    public String consumerGroup = "polling";
+
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+}
diff --git a/src/main/java/de/trion/kafka/outbox/Event.java b/src/main/java/de/trion/kafka/outbox/Event.java
new file mode 100644 (file)
index 0000000..ca228e0
--- /dev/null
@@ -0,0 +1,9 @@
+package de.trion.kafka.outbox;
+
+public class Event {
+    public enum Type { CREATED, LOGIN, LOGOUT, DELETED }
+
+    Long id;
+    Type type;
+    String user;
+}
diff --git a/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java b/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java
new file mode 100644 (file)
index 0000000..7bb6d4f
--- /dev/null
@@ -0,0 +1,174 @@
+package de.trion.kafka.outbox;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.lvm.tx.Event;
+import de.lvm.tx.Command;
+import de.lvm.tx.Command.Action;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import javax.swing.*;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+
+import static de.lvm.tx.Event.Type.*;
+
+@Component
+public class OutboxConsumer implements ApplicationRunner, Runnable {
+
+    private final static Logger LOG = LoggerFactory.getLogger(OutboxConsumer.class);
+
+    private final OutboxService service;
+    private final OutboxProducer sender;
+    private final ObjectMapper mapper;
+    private final String topic;
+    private final KafkaConsumer<Long, String> consumer;
+    private final Thread thread;
+
+    private long internalState = 1;
+
+
+    public OutboxConsumer(
+            OutboxService service,
+            OutboxProducer sender,
+            ObjectMapper mapper,
+            String bootstrapServers,
+            String consumerGroup,
+            String topic) {
+
+        this.service = service;
+        this.sender = sender;
+        this.mapper = mapper;
+        this.topic = topic;
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", bootstrapServers);
+        props.put("group.id", consumerGroup);
+        props.put("auto.commit.interval.ms", 15000);
+        props.put("metadata.max.age.ms", 1000);
+        props.put("key.deserializer", LongDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+        consumer = new KafkaConsumer<>(props);
+
+        thread = new Thread(this);
+    }
+
+
+    @Override
+    public void run() {
+        try
+        {
+            LOG.info("Subscribing to topic " + topic);
+            consumer.subscribe(Arrays.asList(topic));
+
+            while (true)
+            {
+                ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
+                for (ConsumerRecord<Long, String> record : records) {
+                    byte code = record.headers().lastHeader("messageType").value()[0];
+                    Action action = Action.from(code);
+
+                    if (action == null)
+                    {
+                        LOG.debug("Ignoring unknown action {} for {}", code, record.value());
+                        continue;
+                    }
+
+                    switch(action) {
+                        case SAVE_DLZ:
+                            dlzSaveReceived(toCommand(record.value()));
+                            continue;
+                        default:
+                            LOG.debug("Ignoring message {}", record.value());
+                    }
+                    byte[] bytes = record.headers().lastHeader("messageType").value();
+                    String type = new String(bytes, StandardCharsets.UTF_8);
+
+                    if (type.endsWith("DlzAction")) {
+                        dlzSaveReceived(toCommand(record.value()));
+                        continue;
+                    }
+
+                    LOG.debug("Ignoring command {}", record.value());
+                }
+            }
+        }
+        catch (WakeupException e) {}
+        catch (Exception e) {
+            LOG.error("Unexpected exception!", e);
+        }
+        finally
+        {
+            LOG.info("Closing the KafkaConsumer...");
+            try {
+                consumer.close(Duration.ofSeconds(5));
+                LOG.debug("Successfully closed the KafkaConsumer");
+            }
+            catch (Exception e) {
+                LOG.warn("Exception while closing the KafkaConsumer!", e);
+            }
+        }
+    }
+
+    public Command toCommand(String message) throws IOException {
+        Command command = mapper.readValue(message, Command.class);
+        LOG.info("{}: {}", command.getAction(), command.getVorgangId());
+        return command;
+    }
+
+    public void dlzSaveReceived(Command command) throws InterruptedException {
+        try
+        {
+            String result =
+                    service.bearbeiteVorgang(
+                            command.getVorgangId(),
+                            command.getVbId(),
+                            command.getData());
+            reply(command, result);
+        }
+        catch (Exception e) {
+            LOG.error("Exception during processing!", e);
+        }
+    }
+
+    public void reply(Command command, String message) {
+        String vorgangId = command.getVorgangId();
+        String vbId = command.getVbId();
+        Event event = new Event(DLZ_SAVED, vorgangId, vbId);
+        event.getZustand().put(Event.DLZ, message);
+        sender.send(event);
+    }
+
+
+    @Override
+    public void run(ApplicationArguments args) {
+        thread.start();
+        try {
+            thread.join();
+            LOG.info("Successfully joined the consumer-thread");
+        }
+        catch (InterruptedException e) {
+            LOG.info("Main-thread was interrupted while joining the consumer-thread");
+        }
+    }
+
+    @PreDestroy
+    public void stop()
+    {
+        LOG.info("Stopping the KafkaConsumer...");
+        consumer.wakeup();
+    }
+}
diff --git a/src/main/java/de/trion/kafka/outbox/OutboxController.java b/src/main/java/de/trion/kafka/outbox/OutboxController.java
new file mode 100644 (file)
index 0000000..934ca1f
--- /dev/null
@@ -0,0 +1,27 @@
+package de.trion.kafka.outbox;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
+
+@RestController
+public class OutboxController {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OutboxController.class);
+
+
+    private final OutboxService service;
+
+
+    public OutboxController(OutboxService service) {
+        this.service = service;
+    }
+
+
+    @PostMapping("/create")
+    public ResponseEntity<Void> getVorgang(@RequestBody String user) {
+    }
+}
diff --git a/src/main/java/de/trion/kafka/outbox/OutboxProducer.java b/src/main/java/de/trion/kafka/outbox/OutboxProducer.java
new file mode 100644 (file)
index 0000000..99adae9
--- /dev/null
@@ -0,0 +1,72 @@
+package de.trion.kafka.outbox;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class OutboxProducer {
+
+    private final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
+
+    private final ObjectMapper mapper;
+    private final String topic;
+    private final KafkaProducer<String, String> producer;
+
+
+    public OutboxProducer(ObjectMapper mapper, String bootstrapServers, String topic) {
+        this.mapper = mapper;
+        this.topic = topic;
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", bootstrapServers);
+        props.put("key.serializer", StringSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        producer = new KafkaProducer<>(props);
+    }
+
+
+    public void send(Event event) {
+        try {
+            String json = mapper.writeValueAsString(event);
+            ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.user, json);
+            producer.send(record, (metadata, e) -> {
+                if (e != null) {
+                    LOG.error("Could not send event {}!", json, e);
+                }
+                else {
+                    LOG.debug(
+                            "{}: send event {} with offset {} to partition {}",
+                            event.user,
+                            event.id,
+                            metadata.offset(),
+                            metadata.partition());
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Fehler beim Senden des Events " + event.id, e);
+        }
+    }
+
+
+    @PreDestroy
+    public void stop(){
+        LOG.info("Closing the KafkaProducer...");
+        try {
+            producer.close(5, TimeUnit.SECONDS);
+            LOG.debug("Successfully closed the KafkaProducer");
+        }
+        catch (Exception e) {
+            LOG.warn("Exception while closing the KafkaProducer!", e);
+        }
+    }
+}
diff --git a/src/main/java/de/trion/kafka/outbox/OutboxService.java b/src/main/java/de/trion/kafka/outbox/OutboxService.java
new file mode 100644 (file)
index 0000000..e003839
--- /dev/null
@@ -0,0 +1,65 @@
+package de.trion.kafka.outbox;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Component;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+@Component
+public class OutboxService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OutboxService.class);
+
+
+    private final Map<String, String> state = new HashMap<>();
+    private final Map<String, DeferredResult> requests = new HashMap<>();
+
+    private long counter = 1;
+
+
+    public OutboxService() {}
+
+
+    public String bearbeiteVorgang(String vorgangId, String vbId, String data) {
+        if (vorgangId == null)
+            throw new IllegalArgumentException("vorgangId must not be null!");
+
+        // Fehler beim Sichern simulieren
+        Random r = new Random();
+        int i = r.nextInt(10);
+        if (i == 0)
+            throw new RuntimeException("FEHLER!!!!!!");
+
+        String result = vorgangId + "|vbId=" + vbId + "|" + counter++ + ", rand=" + i + ": " + data;
+
+        if (state.containsKey(vorgangId))
+            LOG.info("Bearbeite Vorgang {}: alt={}, neu={}", vorgangId, state.get(vorgangId), data);
+        else
+            LOG.info("Bearbeite Vorgang {}: neu={}", vorgangId, data);
+
+        process(vorgangId, result);
+        return result;
+    }
+
+    public synchronized void process(String vorgangId, DeferredResult result) {
+        String data = state.get(vorgangId);
+        if (data != null) {
+            result.setResult(ResponseEntity.ok(data));
+        }
+        else {
+            requests.put(vorgangId, result);
+        }
+    }
+
+    private synchronized void process(String vorgangId, String result) {
+        state.put(vorgangId, result);
+        DeferredResult request = requests.get(vorgangId);
+        if (request != null)
+          request.setResult(ResponseEntity.ok(result));
+    }
+}
diff --git a/src/main/java/de/trion/kafka/outbox/User.java b/src/main/java/de/trion/kafka/outbox/User.java
new file mode 100644 (file)
index 0000000..ffb29bf
--- /dev/null
@@ -0,0 +1,19 @@
+package de.trion.kafka.outbox;
+
+import org.springframework.data.annotation.Id;
+
+import java.time.LocalDateTime;
+
+public class User {
+    @Id
+    Long id;
+    String username;
+    LocalDateTime created;
+    boolean loggedIn;
+
+    public User(String username, LocalDateTime created, boolean loggedIn) {
+        this.username = username;
+        this.created = created;
+        this.loggedIn = loggedIn;
+    }
+}
diff --git a/src/main/java/de/trion/kafka/outbox/UserRepository.java b/src/main/java/de/trion/kafka/outbox/UserRepository.java
new file mode 100644 (file)
index 0000000..a12c2e7
--- /dev/null
@@ -0,0 +1,11 @@
+package de.trion.kafka.outbox;
+
+import org.springframework.data.jdbc.repository.query.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+public interface UserRepository extends CrudRepository<User, Long> {
+    @Query("select * from User u where u.username = :username")
+    User findByEmailAddress(@Param("email") String username);
+}
diff --git a/src/test/java/de/trion/kafka/outbox/ApplicationTests.java b/src/test/java/de/trion/kafka/outbox/ApplicationTests.java
new file mode 100644 (file)
index 0000000..0962964
--- /dev/null
@@ -0,0 +1,16 @@
+package de.trion.kafka.outbox;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ApplicationTests {
+
+    @Test
+    public void contextLoads() {
+    }
+
+}