Published events are transactionally captured in an outbox-table
authorKai Moritz <kai@juplo.de>
Sun, 25 Oct 2020 15:45:39 +0000 (16:45 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 7 Feb 2021 14:12:28 +0000 (15:12 +0100)
* The outbox-implementation it kept separated from the application:
  the application sends a normal ApplicationEvent - everything else
  is implemented in a separate package, that could be packaged and
  distributed separatedly
* Created a new table outbox
* Added the event-type OutboxEvent, which is captured in the outbox
* UserEvent is now a subclass of OutboxEvent
* Added an OutboxListener that transactionally captures the events

README.sh
src/main/java/de/juplo/boot/data/jdbc/Application.java
src/main/java/de/juplo/boot/data/jdbc/UserController.java
src/main/java/de/juplo/boot/data/jdbc/UserEvent.java
src/main/java/de/juplo/boot/data/jdbc/UserEventListener.java
src/main/java/de/juplo/kafka/outbox/OutboxEvent.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/outbox/OutboxListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/outbox/OutboxRepository.java [new file with mode: 0644]
src/main/resources/db/migration/h2/V2__Table_outbox.sql [new file with mode: 0644]
src/main/resources/db/migration/postgres/V2__Table_outbox.sql [new file with mode: 0644]

index 23bff3a..ab544c9 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -45,4 +45,5 @@ do
   sleep 1
 done;
 
+docker-compose exec postgres psql -Uoutbox -c'SELECT * FROM outbox;' -Ppager=0  outbox
 docker-compose stop
index 956fd6f..732c7f4 100644 (file)
@@ -9,7 +9,13 @@ import org.springframework.context.annotation.Bean;
 import java.time.Clock;
 
 
-@SpringBootApplication
+import java.time.Clock;
+
+
+@SpringBootApplication(scanBasePackages = {
+  "de.juplo.boot.data.jdbc",
+  "de.juplo.kafka.outbox"
+  })
 public class Application {
 
     private final static Logger LOG = LoggerFactory.getLogger(Application.class);
index ac5ddbc..b98110e 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.boot.data.jdbc;
 
+import de.juplo.kafka.outbox.OutboxEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationEventPublisher;
@@ -88,7 +89,7 @@ public class UserController {
 
         repository.delete(user);
         publisher.publishEvent(
-            new UserEvent(
+            new OutboxEvent(
                 this,
                 user.getUsername(),
                 DELETED,
index 3693ebf..306a541 100644 (file)
@@ -1,28 +1,14 @@
 package de.juplo.boot.data.jdbc;
 
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.ToString;
-import org.springframework.context.ApplicationEvent;
+import de.juplo.kafka.outbox.OutboxEvent;
 
 import java.time.ZonedDateTime;
 
 
-@Getter
-@EqualsAndHashCode
-@ToString
-public class UserEvent extends ApplicationEvent
+public class UserEvent extends OutboxEvent
 {
-  private final String key;
-  private final UserStatus status;
-  private final ZonedDateTime time;
-
-
   public UserEvent(Object source, String key, UserStatus status, ZonedDateTime time)
   {
-    super(source);
-    this.key = key;
-    this.status = status;
-    this.time = time;
+    super(source, key, status, time);
   }
 }
index a9a2db2..97b751d 100644 (file)
@@ -14,6 +14,6 @@ public class UserEventListener
     @TransactionalEventListener
     public void onUserEvent(UserEvent event)
     {
-        LOG.info("{}: {} - {}", event.getTime(), event.getStatus(), event.getKey());
+        LOG.info("{}: {} - {}", event.getTime(), event.getValue(), event.getKey());
     }
 }
diff --git a/src/main/java/de/juplo/kafka/outbox/OutboxEvent.java b/src/main/java/de/juplo/kafka/outbox/OutboxEvent.java
new file mode 100644 (file)
index 0000000..4ecacdb
--- /dev/null
@@ -0,0 +1,30 @@
+package de.juplo.kafka.outbox;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.springframework.context.ApplicationEvent;
+
+import java.time.ZonedDateTime;
+
+
+@ToString
+@EqualsAndHashCode
+public class OutboxEvent extends ApplicationEvent
+{
+  @Getter
+  private final String key;
+  @Getter
+  private final Object value;
+  @Getter
+  private final ZonedDateTime time;
+
+
+  public OutboxEvent(Object source, String key, Object value, ZonedDateTime time)
+  {
+    super(source);
+    this.key = key;
+    this.value = value;
+    this.time = time;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/outbox/OutboxListener.java b/src/main/java/de/juplo/kafka/outbox/OutboxListener.java
new file mode 100644 (file)
index 0000000..fa53469
--- /dev/null
@@ -0,0 +1,34 @@
+package de.juplo.kafka.outbox;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.AllArgsConstructor;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.event.TransactionPhase;
+import org.springframework.transaction.event.TransactionalEventListener;
+
+
+@Component
+@AllArgsConstructor
+public class OutboxListener
+{
+  private final OutboxRepository repository;
+  private final ObjectMapper mapper;
+
+
+  @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
+  public void onUserEvent(OutboxEvent event)
+  {
+    try
+    {
+      repository.save(
+          event.getKey(),
+          mapper.writeValueAsString(event.getValue()),
+          event.getTime());
+    }
+    catch (JsonProcessingException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java b/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java
new file mode 100644 (file)
index 0000000..006bd03
--- /dev/null
@@ -0,0 +1,30 @@
+package de.juplo.kafka.outbox;
+
+import lombok.AllArgsConstructor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+
+
+@Repository
+@AllArgsConstructor
+public class OutboxRepository
+{
+  private static final String SQL_UPDATE =
+      "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
+
+  private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+  public void save(String key, String value, ZonedDateTime issued)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("key", key);
+    parameters.addValue("value", value);
+    parameters.addValue("issued", Timestamp.from(issued.toInstant()));
+    jdbcTemplate.update(SQL_UPDATE, parameters);
+  }
+}
diff --git a/src/main/resources/db/migration/h2/V2__Table_outbox.sql b/src/main/resources/db/migration/h2/V2__Table_outbox.sql
new file mode 100644 (file)
index 0000000..3227f14
--- /dev/null
@@ -0,0 +1 @@
+CREATE TABLE outbox(id BIGINT PRIMARY KEY AUTO_INCREMENT, key VARCHAR(127), value varchar(1023), issued timestamp);
diff --git a/src/main/resources/db/migration/postgres/V2__Table_outbox.sql b/src/main/resources/db/migration/postgres/V2__Table_outbox.sql
new file mode 100644 (file)
index 0000000..0b7fbfe
--- /dev/null
@@ -0,0 +1,3 @@
+CREATE SEQUENCE outbox_id_seq;
+CREATE TABLE outbox(id BIGINT PRIMARY KEY NOT NULL DEFAULT NEXTVAL('outbox_id_seq'), key VARCHAR(127), value varchar(1023), issued timestamp);
+ALTER SEQUENCE outbox_id_seq OWNED BY outbox.id;