Added a simple implementation of the polling outbox pattern
authorKai Moritz <kai@juplo.de>
Sat, 31 Oct 2020 16:03:24 +0000 (17:03 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 1 Nov 2020 12:20:27 +0000 (13:20 +0100)
12 files changed:
docker-compose.yml
outbox/.dockerignore [new file with mode: 0644]
outbox/Dockerfile [new file with mode: 0644]
outbox/pom.xml [new file with mode: 0644]
outbox/src/main/java/de/juplo/kafka/outbox/Application.java [new file with mode: 0644]
outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java [new file with mode: 0644]
outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java [new file with mode: 0644]
outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java [new file with mode: 0644]
outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java [new file with mode: 0644]
outbox/src/main/resources/application.yml [new file with mode: 0644]
outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java [new file with mode: 0644]
pom.xml

index bb11901..0125f95 100644 (file)
@@ -31,6 +31,14 @@ services:
     depends_on:
       - postgres
 
+  outbox:
+    image: polling-outbox:latest
+    environment:
+      spring.profiles.active: prod
+    depends_on:
+      - postgres
+      - kafka
+
 
   postgres:
     image: postgres:13
diff --git a/outbox/.dockerignore b/outbox/.dockerignore
new file mode 100644 (file)
index 0000000..1ad9963
--- /dev/null
@@ -0,0 +1,2 @@
+*
+!target/*.jar
diff --git a/outbox/Dockerfile b/outbox/Dockerfile
new file mode 100644 (file)
index 0000000..16ee25e
--- /dev/null
@@ -0,0 +1,5 @@
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
+CMD []
diff --git a/outbox/pom.xml b/outbox/pom.xml
new file mode 100644 (file)
index 0000000..f904537
--- /dev/null
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.3.2.RELEASE</version>
+    <relativePath/> <!-- lookup parent from repository -->
+  </parent>
+
+  <groupId>de.juplo.kafka.outbox</groupId>
+  <artifactId>polling-outbox</artifactId>
+  <version>0.0.1-SNAPSHOT</version>
+  <name>polling-outbox</name>
+  <description>Simple example-implementation of the Polling-Outbox-Pattern</description>
+
+  <properties>
+    <java.version>11</java.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-data-jdbc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-json</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>io.fabric8</groupId>
+        <artifactId>docker-maven-plugin</artifactId>
+        <version>0.33.0</version>
+        <configuration>
+          <images>
+            <image>
+              <name>%a:%l</name>
+            </image>
+          </images>
+        </configuration>
+        <executions>
+          <execution>
+             <id>build</id>
+             <phase>package</phase>
+             <goals>
+               <goal>build</goal>
+             </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/Application.java b/outbox/src/main/java/de/juplo/kafka/outbox/Application.java
new file mode 100644 (file)
index 0000000..a63d714
--- /dev/null
@@ -0,0 +1,36 @@
+package de.juplo.kafka.outbox;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+import java.util.concurrent.CountDownLatch;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+@Slf4j
+public class Application
+{
+  public static void main(String[] args) throws Exception
+  {
+    SpringApplication.run(Application.class, args);
+
+    final CountDownLatch closeLatch = new CountDownLatch(1);
+
+    Runtime
+        .getRuntime()
+        .addShutdownHook(new Thread()
+        {
+          @Override
+          public void run()
+          {
+            log.info("Closing application...");
+            closeLatch.countDown();
+          }
+        });
+
+    closeLatch.await();
+  }
+}
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java b/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java
new file mode 100644 (file)
index 0000000..1a5dca6
--- /dev/null
@@ -0,0 +1,15 @@
+package de.juplo.kafka.outbox;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("de.juplo.kafka.outbox")
+@Getter
+@Setter
+public class ApplicationProperties
+{
+  String bootstrapServers = "localhost:9092";
+  String topic = "outbox";
+}
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java
new file mode 100644 (file)
index 0000000..99deafa
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.outbox;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Value;
+
+
+@Data
+@Value
+@Builder
+public class OutboxItem
+{
+  private final Long sequenceNumber;
+  private final String key;
+  private final String value;
+}
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java
new file mode 100644 (file)
index 0000000..627ca05
--- /dev/null
@@ -0,0 +1,107 @@
+package de.juplo.kafka.outbox;
+
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+
+
+@Component
+public class OutboxProducer
+{
+  final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
+
+
+  private final OutboxRepository repository;
+  private final KafkaProducer<String, String> producer;
+  private final String topic;
+
+  private long sequenceNumber = 0l;
+
+  public OutboxProducer(
+      ApplicationProperties properties,
+      OutboxRepository repository)
+  {
+    this.repository = repository;
+
+    Properties props = new Properties();
+    props.put("bootstrap.servers", properties.bootstrapServers);
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", StringSerializer.class.getName());
+
+    this.producer = new KafkaProducer<>(props);
+    this.topic = properties.topic;
+  }
+
+  @Scheduled(fixedDelay = 500)
+  public void poll()
+  {
+    List<OutboxItem> items;
+    do
+    {
+      items = repository.fetch(sequenceNumber);
+      LOG.debug("Polled {} new items", items.size());
+      for (OutboxItem item : items)
+        send(item);
+    }
+    while (items.size() > 0);
+  }
+
+  void send(OutboxItem item)
+  {
+    final ProducerRecord<String, String> record =
+        new ProducerRecord<>(topic, item.getKey(), item.getValue());
+
+    sequenceNumber = item.getSequenceNumber();
+    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+    buffer.putLong(item.getSequenceNumber());
+    record.headers().add("SEQ#", buffer.array());
+
+    producer.send(record, (metadata, e) ->
+    {
+      if (metadata != null)
+      {
+        int deleted = repository.delete(item.getSequenceNumber());
+        LOG.info(
+            "{}/{}:{} - {}:{}={} - deleted: {}",
+            metadata.topic(),
+            metadata.partition(),
+            metadata.offset(),
+            item.getSequenceNumber(),
+            record.key(),
+            record.value(),
+            deleted);
+      }
+      else
+      {
+        // HANDLE ERROR
+        LOG.error(
+            "{}/{} - {}:{}={} -> ",
+            record.topic(),
+            record.partition(),
+            item.getSequenceNumber(),
+            record.key(),
+            record.value(),
+            e);
+      }
+    });
+  }
+
+
+  @PreDestroy
+  public void close()
+  {
+    producer.close(Duration.ofSeconds(5));
+  }
+}
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java
new file mode 100644 (file)
index 0000000..03a68ef
--- /dev/null
@@ -0,0 +1,62 @@
+package de.juplo.kafka.outbox;
+
+import lombok.AllArgsConstructor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+import java.util.List;
+
+
+@Repository
+@AllArgsConstructor
+public class OutboxRepository
+{
+  private static final String SQL_QUERY =
+      "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
+  private static final String SQL_UPDATE =
+      "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
+  private static final String SQL_DELETE =
+      "DELETE FROM outbox WHERE id = :id";
+
+  private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+  public void save(String key, String value, ZonedDateTime issued)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("key", key);
+    parameters.addValue("value", value);
+    parameters.addValue("issued", Timestamp.from(issued.toInstant()));
+    jdbcTemplate.update(SQL_UPDATE, parameters);
+  }
+
+  public int delete(Long id)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("id", id);
+    return jdbcTemplate.update(SQL_DELETE, parameters);
+  }
+
+  public List<OutboxItem> fetch(Long sequenceNumber)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("sequenceNumber", sequenceNumber);
+    return
+        jdbcTemplate.query(
+            SQL_QUERY,
+            parameters,
+            (resultSet, rowNumber) ->
+            {
+              return
+                  OutboxItem
+                      .builder()
+                      .sequenceNumber(resultSet.getLong(1))
+                      .key(resultSet.getString(2))
+                      .value(resultSet.getString(3))
+                      .build();
+            });
+  }
+}
diff --git a/outbox/src/main/resources/application.yml b/outbox/src/main/resources/application.yml
new file mode 100644 (file)
index 0000000..2a8502a
--- /dev/null
@@ -0,0 +1,53 @@
+management:
+  endpoints:
+    web:
+      exposure:
+        include: "*"
+
+spring:
+  flyway:
+    locations: classpath:db/migration/h2
+
+logging:
+  level:
+    de:
+      juplo:
+        kafka:
+          outbox: DEBUG
+
+---
+
+spring:
+  profiles: prod
+
+  datasource:
+    url: jdbc:postgresql://postgres:5432/outbox
+    username: outbox
+    password: outbox
+  flyway:
+    locations: classpath:db/migration/postgres
+
+de:
+  juplo:
+    kafka:
+      outbox:
+        bootstrap-servers: kafka:9093
+
+---
+
+spring:
+  profiles: dev
+
+  datasource:
+    url: jdbc:postgresql://localhost:5432/outbox
+    username: outbox
+    password: outbox
+  flyway:
+    locations: classpath:db/migration/postgres
+
+de:
+  juplo:
+    kafka:
+      outbox:
+        bootstrap-servers: localhost:9092
+
diff --git a/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java b/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java
new file mode 100644 (file)
index 0000000..cb5bc43
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.outbox;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ApplicationTests
+{
+  @Test
+  public void contextLoads()
+  {
+  }
+}
diff --git a/pom.xml b/pom.xml
index d11a4d7..a270735 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,6 +15,7 @@
 
   <modules>
     <module>jdbc</module>
+    <module>outbox</module>
   </modules>
 
 </project>