--- /dev/null
+*
+!target/*.jar
--- /dev/null
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
+CMD []
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.3.2.RELEASE</version>
+ <relativePath/> <!-- lookup parent from repository -->
+ </parent>
+
+ <groupId>de.juplo.kafka.outbox</groupId>
+ <artifactId>outbox-delivery</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>Polling Outbox-Delivery</name>
+ <description>Simple example-implementation of the Polling-Outbox-Pattern</description>
+
+ <properties>
+ <java.version>11</java.version>
+ <guava.version>30.0-jre</guava.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jdbc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-json</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.33.0</version>
+ <configuration>
+ <images>
+ <image>
+ <name>juplo/outbox:polling</name>
+ </image>
+ </images>
+ </configuration>
+ <executions>
+ <execution>
+ <id>build</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.juplo.kafka.outbox.delivery;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableScheduling
+public class Application
+{
+ public static void main(String[] args) throws Exception
+ {
+ SpringApplication.run(Application.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox.delivery;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("de.juplo.kafka.outbox")
+@Getter
+@Setter
+public class ApplicationProperties
+{
+ String bootstrapServers = "localhost:9092";
+ String topic = "outbox";
+}
--- /dev/null
+package de.juplo.kafka.outbox.delivery;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Value;
+
+
+@Data
+@Value
+@Builder
+public class OutboxItem
+{
+ private final Long sequenceNumber;
+ private final String key;
+ private final String value;
+}
--- /dev/null
+package de.juplo.kafka.outbox.delivery;
+
+import com.google.common.primitives.Longs;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+
+
+@Component
+public class OutboxProducer
+{
+ final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
+
+
+ private final OutboxRepository repository;
+ private final KafkaProducer<String, String> producer;
+ private final String topic;
+
+ private long sequenceNumber = 0l;
+
+ public OutboxProducer(
+ ApplicationProperties properties,
+ OutboxRepository repository)
+ {
+ this.repository = repository;
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", properties.bootstrapServers);
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ this.producer = new KafkaProducer<>(props);
+ this.topic = properties.topic;
+ }
+
+ @Scheduled(fixedDelay = 500)
+ public void poll()
+ {
+ List<OutboxItem> items;
+ do
+ {
+ items = repository.fetch(sequenceNumber);
+ LOG.debug("Polled {} new items", items.size());
+ for (OutboxItem item : items)
+ send(item);
+ }
+ while (items.size() > 0);
+ }
+
+ void send(OutboxItem item)
+ {
+ final ProducerRecord<String, String> record =
+ new ProducerRecord<>(topic, item.getKey(), item.getValue());
+
+ sequenceNumber = item.getSequenceNumber();
+ record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
+
+ producer.send(record, (metadata, e) ->
+ {
+ if (metadata != null)
+ {
+ int deleted = repository.delete(item.getSequenceNumber());
+ LOG.info(
+ "{}/{}:{} - {}:{}={} - deleted: {}",
+ metadata.topic(),
+ metadata.partition(),
+ metadata.offset(),
+ item.getSequenceNumber(),
+ record.key(),
+ record.value(),
+ deleted);
+ }
+ else
+ {
+ // HANDLE ERROR
+ LOG.error(
+ "{}/{} - {}:{}={} -> ",
+ record.topic(),
+ record.partition(),
+ item.getSequenceNumber(),
+ record.key(),
+ record.value(),
+ e);
+ }
+ });
+ }
+
+
+ @PreDestroy
+ public void close()
+ {
+ producer.close(Duration.ofSeconds(5));
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox.delivery;
+
+import lombok.AllArgsConstructor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+import java.util.List;
+
+
+@Repository
+@AllArgsConstructor
+public class OutboxRepository
+{
+ private static final String SQL_QUERY =
+ "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
+ private static final String SQL_UPDATE =
+ "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
+ private static final String SQL_DELETE =
+ "DELETE FROM outbox WHERE id = :id";
+
+ private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+ public void save(String key, String value, ZonedDateTime issued)
+ {
+ MapSqlParameterSource parameters = new MapSqlParameterSource();
+ parameters.addValue("key", key);
+ parameters.addValue("value", value);
+ parameters.addValue("issued", Timestamp.from(issued.toInstant()));
+ jdbcTemplate.update(SQL_UPDATE, parameters);
+ }
+
+ public int delete(Long id)
+ {
+ MapSqlParameterSource parameters = new MapSqlParameterSource();
+ parameters.addValue("id", id);
+ return jdbcTemplate.update(SQL_DELETE, parameters);
+ }
+
+ public List<OutboxItem> fetch(Long sequenceNumber)
+ {
+ MapSqlParameterSource parameters = new MapSqlParameterSource();
+ parameters.addValue("sequenceNumber", sequenceNumber);
+ return
+ jdbcTemplate.query(
+ SQL_QUERY,
+ parameters,
+ (resultSet, rowNumber) ->
+ {
+ return
+ OutboxItem
+ .builder()
+ .sequenceNumber(resultSet.getLong(1))
+ .key(resultSet.getString(2))
+ .value(resultSet.getString(3))
+ .build();
+ });
+ }
+}
--- /dev/null
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "*"
+
+spring:
+ flyway:
+ locations: classpath:db/migration/h2
+
+logging:
+ level:
+ de:
+ juplo:
+ kafka:
+ outbox: DEBUG
+
+---
+
+spring:
+ profiles: prod
+
+ datasource:
+ url: jdbc:postgresql://postgres:5432/outbox
+ username: outbox
+ password: outbox
+ flyway:
+ locations: classpath:db/migration/postgres
+
+de:
+ juplo:
+ kafka:
+ outbox:
+ bootstrap-servers: kafka:9093
+
+---
+
+spring:
+ profiles: dev
+
+ datasource:
+ url: jdbc:postgresql://localhost:5432/outbox
+ username: outbox
+ password: outbox
+ flyway:
+ locations: classpath:db/migration/postgres
+
+de:
+ juplo:
+ kafka:
+ outbox:
+ bootstrap-servers: localhost:9092
+
--- /dev/null
+package de.juplo.kafka.outbox.delivery;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ApplicationTests
+{
+ @Test
+ public void contextLoads()
+ {
+ }
+}
- zookeeper
jdbc:
- image: jdbc:latest
+ image: juplo/jdbc:outbox
ports:
- 8080:8080
environment:
- spring.profiles.active: production
+ spring.profiles.active: prod
depends_on:
- postgres
outbox:
- image: polling-outbox:latest
+ image: juplo/outbox:polling
environment:
spring.profiles.active: prod
depends_on:
-Subproject commit 4d047ac98c61563fbb9986b6ce1e4d6ead5a1e48
+Subproject commit 84663e6da682a4edf6e6705b9e5018720220a318
+++ /dev/null
-*
-!target/*.jar
+++ /dev/null
-FROM openjdk:11-jre
-VOLUME /tmp
-COPY target/*.jar /opt/app.jar
-ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
-CMD []
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<project
- xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.3.2.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
-
- <groupId>de.juplo.kafka.outbox</groupId>
- <artifactId>polling-outbox</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>polling-outbox</name>
- <description>Simple example-implementation of the Polling-Outbox-Pattern</description>
-
- <properties>
- <java.version>11</java.version>
- <guava.version>30.0-jre</guava.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-jdbc</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-json</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
- <dependency>
- <groupId>org.postgresql</groupId>
- <artifactId>postgresql</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.h2database</groupId>
- <artifactId>h2</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>io.fabric8</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- <version>0.33.0</version>
- <configuration>
- <images>
- <image>
- <name>%a:%l</name>
- </image>
- </images>
- </configuration>
- <executions>
- <execution>
- <id>build</id>
- <phase>package</phase>
- <goals>
- <goal>build</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
+++ /dev/null
-package de.juplo.kafka.outbox;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.scheduling.annotation.EnableScheduling;
-
-
-@SpringBootApplication
-@EnableConfigurationProperties(ApplicationProperties.class)
-@EnableScheduling
-public class Application
-{
- public static void main(String[] args) throws Exception
- {
- SpringApplication.run(Application.class, args);
- }
-}
+++ /dev/null
-package de.juplo.kafka.outbox;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("de.juplo.kafka.outbox")
-@Getter
-@Setter
-public class ApplicationProperties
-{
- String bootstrapServers = "localhost:9092";
- String topic = "outbox";
-}
+++ /dev/null
-package de.juplo.kafka.outbox;
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.Value;
-
-
-@Data
-@Value
-@Builder
-public class OutboxItem
-{
- private final Long sequenceNumber;
- private final String key;
- private final String value;
-}
+++ /dev/null
-package de.juplo.kafka.outbox;
-
-import com.google.common.primitives.Longs;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PreDestroy;
-
-
-@Component
-public class OutboxProducer
-{
- final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
-
-
- private final OutboxRepository repository;
- private final KafkaProducer<String, String> producer;
- private final String topic;
-
- private long sequenceNumber = 0l;
-
- public OutboxProducer(
- ApplicationProperties properties,
- OutboxRepository repository)
- {
- this.repository = repository;
-
- Properties props = new Properties();
- props.put("bootstrap.servers", properties.bootstrapServers);
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- this.producer = new KafkaProducer<>(props);
- this.topic = properties.topic;
- }
-
- @Scheduled(fixedDelay = 500)
- public void poll()
- {
- List<OutboxItem> items;
- do
- {
- items = repository.fetch(sequenceNumber);
- LOG.debug("Polled {} new items", items.size());
- for (OutboxItem item : items)
- send(item);
- }
- while (items.size() > 0);
- }
-
- void send(OutboxItem item)
- {
- final ProducerRecord<String, String> record =
- new ProducerRecord<>(topic, item.getKey(), item.getValue());
-
- sequenceNumber = item.getSequenceNumber();
- record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
-
- producer.send(record, (metadata, e) ->
- {
- if (metadata != null)
- {
- int deleted = repository.delete(item.getSequenceNumber());
- LOG.info(
- "{}/{}:{} - {}:{}={} - deleted: {}",
- metadata.topic(),
- metadata.partition(),
- metadata.offset(),
- item.getSequenceNumber(),
- record.key(),
- record.value(),
- deleted);
- }
- else
- {
- // HANDLE ERROR
- LOG.error(
- "{}/{} - {}:{}={} -> ",
- record.topic(),
- record.partition(),
- item.getSequenceNumber(),
- record.key(),
- record.value(),
- e);
- }
- });
- }
-
-
- @PreDestroy
- public void close()
- {
- producer.close(Duration.ofSeconds(5));
- }
-}
+++ /dev/null
-package de.juplo.kafka.outbox;
-
-import lombok.AllArgsConstructor;
-import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
-import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
-import org.springframework.stereotype.Repository;
-
-import java.sql.Timestamp;
-import java.time.ZonedDateTime;
-import java.util.List;
-
-
-@Repository
-@AllArgsConstructor
-public class OutboxRepository
-{
- private static final String SQL_QUERY =
- "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
- private static final String SQL_UPDATE =
- "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
- private static final String SQL_DELETE =
- "DELETE FROM outbox WHERE id = :id";
-
- private final NamedParameterJdbcTemplate jdbcTemplate;
-
-
- public void save(String key, String value, ZonedDateTime issued)
- {
- MapSqlParameterSource parameters = new MapSqlParameterSource();
- parameters.addValue("key", key);
- parameters.addValue("value", value);
- parameters.addValue("issued", Timestamp.from(issued.toInstant()));
- jdbcTemplate.update(SQL_UPDATE, parameters);
- }
-
- public int delete(Long id)
- {
- MapSqlParameterSource parameters = new MapSqlParameterSource();
- parameters.addValue("id", id);
- return jdbcTemplate.update(SQL_DELETE, parameters);
- }
-
- public List<OutboxItem> fetch(Long sequenceNumber)
- {
- MapSqlParameterSource parameters = new MapSqlParameterSource();
- parameters.addValue("sequenceNumber", sequenceNumber);
- return
- jdbcTemplate.query(
- SQL_QUERY,
- parameters,
- (resultSet, rowNumber) ->
- {
- return
- OutboxItem
- .builder()
- .sequenceNumber(resultSet.getLong(1))
- .key(resultSet.getString(2))
- .value(resultSet.getString(3))
- .build();
- });
- }
-}
+++ /dev/null
-management:
- endpoints:
- web:
- exposure:
- include: "*"
-
-spring:
- flyway:
- locations: classpath:db/migration/h2
-
-logging:
- level:
- de:
- juplo:
- kafka:
- outbox: DEBUG
-
----
-
-spring:
- profiles: prod
-
- datasource:
- url: jdbc:postgresql://postgres:5432/outbox
- username: outbox
- password: outbox
- flyway:
- locations: classpath:db/migration/postgres
-
-de:
- juplo:
- kafka:
- outbox:
- bootstrap-servers: kafka:9093
-
----
-
-spring:
- profiles: dev
-
- datasource:
- url: jdbc:postgresql://localhost:5432/outbox
- username: outbox
- password: outbox
- flyway:
- locations: classpath:db/migration/postgres
-
-de:
- juplo:
- kafka:
- outbox:
- bootstrap-servers: localhost:9092
-
+++ /dev/null
-package de.juplo.kafka.outbox;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest
-public class ApplicationTests
-{
- @Test
- public void contextLoads()
- {
- }
-}
<description>Simple example-implementation of the Polling-Outbox-Pattern</description>
<modules>
+ <module>postage</module>
<module>jdbc</module>
- <module>outbox</module>
+ <module>delivery</module>
</modules>
</project>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.3.2.RELEASE</version>
+ <relativePath/> <!-- lookup parent from repository -->
+ </parent>
+
+ <groupId>de.juplo.kafka.outbox</groupId>
+ <artifactId>outbox-postage</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>outbox-postage</name>
+ <description>Simple example-implementation of the Polling-Outbox-Pattern</description>
+
+ <properties>
+ <java.version>11</java.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jdbc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-json</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.flywaydb</groupId>
+ <artifactId>flyway-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
--- /dev/null
+package de.juplo.kafka.outbox.postage;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.springframework.context.ApplicationEvent;
+
+import java.time.ZonedDateTime;
+
+
+@ToString
+@EqualsAndHashCode
+public class OutboxEvent extends ApplicationEvent
+{
+ @Getter
+ private final String key;
+ @Getter
+ private final Object value;
+ @Getter
+ private final ZonedDateTime time;
+
+
+ public OutboxEvent(Object source, String key, Object value, ZonedDateTime time)
+ {
+ super(source);
+ this.key = key;
+ this.value = value;
+ this.time = time;
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox.postage;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.AllArgsConstructor;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.event.TransactionPhase;
+import org.springframework.transaction.event.TransactionalEventListener;
+
+
+@Component
+@AllArgsConstructor
+public class OutboxListener
+{
+ private final OutboxRepository repository;
+ private final ObjectMapper mapper;
+
+
+ @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
+ public void onUserEvent(OutboxEvent event)
+ {
+ try
+ {
+ repository.save(
+ event.getKey(),
+ mapper.writeValueAsString(event.getValue()),
+ event.getTime());
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox.postage;
+
+import lombok.AllArgsConstructor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+
+
+@Repository
+@AllArgsConstructor
+public class OutboxRepository
+{
+ private static final String SQL_UPDATE =
+ "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
+
+ private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+ public void save(String key, String value, ZonedDateTime issued)
+ {
+ MapSqlParameterSource parameters = new MapSqlParameterSource();
+ parameters.addValue("key", key);
+ parameters.addValue("value", value);
+ parameters.addValue("issued", Timestamp.from(issued.toInstant()));
+ jdbcTemplate.update(SQL_UPDATE, parameters);
+ }
+}