depends_on:
- postgres
+ outbox:
+ image: polling-outbox:latest
+ environment:
+ spring.profiles.active: prod
+ depends_on:
+ - postgres
+ - kafka
+
postgres:
image: postgres:13
--- /dev/null
+*
+!target/*.jar
--- /dev/null
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
+CMD []
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.3.2.RELEASE</version>
+ <relativePath/> <!-- lookup parent from repository -->
+ </parent>
+
+ <groupId>de.juplo.kafka.outbox</groupId>
+ <artifactId>polling-outbox</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>polling-outbox</name>
+ <description>Simple example-implementation of the Polling-Outbox-Pattern</description>
+
+ <properties>
+ <java.version>11</java.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jdbc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-json</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.33.0</version>
+ <configuration>
+ <images>
+ <image>
+ <name>%a:%l</name>
+ </image>
+ </images>
+ </configuration>
+ <executions>
+ <execution>
+ <id>build</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+import java.util.concurrent.CountDownLatch;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+@Slf4j
+public class Application
+{
+ public static void main(String[] args) throws Exception
+ {
+ SpringApplication.run(Application.class, args);
+
+ final CountDownLatch closeLatch = new CountDownLatch(1);
+
+ Runtime
+ .getRuntime()
+ .addShutdownHook(new Thread()
+ {
+ @Override
+ public void run()
+ {
+ log.info("Closing application...");
+ closeLatch.countDown();
+ }
+ });
+
+ closeLatch.await();
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("de.juplo.kafka.outbox")
+@Getter
+@Setter
+public class ApplicationProperties
+{
+ String bootstrapServers = "localhost:9092";
+ String topic = "outbox";
+}
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Value;
+
+
+@Data
+@Value
+@Builder
+public class OutboxItem
+{
+ private final Long sequenceNumber;
+ private final String key;
+ private final String value;
+}
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+
+
+@Component
+public class OutboxProducer
+{
+ final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
+
+
+ private final OutboxRepository repository;
+ private final KafkaProducer<String, String> producer;
+ private final String topic;
+
+ private long sequenceNumber = 0l;
+
+ public OutboxProducer(
+ ApplicationProperties properties,
+ OutboxRepository repository)
+ {
+ this.repository = repository;
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", properties.bootstrapServers);
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ this.producer = new KafkaProducer<>(props);
+ this.topic = properties.topic;
+ }
+
+ @Scheduled(fixedDelay = 500)
+ public void poll()
+ {
+ List<OutboxItem> items;
+ do
+ {
+ items = repository.fetch(sequenceNumber);
+ LOG.debug("Polled {} new items", items.size());
+ for (OutboxItem item : items)
+ send(item);
+ }
+ while (items.size() > 0);
+ }
+
+ void send(OutboxItem item)
+ {
+ final ProducerRecord<String, String> record =
+ new ProducerRecord<>(topic, item.getKey(), item.getValue());
+
+ sequenceNumber = item.getSequenceNumber();
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.putLong(item.getSequenceNumber());
+ record.headers().add("SEQ#", buffer.array());
+
+ producer.send(record, (metadata, e) ->
+ {
+ if (metadata != null)
+ {
+ int deleted = repository.delete(item.getSequenceNumber());
+ LOG.info(
+ "{}/{}:{} - {}:{}={} - deleted: {}",
+ metadata.topic(),
+ metadata.partition(),
+ metadata.offset(),
+ item.getSequenceNumber(),
+ record.key(),
+ record.value(),
+ deleted);
+ }
+ else
+ {
+ // HANDLE ERROR
+ LOG.error(
+ "{}/{} - {}:{}={} -> ",
+ record.topic(),
+ record.partition(),
+ item.getSequenceNumber(),
+ record.key(),
+ record.value(),
+ e);
+ }
+ });
+ }
+
+
+ @PreDestroy
+ public void close()
+ {
+ producer.close(Duration.ofSeconds(5));
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import lombok.AllArgsConstructor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+import java.util.List;
+
+
+@Repository
+@AllArgsConstructor
+public class OutboxRepository
+{
+ private static final String SQL_QUERY =
+ "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
+ private static final String SQL_UPDATE =
+ "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
+ private static final String SQL_DELETE =
+ "DELETE FROM outbox WHERE id = :id";
+
+ private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+ public void save(String key, String value, ZonedDateTime issued)
+ {
+ MapSqlParameterSource parameters = new MapSqlParameterSource();
+ parameters.addValue("key", key);
+ parameters.addValue("value", value);
+ parameters.addValue("issued", Timestamp.from(issued.toInstant()));
+ jdbcTemplate.update(SQL_UPDATE, parameters);
+ }
+
+ public int delete(Long id)
+ {
+ MapSqlParameterSource parameters = new MapSqlParameterSource();
+ parameters.addValue("id", id);
+ return jdbcTemplate.update(SQL_DELETE, parameters);
+ }
+
+ public List<OutboxItem> fetch(Long sequenceNumber)
+ {
+ MapSqlParameterSource parameters = new MapSqlParameterSource();
+ parameters.addValue("sequenceNumber", sequenceNumber);
+ return
+ jdbcTemplate.query(
+ SQL_QUERY,
+ parameters,
+ (resultSet, rowNumber) ->
+ {
+ return
+ OutboxItem
+ .builder()
+ .sequenceNumber(resultSet.getLong(1))
+ .key(resultSet.getString(2))
+ .value(resultSet.getString(3))
+ .build();
+ });
+ }
+}
--- /dev/null
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "*"
+
+spring:
+ flyway:
+ locations: classpath:db/migration/h2
+
+logging:
+ level:
+ de:
+ juplo:
+ kafka:
+ outbox: DEBUG
+
+---
+
+spring:
+ profiles: prod
+
+ datasource:
+ url: jdbc:postgresql://postgres:5432/outbox
+ username: outbox
+ password: outbox
+ flyway:
+ locations: classpath:db/migration/postgres
+
+de:
+ juplo:
+ kafka:
+ outbox:
+ bootstrap-servers: kafka:9093
+
+---
+
+spring:
+ profiles: dev
+
+ datasource:
+ url: jdbc:postgresql://localhost:5432/outbox
+ username: outbox
+ password: outbox
+ flyway:
+ locations: classpath:db/migration/postgres
+
+de:
+ juplo:
+ kafka:
+ outbox:
+ bootstrap-servers: localhost:9092
+
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ApplicationTests
+{
+ @Test
+ public void contextLoads()
+ {
+ }
+}
<modules>
<module>jdbc</module>
+ <module>outbox</module>
</modules>
</project>