Moved postage of messages into a reusable standalone implementation
authorKai Moritz <kai@juplo.de>
Sun, 1 Nov 2020 11:58:20 +0000 (12:58 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 1 Nov 2020 18:02:18 +0000 (19:02 +0100)
* Renamed maven-module outbox into delivery
* Renamed app polling-outbox into outbox-polling-delivery
* Added new artifact outbox-postage as maven-module postage
* Added fully qualified names for the docker-images
* TODO: Move flyway-scriptes for outbox into module postage

27 files changed:
delivery/.dockerignore [new file with mode: 0644]
delivery/Dockerfile [new file with mode: 0644]
delivery/pom.xml [new file with mode: 0644]
delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java [new file with mode: 0644]
delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java [new file with mode: 0644]
delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxItem.java [new file with mode: 0644]
delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java [new file with mode: 0644]
delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java [new file with mode: 0644]
delivery/src/main/resources/application.yml [new file with mode: 0644]
delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java [new file with mode: 0644]
docker-compose.yml
jdbc
outbox/.dockerignore [deleted file]
outbox/Dockerfile [deleted file]
outbox/pom.xml [deleted file]
outbox/src/main/java/de/juplo/kafka/outbox/Application.java [deleted file]
outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java [deleted file]
outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java [deleted file]
outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java [deleted file]
outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java [deleted file]
outbox/src/main/resources/application.yml [deleted file]
outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java [deleted file]
pom.xml
postage/pom.xml [new file with mode: 0644]
postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxEvent.java [new file with mode: 0644]
postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxListener.java [new file with mode: 0644]
postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxRepository.java [new file with mode: 0644]

diff --git a/delivery/.dockerignore b/delivery/.dockerignore
new file mode 100644 (file)
index 0000000..1ad9963
--- /dev/null
@@ -0,0 +1,2 @@
+*
+!target/*.jar
diff --git a/delivery/Dockerfile b/delivery/Dockerfile
new file mode 100644 (file)
index 0000000..16ee25e
--- /dev/null
@@ -0,0 +1,5 @@
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
+CMD []
diff --git a/delivery/pom.xml b/delivery/pom.xml
new file mode 100644 (file)
index 0000000..975fbb5
--- /dev/null
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.3.2.RELEASE</version>
+    <relativePath/> <!-- lookup parent from repository -->
+  </parent>
+
+  <groupId>de.juplo.kafka.outbox</groupId>
+  <artifactId>outbox-delivery</artifactId>
+  <version>0.0.1-SNAPSHOT</version>
+  <name>Polling Outbox-Delivery</name>
+  <description>Simple example-implementation of the Polling-Outbox-Pattern</description>
+
+  <properties>
+    <java.version>11</java.version>
+    <guava.version>30.0-jre</guava.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-data-jdbc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-json</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>io.fabric8</groupId>
+        <artifactId>docker-maven-plugin</artifactId>
+        <version>0.33.0</version>
+        <configuration>
+          <images>
+            <image>
+              <name>juplo/outbox:polling</name>
+            </image>
+          </images>
+        </configuration>
+        <executions>
+          <execution>
+             <id>build</id>
+             <phase>package</phase>
+             <goals>
+               <goal>build</goal>
+             </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java
new file mode 100644 (file)
index 0000000..6abd181
--- /dev/null
@@ -0,0 +1,18 @@
+package de.juplo.kafka.outbox.delivery;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableScheduling
+public class Application
+{
+  public static void main(String[] args) throws Exception
+  {
+    SpringApplication.run(Application.class, args);
+  }
+}
diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java
new file mode 100644 (file)
index 0000000..4e36aa4
--- /dev/null
@@ -0,0 +1,15 @@
+package de.juplo.kafka.outbox.delivery;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("de.juplo.kafka.outbox")
+@Getter
+@Setter
+public class ApplicationProperties
+{
+  String bootstrapServers = "localhost:9092";
+  String topic = "outbox";
+}
diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxItem.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxItem.java
new file mode 100644 (file)
index 0000000..e48ac8e
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.outbox.delivery;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Value;
+
+
+@Data
+@Value
+@Builder
+public class OutboxItem
+{
+  private final Long sequenceNumber;
+  private final String key;
+  private final String value;
+}
diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
new file mode 100644 (file)
index 0000000..c08cae7
--- /dev/null
@@ -0,0 +1,105 @@
+package de.juplo.kafka.outbox.delivery;
+
+import com.google.common.primitives.Longs;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+
+
+@Component
+public class OutboxProducer
+{
+  final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
+
+
+  private final OutboxRepository repository;
+  private final KafkaProducer<String, String> producer;
+  private final String topic;
+
+  private long sequenceNumber = 0l;
+
+  public OutboxProducer(
+      ApplicationProperties properties,
+      OutboxRepository repository)
+  {
+    this.repository = repository;
+
+    Properties props = new Properties();
+    props.put("bootstrap.servers", properties.bootstrapServers);
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", StringSerializer.class.getName());
+
+    this.producer = new KafkaProducer<>(props);
+    this.topic = properties.topic;
+  }
+
+  @Scheduled(fixedDelay = 500)
+  public void poll()
+  {
+    List<OutboxItem> items;
+    do
+    {
+      items = repository.fetch(sequenceNumber);
+      LOG.debug("Polled {} new items", items.size());
+      for (OutboxItem item : items)
+        send(item);
+    }
+    while (items.size() > 0);
+  }
+
+  void send(OutboxItem item)
+  {
+    final ProducerRecord<String, String> record =
+        new ProducerRecord<>(topic, item.getKey(), item.getValue());
+
+    sequenceNumber = item.getSequenceNumber();
+    record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
+
+    producer.send(record, (metadata, e) ->
+    {
+      if (metadata != null)
+      {
+        int deleted = repository.delete(item.getSequenceNumber());
+        LOG.info(
+            "{}/{}:{} - {}:{}={} - deleted: {}",
+            metadata.topic(),
+            metadata.partition(),
+            metadata.offset(),
+            item.getSequenceNumber(),
+            record.key(),
+            record.value(),
+            deleted);
+      }
+      else
+      {
+        // HANDLE ERROR
+        LOG.error(
+            "{}/{} - {}:{}={} -> ",
+            record.topic(),
+            record.partition(),
+            item.getSequenceNumber(),
+            record.key(),
+            record.value(),
+            e);
+      }
+    });
+  }
+
+
+  @PreDestroy
+  public void close()
+  {
+    producer.close(Duration.ofSeconds(5));
+  }
+}
diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java
new file mode 100644 (file)
index 0000000..abf2d1d
--- /dev/null
@@ -0,0 +1,62 @@
+package de.juplo.kafka.outbox.delivery;
+
+import lombok.AllArgsConstructor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+import java.util.List;
+
+
+@Repository
+@AllArgsConstructor
+public class OutboxRepository
+{
+  private static final String SQL_QUERY =
+      "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
+  private static final String SQL_UPDATE =
+      "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
+  private static final String SQL_DELETE =
+      "DELETE FROM outbox WHERE id = :id";
+
+  private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+  public void save(String key, String value, ZonedDateTime issued)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("key", key);
+    parameters.addValue("value", value);
+    parameters.addValue("issued", Timestamp.from(issued.toInstant()));
+    jdbcTemplate.update(SQL_UPDATE, parameters);
+  }
+
+  public int delete(Long id)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("id", id);
+    return jdbcTemplate.update(SQL_DELETE, parameters);
+  }
+
+  public List<OutboxItem> fetch(Long sequenceNumber)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("sequenceNumber", sequenceNumber);
+    return
+        jdbcTemplate.query(
+            SQL_QUERY,
+            parameters,
+            (resultSet, rowNumber) ->
+            {
+              return
+                  OutboxItem
+                      .builder()
+                      .sequenceNumber(resultSet.getLong(1))
+                      .key(resultSet.getString(2))
+                      .value(resultSet.getString(3))
+                      .build();
+            });
+  }
+}
diff --git a/delivery/src/main/resources/application.yml b/delivery/src/main/resources/application.yml
new file mode 100644 (file)
index 0000000..2a8502a
--- /dev/null
@@ -0,0 +1,53 @@
+management:
+  endpoints:
+    web:
+      exposure:
+        include: "*"
+
+spring:
+  flyway:
+    locations: classpath:db/migration/h2
+
+logging:
+  level:
+    de:
+      juplo:
+        kafka:
+          outbox: DEBUG
+
+---
+
+spring:
+  profiles: prod
+
+  datasource:
+    url: jdbc:postgresql://postgres:5432/outbox
+    username: outbox
+    password: outbox
+  flyway:
+    locations: classpath:db/migration/postgres
+
+de:
+  juplo:
+    kafka:
+      outbox:
+        bootstrap-servers: kafka:9093
+
+---
+
+spring:
+  profiles: dev
+
+  datasource:
+    url: jdbc:postgresql://localhost:5432/outbox
+    username: outbox
+    password: outbox
+  flyway:
+    locations: classpath:db/migration/postgres
+
+de:
+  juplo:
+    kafka:
+      outbox:
+        bootstrap-servers: localhost:9092
+
diff --git a/delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java b/delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java
new file mode 100644 (file)
index 0000000..b8f1834
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.outbox.delivery;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ApplicationTests
+{
+  @Test
+  public void contextLoads()
+  {
+  }
+}
index 0125f95..2cd2608 100644 (file)
@@ -23,16 +23,16 @@ services:
       - zookeeper
 
   jdbc:
-    image: jdbc:latest
+    image: juplo/jdbc:outbox
     ports:
       - 8080:8080
     environment:
-      spring.profiles.active: production
+      spring.profiles.active: prod
     depends_on:
       - postgres
 
   outbox:
-    image: polling-outbox:latest
+    image: juplo/outbox:polling
     environment:
       spring.profiles.active: prod
     depends_on:
diff --git a/jdbc b/jdbc
index 4d047ac..84663e6 160000 (submodule)
--- a/jdbc
+++ b/jdbc
@@ -1 +1 @@
-Subproject commit 4d047ac98c61563fbb9986b6ce1e4d6ead5a1e48
+Subproject commit 84663e6da682a4edf6e6705b9e5018720220a318
diff --git a/outbox/.dockerignore b/outbox/.dockerignore
deleted file mode 100644 (file)
index 1ad9963..0000000
+++ /dev/null
@@ -1,2 +0,0 @@
-*
-!target/*.jar
diff --git a/outbox/Dockerfile b/outbox/Dockerfile
deleted file mode 100644 (file)
index 16ee25e..0000000
+++ /dev/null
@@ -1,5 +0,0 @@
-FROM openjdk:11-jre
-VOLUME /tmp
-COPY target/*.jar /opt/app.jar
-ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
-CMD []
diff --git a/outbox/pom.xml b/outbox/pom.xml
deleted file mode 100644 (file)
index e8b48b6..0000000
+++ /dev/null
@@ -1,95 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project
-    xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.springframework.boot</groupId>
-    <artifactId>spring-boot-starter-parent</artifactId>
-    <version>2.3.2.RELEASE</version>
-    <relativePath/> <!-- lookup parent from repository -->
-  </parent>
-
-  <groupId>de.juplo.kafka.outbox</groupId>
-  <artifactId>polling-outbox</artifactId>
-  <version>0.0.1-SNAPSHOT</version>
-  <name>polling-outbox</name>
-  <description>Simple example-implementation of the Polling-Outbox-Pattern</description>
-
-  <properties>
-    <java.version>11</java.version>
-    <guava.version>30.0-jre</guava.version>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-data-jdbc</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-json</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.projectlombok</groupId>
-      <artifactId>lombok</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>${guava.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.postgresql</groupId>
-      <artifactId>postgresql</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-test</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.h2database</groupId>
-      <artifactId>h2</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.springframework.boot</groupId>
-        <artifactId>spring-boot-maven-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>io.fabric8</groupId>
-        <artifactId>docker-maven-plugin</artifactId>
-        <version>0.33.0</version>
-        <configuration>
-          <images>
-            <image>
-              <name>%a:%l</name>
-            </image>
-          </images>
-        </configuration>
-        <executions>
-          <execution>
-             <id>build</id>
-             <phase>package</phase>
-             <goals>
-               <goal>build</goal>
-             </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/Application.java b/outbox/src/main/java/de/juplo/kafka/outbox/Application.java
deleted file mode 100644 (file)
index 678a48a..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-package de.juplo.kafka.outbox;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.scheduling.annotation.EnableScheduling;
-
-
-@SpringBootApplication
-@EnableConfigurationProperties(ApplicationProperties.class)
-@EnableScheduling
-public class Application
-{
-  public static void main(String[] args) throws Exception
-  {
-    SpringApplication.run(Application.class, args);
-  }
-}
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java b/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java
deleted file mode 100644 (file)
index 1a5dca6..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-package de.juplo.kafka.outbox;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("de.juplo.kafka.outbox")
-@Getter
-@Setter
-public class ApplicationProperties
-{
-  String bootstrapServers = "localhost:9092";
-  String topic = "outbox";
-}
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java
deleted file mode 100644 (file)
index 99deafa..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.kafka.outbox;
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.Value;
-
-
-@Data
-@Value
-@Builder
-public class OutboxItem
-{
-  private final Long sequenceNumber;
-  private final String key;
-  private final String value;
-}
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java
deleted file mode 100644 (file)
index 30bef96..0000000
+++ /dev/null
@@ -1,105 +0,0 @@
-package de.juplo.kafka.outbox;
-
-import com.google.common.primitives.Longs;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PreDestroy;
-
-
-@Component
-public class OutboxProducer
-{
-  final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
-
-
-  private final OutboxRepository repository;
-  private final KafkaProducer<String, String> producer;
-  private final String topic;
-
-  private long sequenceNumber = 0l;
-
-  public OutboxProducer(
-      ApplicationProperties properties,
-      OutboxRepository repository)
-  {
-    this.repository = repository;
-
-    Properties props = new Properties();
-    props.put("bootstrap.servers", properties.bootstrapServers);
-    props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
-
-    this.producer = new KafkaProducer<>(props);
-    this.topic = properties.topic;
-  }
-
-  @Scheduled(fixedDelay = 500)
-  public void poll()
-  {
-    List<OutboxItem> items;
-    do
-    {
-      items = repository.fetch(sequenceNumber);
-      LOG.debug("Polled {} new items", items.size());
-      for (OutboxItem item : items)
-        send(item);
-    }
-    while (items.size() > 0);
-  }
-
-  void send(OutboxItem item)
-  {
-    final ProducerRecord<String, String> record =
-        new ProducerRecord<>(topic, item.getKey(), item.getValue());
-
-    sequenceNumber = item.getSequenceNumber();
-    record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
-
-    producer.send(record, (metadata, e) ->
-    {
-      if (metadata != null)
-      {
-        int deleted = repository.delete(item.getSequenceNumber());
-        LOG.info(
-            "{}/{}:{} - {}:{}={} - deleted: {}",
-            metadata.topic(),
-            metadata.partition(),
-            metadata.offset(),
-            item.getSequenceNumber(),
-            record.key(),
-            record.value(),
-            deleted);
-      }
-      else
-      {
-        // HANDLE ERROR
-        LOG.error(
-            "{}/{} - {}:{}={} -> ",
-            record.topic(),
-            record.partition(),
-            item.getSequenceNumber(),
-            record.key(),
-            record.value(),
-            e);
-      }
-    });
-  }
-
-
-  @PreDestroy
-  public void close()
-  {
-    producer.close(Duration.ofSeconds(5));
-  }
-}
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java
deleted file mode 100644 (file)
index 03a68ef..0000000
+++ /dev/null
@@ -1,62 +0,0 @@
-package de.juplo.kafka.outbox;
-
-import lombok.AllArgsConstructor;
-import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
-import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
-import org.springframework.stereotype.Repository;
-
-import java.sql.Timestamp;
-import java.time.ZonedDateTime;
-import java.util.List;
-
-
-@Repository
-@AllArgsConstructor
-public class OutboxRepository
-{
-  private static final String SQL_QUERY =
-      "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
-  private static final String SQL_UPDATE =
-      "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
-  private static final String SQL_DELETE =
-      "DELETE FROM outbox WHERE id = :id";
-
-  private final NamedParameterJdbcTemplate jdbcTemplate;
-
-
-  public void save(String key, String value, ZonedDateTime issued)
-  {
-    MapSqlParameterSource parameters = new MapSqlParameterSource();
-    parameters.addValue("key", key);
-    parameters.addValue("value", value);
-    parameters.addValue("issued", Timestamp.from(issued.toInstant()));
-    jdbcTemplate.update(SQL_UPDATE, parameters);
-  }
-
-  public int delete(Long id)
-  {
-    MapSqlParameterSource parameters = new MapSqlParameterSource();
-    parameters.addValue("id", id);
-    return jdbcTemplate.update(SQL_DELETE, parameters);
-  }
-
-  public List<OutboxItem> fetch(Long sequenceNumber)
-  {
-    MapSqlParameterSource parameters = new MapSqlParameterSource();
-    parameters.addValue("sequenceNumber", sequenceNumber);
-    return
-        jdbcTemplate.query(
-            SQL_QUERY,
-            parameters,
-            (resultSet, rowNumber) ->
-            {
-              return
-                  OutboxItem
-                      .builder()
-                      .sequenceNumber(resultSet.getLong(1))
-                      .key(resultSet.getString(2))
-                      .value(resultSet.getString(3))
-                      .build();
-            });
-  }
-}
diff --git a/outbox/src/main/resources/application.yml b/outbox/src/main/resources/application.yml
deleted file mode 100644 (file)
index 2a8502a..0000000
+++ /dev/null
@@ -1,53 +0,0 @@
-management:
-  endpoints:
-    web:
-      exposure:
-        include: "*"
-
-spring:
-  flyway:
-    locations: classpath:db/migration/h2
-
-logging:
-  level:
-    de:
-      juplo:
-        kafka:
-          outbox: DEBUG
-
----
-
-spring:
-  profiles: prod
-
-  datasource:
-    url: jdbc:postgresql://postgres:5432/outbox
-    username: outbox
-    password: outbox
-  flyway:
-    locations: classpath:db/migration/postgres
-
-de:
-  juplo:
-    kafka:
-      outbox:
-        bootstrap-servers: kafka:9093
-
----
-
-spring:
-  profiles: dev
-
-  datasource:
-    url: jdbc:postgresql://localhost:5432/outbox
-    username: outbox
-    password: outbox
-  flyway:
-    locations: classpath:db/migration/postgres
-
-de:
-  juplo:
-    kafka:
-      outbox:
-        bootstrap-servers: localhost:9092
-
diff --git a/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java b/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java
deleted file mode 100644 (file)
index cb5bc43..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.kafka.outbox;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest
-public class ApplicationTests
-{
-  @Test
-  public void contextLoads()
-  {
-  }
-}
diff --git a/pom.xml b/pom.xml
index a270735..f54749e 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -14,8 +14,9 @@
   <description>Simple example-implementation of the Polling-Outbox-Pattern</description>
 
   <modules>
+    <module>postage</module>
     <module>jdbc</module>
-    <module>outbox</module>
+    <module>delivery</module>
   </modules>
 
 </project>
diff --git a/postage/pom.xml b/postage/pom.xml
new file mode 100644 (file)
index 0000000..1a3ac86
--- /dev/null
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.3.2.RELEASE</version>
+    <relativePath/> <!-- lookup parent from repository -->
+  </parent>
+
+  <groupId>de.juplo.kafka.outbox</groupId>
+  <artifactId>outbox-postage</artifactId>
+  <version>0.0.1-SNAPSHOT</version>
+  <name>outbox-postage</name>
+  <description>Simple example-implementation of the Polling-Outbox-Pattern</description>
+
+  <properties>
+    <java.version>11</java.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-data-jdbc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-json</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.flywaydb</groupId>
+      <artifactId>flyway-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxEvent.java b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxEvent.java
new file mode 100644 (file)
index 0000000..6c87b12
--- /dev/null
@@ -0,0 +1,30 @@
+package de.juplo.kafka.outbox.postage;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.springframework.context.ApplicationEvent;
+
+import java.time.ZonedDateTime;
+
+
+@ToString
+@EqualsAndHashCode
+public class OutboxEvent extends ApplicationEvent
+{
+  @Getter
+  private final String key;
+  @Getter
+  private final Object value;
+  @Getter
+  private final ZonedDateTime time;
+
+
+  public OutboxEvent(Object source, String key, Object value, ZonedDateTime time)
+  {
+    super(source);
+    this.key = key;
+    this.value = value;
+    this.time = time;
+  }
+}
diff --git a/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxListener.java b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxListener.java
new file mode 100644 (file)
index 0000000..abb350a
--- /dev/null
@@ -0,0 +1,34 @@
+package de.juplo.kafka.outbox.postage;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.AllArgsConstructor;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.event.TransactionPhase;
+import org.springframework.transaction.event.TransactionalEventListener;
+
+
+@Component
+@AllArgsConstructor
+public class OutboxListener
+{
+  private final OutboxRepository repository;
+  private final ObjectMapper mapper;
+
+
+  @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
+  public void onUserEvent(OutboxEvent event)
+  {
+    try
+    {
+      repository.save(
+          event.getKey(),
+          mapper.writeValueAsString(event.getValue()),
+          event.getTime());
+    }
+    catch (JsonProcessingException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxRepository.java b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxRepository.java
new file mode 100644 (file)
index 0000000..50fc301
--- /dev/null
@@ -0,0 +1,30 @@
+package de.juplo.kafka.outbox.postage;
+
+import lombok.AllArgsConstructor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+
+
+@Repository
+@AllArgsConstructor
+public class OutboxRepository
+{
+  private static final String SQL_UPDATE =
+      "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
+
+  private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+  public void save(String key, String value, ZonedDateTime issued)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("key", key);
+    parameters.addValue("value", value);
+    parameters.addValue("issued", Timestamp.from(issued.toInstant()));
+    jdbcTemplate.update(SQL_UPDATE, parameters);
+  }
+}