From 136347c46acdb74268e882919f3b811a64b58f90 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 25 Oct 2020 16:45:39 +0100 Subject: [PATCH] Published events are transactionally captured in an outbox-table * The outbox-implementation it kept separated from the application: the application sends a normal ApplicationEvent - everything else is implemented in a separate package, that could be packaged and distributed separatedly * Created a new table outbox * Added the event-type OutboxEvent, which is captured in the outbox * UserEvent is now a subclass of OutboxEvent * Added an OutboxListener that transactionally captures the events --- README.sh | 1 + .../de/juplo/boot/data/jdbc/Application.java | 8 ++++- .../juplo/boot/data/jdbc/UserController.java | 3 +- .../de/juplo/boot/data/jdbc/UserEvent.java | 20 ++--------- .../boot/data/jdbc/UserEventListener.java | 2 +- .../de/juplo/kafka/outbox/OutboxEvent.java | 30 ++++++++++++++++ .../de/juplo/kafka/outbox/OutboxListener.java | 34 +++++++++++++++++++ .../juplo/kafka/outbox/OutboxRepository.java | 30 ++++++++++++++++ .../db/migration/h2/V2__Table_outbox.sql | 1 + .../migration/postgres/V2__Table_outbox.sql | 3 ++ 10 files changed, 112 insertions(+), 20 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/outbox/OutboxEvent.java create mode 100644 src/main/java/de/juplo/kafka/outbox/OutboxListener.java create mode 100644 src/main/java/de/juplo/kafka/outbox/OutboxRepository.java create mode 100644 src/main/resources/db/migration/h2/V2__Table_outbox.sql create mode 100644 src/main/resources/db/migration/postgres/V2__Table_outbox.sql diff --git a/README.sh b/README.sh index 23bff3a..ab544c9 100755 --- a/README.sh +++ b/README.sh @@ -45,4 +45,5 @@ do sleep 1 done; +docker-compose exec postgres psql -Uoutbox -c'SELECT * FROM outbox;' -Ppager=0 outbox docker-compose stop diff --git a/src/main/java/de/juplo/boot/data/jdbc/Application.java b/src/main/java/de/juplo/boot/data/jdbc/Application.java index 956fd6f..732c7f4 100644 --- a/src/main/java/de/juplo/boot/data/jdbc/Application.java +++ b/src/main/java/de/juplo/boot/data/jdbc/Application.java @@ -9,7 +9,13 @@ import org.springframework.context.annotation.Bean; import java.time.Clock; -@SpringBootApplication +import java.time.Clock; + + +@SpringBootApplication(scanBasePackages = { + "de.juplo.boot.data.jdbc", + "de.juplo.kafka.outbox" + }) public class Application { private final static Logger LOG = LoggerFactory.getLogger(Application.class); diff --git a/src/main/java/de/juplo/boot/data/jdbc/UserController.java b/src/main/java/de/juplo/boot/data/jdbc/UserController.java index ac5ddbc..b98110e 100644 --- a/src/main/java/de/juplo/boot/data/jdbc/UserController.java +++ b/src/main/java/de/juplo/boot/data/jdbc/UserController.java @@ -1,5 +1,6 @@ package de.juplo.boot.data.jdbc; +import de.juplo.kafka.outbox.OutboxEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationEventPublisher; @@ -88,7 +89,7 @@ public class UserController { repository.delete(user); publisher.publishEvent( - new UserEvent( + new OutboxEvent( this, user.getUsername(), DELETED, diff --git a/src/main/java/de/juplo/boot/data/jdbc/UserEvent.java b/src/main/java/de/juplo/boot/data/jdbc/UserEvent.java index 3693ebf..306a541 100644 --- a/src/main/java/de/juplo/boot/data/jdbc/UserEvent.java +++ b/src/main/java/de/juplo/boot/data/jdbc/UserEvent.java @@ -1,28 +1,14 @@ package de.juplo.boot.data.jdbc; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.ToString; -import org.springframework.context.ApplicationEvent; +import de.juplo.kafka.outbox.OutboxEvent; import java.time.ZonedDateTime; -@Getter -@EqualsAndHashCode -@ToString -public class UserEvent extends ApplicationEvent +public class UserEvent extends OutboxEvent { - private final String key; - private final UserStatus status; - private final ZonedDateTime time; - - public UserEvent(Object source, String key, UserStatus status, ZonedDateTime time) { - super(source); - this.key = key; - this.status = status; - this.time = time; + super(source, key, status, time); } } diff --git a/src/main/java/de/juplo/boot/data/jdbc/UserEventListener.java b/src/main/java/de/juplo/boot/data/jdbc/UserEventListener.java index a9a2db2..97b751d 100644 --- a/src/main/java/de/juplo/boot/data/jdbc/UserEventListener.java +++ b/src/main/java/de/juplo/boot/data/jdbc/UserEventListener.java @@ -14,6 +14,6 @@ public class UserEventListener @TransactionalEventListener public void onUserEvent(UserEvent event) { - LOG.info("{}: {} - {}", event.getTime(), event.getStatus(), event.getKey()); + LOG.info("{}: {} - {}", event.getTime(), event.getValue(), event.getKey()); } } diff --git a/src/main/java/de/juplo/kafka/outbox/OutboxEvent.java b/src/main/java/de/juplo/kafka/outbox/OutboxEvent.java new file mode 100644 index 0000000..4ecacdb --- /dev/null +++ b/src/main/java/de/juplo/kafka/outbox/OutboxEvent.java @@ -0,0 +1,30 @@ +package de.juplo.kafka.outbox; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.springframework.context.ApplicationEvent; + +import java.time.ZonedDateTime; + + +@ToString +@EqualsAndHashCode +public class OutboxEvent extends ApplicationEvent +{ + @Getter + private final String key; + @Getter + private final Object value; + @Getter + private final ZonedDateTime time; + + + public OutboxEvent(Object source, String key, Object value, ZonedDateTime time) + { + super(source); + this.key = key; + this.value = value; + this.time = time; + } +} diff --git a/src/main/java/de/juplo/kafka/outbox/OutboxListener.java b/src/main/java/de/juplo/kafka/outbox/OutboxListener.java new file mode 100644 index 0000000..fa53469 --- /dev/null +++ b/src/main/java/de/juplo/kafka/outbox/OutboxListener.java @@ -0,0 +1,34 @@ +package de.juplo.kafka.outbox; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.AllArgsConstructor; +import org.springframework.stereotype.Component; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; + + +@Component +@AllArgsConstructor +public class OutboxListener +{ + private final OutboxRepository repository; + private final ObjectMapper mapper; + + + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) + public void onUserEvent(OutboxEvent event) + { + try + { + repository.save( + event.getKey(), + mapper.writeValueAsString(event.getValue()), + event.getTime()); + } + catch (JsonProcessingException e) + { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java b/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java new file mode 100644 index 0000000..006bd03 --- /dev/null +++ b/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java @@ -0,0 +1,30 @@ +package de.juplo.kafka.outbox; + +import lombok.AllArgsConstructor; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; +import java.time.ZonedDateTime; + + +@Repository +@AllArgsConstructor +public class OutboxRepository +{ + private static final String SQL_UPDATE = + "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)"; + + private final NamedParameterJdbcTemplate jdbcTemplate; + + + public void save(String key, String value, ZonedDateTime issued) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("key", key); + parameters.addValue("value", value); + parameters.addValue("issued", Timestamp.from(issued.toInstant())); + jdbcTemplate.update(SQL_UPDATE, parameters); + } +} diff --git a/src/main/resources/db/migration/h2/V2__Table_outbox.sql b/src/main/resources/db/migration/h2/V2__Table_outbox.sql new file mode 100644 index 0000000..3227f14 --- /dev/null +++ b/src/main/resources/db/migration/h2/V2__Table_outbox.sql @@ -0,0 +1 @@ +CREATE TABLE outbox(id BIGINT PRIMARY KEY AUTO_INCREMENT, key VARCHAR(127), value varchar(1023), issued timestamp); diff --git a/src/main/resources/db/migration/postgres/V2__Table_outbox.sql b/src/main/resources/db/migration/postgres/V2__Table_outbox.sql new file mode 100644 index 0000000..0b7fbfe --- /dev/null +++ b/src/main/resources/db/migration/postgres/V2__Table_outbox.sql @@ -0,0 +1,3 @@ +CREATE SEQUENCE outbox_id_seq; +CREATE TABLE outbox(id BIGINT PRIMARY KEY NOT NULL DEFAULT NEXTVAL('outbox_id_seq'), key VARCHAR(127), value varchar(1023), issued timestamp); +ALTER SEQUENCE outbox_id_seq OWNED BY outbox.id; -- 2.20.1