From: Kai Moritz Date: Sun, 1 Nov 2020 11:58:20 +0000 (+0100) Subject: Moved postage of messages into a reusable standalone implementation X-Git-Tag: part-1~9 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=6a518e3d0ab8f8d22009d2ec2c0f8a58ed5fefd5;p=demos%2Fkafka%2Foutbox Moved postage of messages into a reusable standalone implementation * Renamed maven-module outbox into delivery * Renamed app polling-outbox into outbox-polling-delivery * Added new artifact outbox-postage as maven-module postage * Added fully qualified names for the docker-images * TODO: Move flyway-scriptes for outbox into module postage --- diff --git a/delivery/.dockerignore b/delivery/.dockerignore new file mode 100644 index 0000000..1ad9963 --- /dev/null +++ b/delivery/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/delivery/Dockerfile b/delivery/Dockerfile new file mode 100644 index 0000000..16ee25e --- /dev/null +++ b/delivery/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:11-jre +VOLUME /tmp +COPY target/*.jar /opt/app.jar +ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] +CMD [] diff --git a/delivery/pom.xml b/delivery/pom.xml new file mode 100644 index 0000000..975fbb5 --- /dev/null +++ b/delivery/pom.xml @@ -0,0 +1,95 @@ + + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.3.2.RELEASE + + + + de.juplo.kafka.outbox + outbox-delivery + 0.0.1-SNAPSHOT + Polling Outbox-Delivery + Simple example-implementation of the Polling-Outbox-Pattern + + + 11 + 30.0-jre + + + + + org.springframework.boot + spring-boot-starter-data-jdbc + + + org.springframework.boot + spring-boot-starter-json + + + org.apache.kafka + kafka-clients + + + org.projectlombok + lombok + + + com.google.guava + guava + ${guava.version} + + + org.postgresql + postgresql + + + org.springframework.boot + spring-boot-starter-test + test + + + com.h2database + h2 + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + io.fabric8 + docker-maven-plugin + 0.33.0 + + + + juplo/outbox:polling + + + + + + build + package + + build + + + + + + + + diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java new file mode 100644 index 0000000..6abd181 --- /dev/null +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java @@ -0,0 +1,18 @@ +package de.juplo.kafka.outbox.delivery; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.scheduling.annotation.EnableScheduling; + + +@SpringBootApplication +@EnableConfigurationProperties(ApplicationProperties.class) +@EnableScheduling +public class Application +{ + public static void main(String[] args) throws Exception + { + SpringApplication.run(Application.class, args); + } +} diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java new file mode 100644 index 0000000..4e36aa4 --- /dev/null +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.outbox.delivery; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("de.juplo.kafka.outbox") +@Getter +@Setter +public class ApplicationProperties +{ + String bootstrapServers = "localhost:9092"; + String topic = "outbox"; +} diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxItem.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxItem.java new file mode 100644 index 0000000..e48ac8e --- /dev/null +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxItem.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.outbox.delivery; + +import lombok.Builder; +import lombok.Data; +import lombok.Value; + + +@Data +@Value +@Builder +public class OutboxItem +{ + private final Long sequenceNumber; + private final String key; + private final String value; +} diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java new file mode 100644 index 0000000..c08cae7 --- /dev/null +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -0,0 +1,105 @@ +package de.juplo.kafka.outbox.delivery; + +import com.google.common.primitives.Longs; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + + +@Component +public class OutboxProducer +{ + final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class); + + + private final OutboxRepository repository; + private final KafkaProducer producer; + private final String topic; + + private long sequenceNumber = 0l; + + public OutboxProducer( + ApplicationProperties properties, + OutboxRepository repository) + { + this.repository = repository; + + Properties props = new Properties(); + props.put("bootstrap.servers", properties.bootstrapServers); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + + this.producer = new KafkaProducer<>(props); + this.topic = properties.topic; + } + + @Scheduled(fixedDelay = 500) + public void poll() + { + List items; + do + { + items = repository.fetch(sequenceNumber); + LOG.debug("Polled {} new items", items.size()); + for (OutboxItem item : items) + send(item); + } + while (items.size() > 0); + } + + void send(OutboxItem item) + { + final ProducerRecord record = + new ProducerRecord<>(topic, item.getKey(), item.getValue()); + + sequenceNumber = item.getSequenceNumber(); + record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber)); + + producer.send(record, (metadata, e) -> + { + if (metadata != null) + { + int deleted = repository.delete(item.getSequenceNumber()); + LOG.info( + "{}/{}:{} - {}:{}={} - deleted: {}", + metadata.topic(), + metadata.partition(), + metadata.offset(), + item.getSequenceNumber(), + record.key(), + record.value(), + deleted); + } + else + { + // HANDLE ERROR + LOG.error( + "{}/{} - {}:{}={} -> ", + record.topic(), + record.partition(), + item.getSequenceNumber(), + record.key(), + record.value(), + e); + } + }); + } + + + @PreDestroy + public void close() + { + producer.close(Duration.ofSeconds(5)); + } +} diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java new file mode 100644 index 0000000..abf2d1d --- /dev/null +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java @@ -0,0 +1,62 @@ +package de.juplo.kafka.outbox.delivery; + +import lombok.AllArgsConstructor; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; +import java.time.ZonedDateTime; +import java.util.List; + + +@Repository +@AllArgsConstructor +public class OutboxRepository +{ + private static final String SQL_QUERY = + "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC"; + private static final String SQL_UPDATE = + "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)"; + private static final String SQL_DELETE = + "DELETE FROM outbox WHERE id = :id"; + + private final NamedParameterJdbcTemplate jdbcTemplate; + + + public void save(String key, String value, ZonedDateTime issued) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("key", key); + parameters.addValue("value", value); + parameters.addValue("issued", Timestamp.from(issued.toInstant())); + jdbcTemplate.update(SQL_UPDATE, parameters); + } + + public int delete(Long id) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("id", id); + return jdbcTemplate.update(SQL_DELETE, parameters); + } + + public List fetch(Long sequenceNumber) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("sequenceNumber", sequenceNumber); + return + jdbcTemplate.query( + SQL_QUERY, + parameters, + (resultSet, rowNumber) -> + { + return + OutboxItem + .builder() + .sequenceNumber(resultSet.getLong(1)) + .key(resultSet.getString(2)) + .value(resultSet.getString(3)) + .build(); + }); + } +} diff --git a/delivery/src/main/resources/application.yml b/delivery/src/main/resources/application.yml new file mode 100644 index 0000000..2a8502a --- /dev/null +++ b/delivery/src/main/resources/application.yml @@ -0,0 +1,53 @@ +management: + endpoints: + web: + exposure: + include: "*" + +spring: + flyway: + locations: classpath:db/migration/h2 + +logging: + level: + de: + juplo: + kafka: + outbox: DEBUG + +--- + +spring: + profiles: prod + + datasource: + url: jdbc:postgresql://postgres:5432/outbox + username: outbox + password: outbox + flyway: + locations: classpath:db/migration/postgres + +de: + juplo: + kafka: + outbox: + bootstrap-servers: kafka:9093 + +--- + +spring: + profiles: dev + + datasource: + url: jdbc:postgresql://localhost:5432/outbox + username: outbox + password: outbox + flyway: + locations: classpath:db/migration/postgres + +de: + juplo: + kafka: + outbox: + bootstrap-servers: localhost:9092 + diff --git a/delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java b/delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java new file mode 100644 index 0000000..b8f1834 --- /dev/null +++ b/delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.outbox.delivery; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@SpringBootTest +public class ApplicationTests +{ + @Test + public void contextLoads() + { + } +} diff --git a/docker-compose.yml b/docker-compose.yml index 0125f95..2cd2608 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,16 +23,16 @@ services: - zookeeper jdbc: - image: jdbc:latest + image: juplo/jdbc:outbox ports: - 8080:8080 environment: - spring.profiles.active: production + spring.profiles.active: prod depends_on: - postgres outbox: - image: polling-outbox:latest + image: juplo/outbox:polling environment: spring.profiles.active: prod depends_on: diff --git a/jdbc b/jdbc index 4d047ac..84663e6 160000 --- a/jdbc +++ b/jdbc @@ -1 +1 @@ -Subproject commit 4d047ac98c61563fbb9986b6ce1e4d6ead5a1e48 +Subproject commit 84663e6da682a4edf6e6705b9e5018720220a318 diff --git a/outbox/.dockerignore b/outbox/.dockerignore deleted file mode 100644 index 1ad9963..0000000 --- a/outbox/.dockerignore +++ /dev/null @@ -1,2 +0,0 @@ -* -!target/*.jar diff --git a/outbox/Dockerfile b/outbox/Dockerfile deleted file mode 100644 index 16ee25e..0000000 --- a/outbox/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -FROM openjdk:11-jre -VOLUME /tmp -COPY target/*.jar /opt/app.jar -ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] -CMD [] diff --git a/outbox/pom.xml b/outbox/pom.xml deleted file mode 100644 index e8b48b6..0000000 --- a/outbox/pom.xml +++ /dev/null @@ -1,95 +0,0 @@ - - - - 4.0.0 - - - org.springframework.boot - spring-boot-starter-parent - 2.3.2.RELEASE - - - - de.juplo.kafka.outbox - polling-outbox - 0.0.1-SNAPSHOT - polling-outbox - Simple example-implementation of the Polling-Outbox-Pattern - - - 11 - 30.0-jre - - - - - org.springframework.boot - spring-boot-starter-data-jdbc - - - org.springframework.boot - spring-boot-starter-json - - - org.apache.kafka - kafka-clients - - - org.projectlombok - lombok - - - com.google.guava - guava - ${guava.version} - - - org.postgresql - postgresql - - - org.springframework.boot - spring-boot-starter-test - test - - - com.h2database - h2 - test - - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - io.fabric8 - docker-maven-plugin - 0.33.0 - - - - %a:%l - - - - - - build - package - - build - - - - - - - - diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/Application.java b/outbox/src/main/java/de/juplo/kafka/outbox/Application.java deleted file mode 100644 index 678a48a..0000000 --- a/outbox/src/main/java/de/juplo/kafka/outbox/Application.java +++ /dev/null @@ -1,18 +0,0 @@ -package de.juplo.kafka.outbox; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.scheduling.annotation.EnableScheduling; - - -@SpringBootApplication -@EnableConfigurationProperties(ApplicationProperties.class) -@EnableScheduling -public class Application -{ - public static void main(String[] args) throws Exception - { - SpringApplication.run(Application.class, args); - } -} diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java b/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java deleted file mode 100644 index 1a5dca6..0000000 --- a/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka.outbox; - -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("de.juplo.kafka.outbox") -@Getter -@Setter -public class ApplicationProperties -{ - String bootstrapServers = "localhost:9092"; - String topic = "outbox"; -} diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java deleted file mode 100644 index 99deafa..0000000 --- a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.outbox; - -import lombok.Builder; -import lombok.Data; -import lombok.Value; - - -@Data -@Value -@Builder -public class OutboxItem -{ - private final Long sequenceNumber; - private final String key; - private final String value; -} diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java deleted file mode 100644 index 30bef96..0000000 --- a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java +++ /dev/null @@ -1,105 +0,0 @@ -package de.juplo.kafka.outbox; - -import com.google.common.primitives.Longs; -import org.apache.kafka.common.serialization.StringSerializer; - -import java.time.Duration; -import java.util.List; -import java.util.Properties; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import javax.annotation.PreDestroy; - - -@Component -public class OutboxProducer -{ - final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class); - - - private final OutboxRepository repository; - private final KafkaProducer producer; - private final String topic; - - private long sequenceNumber = 0l; - - public OutboxProducer( - ApplicationProperties properties, - OutboxRepository repository) - { - this.repository = repository; - - Properties props = new Properties(); - props.put("bootstrap.servers", properties.bootstrapServers); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - this.producer = new KafkaProducer<>(props); - this.topic = properties.topic; - } - - @Scheduled(fixedDelay = 500) - public void poll() - { - List items; - do - { - items = repository.fetch(sequenceNumber); - LOG.debug("Polled {} new items", items.size()); - for (OutboxItem item : items) - send(item); - } - while (items.size() > 0); - } - - void send(OutboxItem item) - { - final ProducerRecord record = - new ProducerRecord<>(topic, item.getKey(), item.getValue()); - - sequenceNumber = item.getSequenceNumber(); - record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber)); - - producer.send(record, (metadata, e) -> - { - if (metadata != null) - { - int deleted = repository.delete(item.getSequenceNumber()); - LOG.info( - "{}/{}:{} - {}:{}={} - deleted: {}", - metadata.topic(), - metadata.partition(), - metadata.offset(), - item.getSequenceNumber(), - record.key(), - record.value(), - deleted); - } - else - { - // HANDLE ERROR - LOG.error( - "{}/{} - {}:{}={} -> ", - record.topic(), - record.partition(), - item.getSequenceNumber(), - record.key(), - record.value(), - e); - } - }); - } - - - @PreDestroy - public void close() - { - producer.close(Duration.ofSeconds(5)); - } -} diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java deleted file mode 100644 index 03a68ef..0000000 --- a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java +++ /dev/null @@ -1,62 +0,0 @@ -package de.juplo.kafka.outbox; - -import lombok.AllArgsConstructor; -import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; -import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; -import org.springframework.stereotype.Repository; - -import java.sql.Timestamp; -import java.time.ZonedDateTime; -import java.util.List; - - -@Repository -@AllArgsConstructor -public class OutboxRepository -{ - private static final String SQL_QUERY = - "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC"; - private static final String SQL_UPDATE = - "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)"; - private static final String SQL_DELETE = - "DELETE FROM outbox WHERE id = :id"; - - private final NamedParameterJdbcTemplate jdbcTemplate; - - - public void save(String key, String value, ZonedDateTime issued) - { - MapSqlParameterSource parameters = new MapSqlParameterSource(); - parameters.addValue("key", key); - parameters.addValue("value", value); - parameters.addValue("issued", Timestamp.from(issued.toInstant())); - jdbcTemplate.update(SQL_UPDATE, parameters); - } - - public int delete(Long id) - { - MapSqlParameterSource parameters = new MapSqlParameterSource(); - parameters.addValue("id", id); - return jdbcTemplate.update(SQL_DELETE, parameters); - } - - public List fetch(Long sequenceNumber) - { - MapSqlParameterSource parameters = new MapSqlParameterSource(); - parameters.addValue("sequenceNumber", sequenceNumber); - return - jdbcTemplate.query( - SQL_QUERY, - parameters, - (resultSet, rowNumber) -> - { - return - OutboxItem - .builder() - .sequenceNumber(resultSet.getLong(1)) - .key(resultSet.getString(2)) - .value(resultSet.getString(3)) - .build(); - }); - } -} diff --git a/outbox/src/main/resources/application.yml b/outbox/src/main/resources/application.yml deleted file mode 100644 index 2a8502a..0000000 --- a/outbox/src/main/resources/application.yml +++ /dev/null @@ -1,53 +0,0 @@ -management: - endpoints: - web: - exposure: - include: "*" - -spring: - flyway: - locations: classpath:db/migration/h2 - -logging: - level: - de: - juplo: - kafka: - outbox: DEBUG - ---- - -spring: - profiles: prod - - datasource: - url: jdbc:postgresql://postgres:5432/outbox - username: outbox - password: outbox - flyway: - locations: classpath:db/migration/postgres - -de: - juplo: - kafka: - outbox: - bootstrap-servers: kafka:9093 - ---- - -spring: - profiles: dev - - datasource: - url: jdbc:postgresql://localhost:5432/outbox - username: outbox - password: outbox - flyway: - locations: classpath:db/migration/postgres - -de: - juplo: - kafka: - outbox: - bootstrap-servers: localhost:9092 - diff --git a/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java b/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java deleted file mode 100644 index cb5bc43..0000000 --- a/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.outbox; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; - -@RunWith(SpringRunner.class) -@SpringBootTest -public class ApplicationTests -{ - @Test - public void contextLoads() - { - } -} diff --git a/pom.xml b/pom.xml index a270735..f54749e 100644 --- a/pom.xml +++ b/pom.xml @@ -14,8 +14,9 @@ Simple example-implementation of the Polling-Outbox-Pattern + postage jdbc - outbox + delivery diff --git a/postage/pom.xml b/postage/pom.xml new file mode 100644 index 0000000..1a3ac86 --- /dev/null +++ b/postage/pom.xml @@ -0,0 +1,58 @@ + + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.3.2.RELEASE + + + + de.juplo.kafka.outbox + outbox-postage + 0.0.1-SNAPSHOT + outbox-postage + Simple example-implementation of the Polling-Outbox-Pattern + + + 11 + + + + + org.springframework.boot + spring-boot-starter-data-jdbc + + + org.springframework.boot + spring-boot-starter-json + + + org.projectlombok + lombok + + + org.flywaydb + flyway-core + + + com.h2database + h2 + + + org.postgresql + postgresql + + + org.springframework.boot + spring-boot-starter-test + test + + + + diff --git a/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxEvent.java b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxEvent.java new file mode 100644 index 0000000..6c87b12 --- /dev/null +++ b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxEvent.java @@ -0,0 +1,30 @@ +package de.juplo.kafka.outbox.postage; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.springframework.context.ApplicationEvent; + +import java.time.ZonedDateTime; + + +@ToString +@EqualsAndHashCode +public class OutboxEvent extends ApplicationEvent +{ + @Getter + private final String key; + @Getter + private final Object value; + @Getter + private final ZonedDateTime time; + + + public OutboxEvent(Object source, String key, Object value, ZonedDateTime time) + { + super(source); + this.key = key; + this.value = value; + this.time = time; + } +} diff --git a/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxListener.java b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxListener.java new file mode 100644 index 0000000..abb350a --- /dev/null +++ b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxListener.java @@ -0,0 +1,34 @@ +package de.juplo.kafka.outbox.postage; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.AllArgsConstructor; +import org.springframework.stereotype.Component; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; + + +@Component +@AllArgsConstructor +public class OutboxListener +{ + private final OutboxRepository repository; + private final ObjectMapper mapper; + + + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void onUserEvent(OutboxEvent event) + { + try + { + repository.save( + event.getKey(), + mapper.writeValueAsString(event.getValue()), + event.getTime()); + } + catch (JsonProcessingException e) + { + throw new RuntimeException(e); + } + } +} diff --git a/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxRepository.java b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxRepository.java new file mode 100644 index 0000000..50fc301 --- /dev/null +++ b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxRepository.java @@ -0,0 +1,30 @@ +package de.juplo.kafka.outbox.postage; + +import lombok.AllArgsConstructor; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; +import java.time.ZonedDateTime; + + +@Repository +@AllArgsConstructor +public class OutboxRepository +{ + private static final String SQL_UPDATE = + "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)"; + + private final NamedParameterJdbcTemplate jdbcTemplate; + + + public void save(String key, String value, ZonedDateTime issued) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("key", key); + parameters.addValue("value", value); + parameters.addValue("issued", Timestamp.from(issued.toInstant())); + jdbcTemplate.update(SQL_UPDATE, parameters); + } +}