From fbd0ca0df5004d13a5e93cdb8373bafc60440c8b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 1 Nov 2020 12:58:20 +0100 Subject: [PATCH] Moved postage of messages into a reusable standalone implementation * Renamed maven-module outbox into delivery * Renamed app polling-outbox into outbox-polling-delivery * Added new artifact outbox-postage as maven-module postage * Added fully qualified names for the docker-images * Adapted docker-compose.yml and README.sh * TODO: Move flyway-scriptes for outbox into module postage --- README.sh | 6 +- {outbox => delivery}/.dockerignore | 0 {outbox => delivery}/Dockerfile | 0 {outbox => delivery}/pom.xml | 4 +- .../kafka/outbox/delivery}/Application.java | 2 +- .../delivery}/ApplicationProperties.java | 2 +- .../kafka/outbox/delivery}/OutboxItem.java | 2 +- .../outbox/delivery}/OutboxProducer.java | 2 +- .../outbox/delivery}/OutboxRepository.java | 2 +- .../src/main/resources/application.yml | 0 .../outbox/delivery}/ApplicationTests.java | 2 +- docker-compose.yml | 2 +- jdbc | 2 +- pom.xml | 3 +- postage/pom.xml | 58 +++++++++++++++++++ .../kafka/outbox/postage/OutboxEvent.java | 30 ++++++++++ .../kafka/outbox/postage/OutboxListener.java | 34 +++++++++++ .../outbox/postage/OutboxRepository.java | 30 ++++++++++ 18 files changed, 167 insertions(+), 14 deletions(-) rename {outbox => delivery}/.dockerignore (100%) rename {outbox => delivery}/Dockerfile (100%) rename {outbox => delivery}/pom.xml (96%) rename {outbox/src/main/java/de/juplo/kafka/outbox => delivery/src/main/java/de/juplo/kafka/outbox/delivery}/Application.java (92%) rename {outbox/src/main/java/de/juplo/kafka/outbox => delivery/src/main/java/de/juplo/kafka/outbox/delivery}/ApplicationProperties.java (88%) rename {outbox/src/main/java/de/juplo/kafka/outbox => delivery/src/main/java/de/juplo/kafka/outbox/delivery}/OutboxItem.java (84%) rename {outbox/src/main/java/de/juplo/kafka/outbox => delivery/src/main/java/de/juplo/kafka/outbox/delivery}/OutboxProducer.java (98%) rename {outbox/src/main/java/de/juplo/kafka/outbox => delivery/src/main/java/de/juplo/kafka/outbox/delivery}/OutboxRepository.java (97%) rename {outbox => delivery}/src/main/resources/application.yml (100%) rename {outbox/src/test/java/de/juplo/kafka/outbox => delivery/src/test/java/de/juplo/kafka/outbox/delivery}/ApplicationTests.java (88%) create mode 100644 postage/pom.xml create mode 100644 postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxEvent.java create mode 100644 postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxListener.java create mode 100644 postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxRepository.java diff --git a/README.sh b/README.sh index c9f133c..b38ba61 100755 --- a/README.sh +++ b/README.sh @@ -5,7 +5,7 @@ then docker-compose down -v mvn clean docker image rm juplo/data-jdbc:polling-outbox-2-SNAPSHOT - docker image rm juplo/polling-outbox:polling-outbox-2-SNAPSHOT + docker image rm juplo/outbox-delivery:polling-outbox-2-SNAPSHOT exit fi @@ -13,7 +13,7 @@ docker-compose up -d zookeeper kafka if [[ $(docker image ls -q juplo/data-jdbc:polling-outbox-2-SNAPSHOT) == "" || - $(docker image ls -q juplo/polling-outbox:polling-outbox-2-SNAPSHOT) == "" || + $(docker image ls -q juplo/outbox-delivery:polling-outbox-2-SNAPSHOT) == "" || "$1" = "build" ]] then @@ -21,7 +21,7 @@ then else echo "Using image existing images:" docker image ls juplo/data-jdbc:polling-outbox-2-SNAPSHOT - docker image ls juplo/polling-outbox:polling-outbox-2-SNAPSHOT + docker image ls juplo/outbox-delivery:polling-outbox-2-SNAPSHOT fi while ! [[ $(docker-compose exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; diff --git a/outbox/.dockerignore b/delivery/.dockerignore similarity index 100% rename from outbox/.dockerignore rename to delivery/.dockerignore diff --git a/outbox/Dockerfile b/delivery/Dockerfile similarity index 100% rename from outbox/Dockerfile rename to delivery/Dockerfile diff --git a/outbox/pom.xml b/delivery/pom.xml similarity index 96% rename from outbox/pom.xml rename to delivery/pom.xml index 5bea7c9..faef6a9 100644 --- a/outbox/pom.xml +++ b/delivery/pom.xml @@ -14,9 +14,9 @@ de.juplo.kafka.outbox - polling-outbox + outbox-delivery polling-outbox-2-SNAPSHOT - Outbox (Polling) + Outbox-Delivery (Polling) Simple example-implementation of the Outbox-Pattern (polling variant) diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/Application.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java similarity index 92% rename from outbox/src/main/java/de/juplo/kafka/outbox/Application.java rename to delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java index 678a48a..6abd181 100644 --- a/outbox/src/main/java/de/juplo/kafka/outbox/Application.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/Application.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.outbox; +package de.juplo.kafka.outbox.delivery; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java similarity index 88% rename from outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java rename to delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java index 1a5dca6..4e36aa4 100644 --- a/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/ApplicationProperties.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.outbox; +package de.juplo.kafka.outbox.delivery; import lombok.Getter; import lombok.Setter; diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxItem.java similarity index 84% rename from outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java rename to delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxItem.java index 99deafa..e48ac8e 100644 --- a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxItem.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.outbox; +package de.juplo.kafka.outbox.delivery; import lombok.Builder; import lombok.Data; diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java similarity index 98% rename from outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java rename to delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index 30bef96..c08cae7 100644 --- a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.outbox; +package de.juplo.kafka.outbox.delivery; import com.google.common.primitives.Longs; import org.apache.kafka.common.serialization.StringSerializer; diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java similarity index 97% rename from outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java rename to delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java index 03a68ef..abf2d1d 100644 --- a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxRepository.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.outbox; +package de.juplo.kafka.outbox.delivery; import lombok.AllArgsConstructor; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; diff --git a/outbox/src/main/resources/application.yml b/delivery/src/main/resources/application.yml similarity index 100% rename from outbox/src/main/resources/application.yml rename to delivery/src/main/resources/application.yml diff --git a/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java b/delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java similarity index 88% rename from outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java rename to delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java index cb5bc43..b8f1834 100644 --- a/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java +++ b/delivery/src/test/java/de/juplo/kafka/outbox/delivery/ApplicationTests.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.outbox; +package de.juplo.kafka.outbox.delivery; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/docker-compose.yml b/docker-compose.yml index 3600d5f..ec0a963 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -39,7 +39,7 @@ services: - postgres outbox: - image: juplo/polling-outbox:polling-outbox-2-SNAPSHOT + image: juplo/outbox-delivery:polling-outbox-2-SNAPSHOT environment: spring.profiles.active: prod depends_on: diff --git a/jdbc b/jdbc index d2fbbf0..bd0d272 160000 --- a/jdbc +++ b/jdbc @@ -1 +1 @@ -Subproject commit d2fbbf029e151e37fcd48cbbdb90c3c14a48aa8d +Subproject commit bd0d27231000709358794cf034ab6c2b0b3db8ab diff --git a/pom.xml b/pom.xml index 8825add..7f98c54 100644 --- a/pom.xml +++ b/pom.xml @@ -14,8 +14,9 @@ Simple example-implementation of the Polling-Outbox-Pattern + postage jdbc - outbox + delivery diff --git a/postage/pom.xml b/postage/pom.xml new file mode 100644 index 0000000..9892acd --- /dev/null +++ b/postage/pom.xml @@ -0,0 +1,58 @@ + + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.3.2.RELEASE + + + + de.juplo.kafka.outbox + outbox-postage + polling-outbox-2-SNAPSHOT + outbox-postage + Simple example-implementation of the Polling-Outbox-Pattern + + + 11 + + + + + org.springframework.boot + spring-boot-starter-data-jdbc + + + org.springframework.boot + spring-boot-starter-json + + + org.projectlombok + lombok + + + org.flywaydb + flyway-core + + + com.h2database + h2 + + + org.postgresql + postgresql + + + org.springframework.boot + spring-boot-starter-test + test + + + + diff --git a/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxEvent.java b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxEvent.java new file mode 100644 index 0000000..6c87b12 --- /dev/null +++ b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxEvent.java @@ -0,0 +1,30 @@ +package de.juplo.kafka.outbox.postage; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.springframework.context.ApplicationEvent; + +import java.time.ZonedDateTime; + + +@ToString +@EqualsAndHashCode +public class OutboxEvent extends ApplicationEvent +{ + @Getter + private final String key; + @Getter + private final Object value; + @Getter + private final ZonedDateTime time; + + + public OutboxEvent(Object source, String key, Object value, ZonedDateTime time) + { + super(source); + this.key = key; + this.value = value; + this.time = time; + } +} diff --git a/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxListener.java b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxListener.java new file mode 100644 index 0000000..abb350a --- /dev/null +++ b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxListener.java @@ -0,0 +1,34 @@ +package de.juplo.kafka.outbox.postage; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.AllArgsConstructor; +import org.springframework.stereotype.Component; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; + + +@Component +@AllArgsConstructor +public class OutboxListener +{ + private final OutboxRepository repository; + private final ObjectMapper mapper; + + + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void onUserEvent(OutboxEvent event) + { + try + { + repository.save( + event.getKey(), + mapper.writeValueAsString(event.getValue()), + event.getTime()); + } + catch (JsonProcessingException e) + { + throw new RuntimeException(e); + } + } +} diff --git a/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxRepository.java b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxRepository.java new file mode 100644 index 0000000..50fc301 --- /dev/null +++ b/postage/src/main/java/de/juplo/kafka/outbox/postage/OutboxRepository.java @@ -0,0 +1,30 @@ +package de.juplo.kafka.outbox.postage; + +import lombok.AllArgsConstructor; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; +import java.time.ZonedDateTime; + + +@Repository +@AllArgsConstructor +public class OutboxRepository +{ + private static final String SQL_UPDATE = + "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)"; + + private final NamedParameterJdbcTemplate jdbcTemplate; + + + public void save(String key, String value, ZonedDateTime issued) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("key", key); + parameters.addValue("value", value); + parameters.addValue("issued", Timestamp.from(issued.toInstant())); + jdbcTemplate.update(SQL_UPDATE, parameters); + } +} -- 2.20.1