Added a simple implementation of the polling outbox pattern
[demos/kafka/outbox] / outbox / src / main / java / de / juplo / kafka / outbox / OutboxRepository.java
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java
new file mode 100644 (file)
index 0000000..03a68ef
--- /dev/null
@@ -0,0 +1,62 @@
+package de.juplo.kafka.outbox;
+
+import lombok.AllArgsConstructor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+import java.util.List;
+
+
+@Repository
+@AllArgsConstructor
+public class OutboxRepository
+{
+  private static final String SQL_QUERY =
+      "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
+  private static final String SQL_UPDATE =
+      "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
+  private static final String SQL_DELETE =
+      "DELETE FROM outbox WHERE id = :id";
+
+  private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+  public void save(String key, String value, ZonedDateTime issued)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("key", key);
+    parameters.addValue("value", value);
+    parameters.addValue("issued", Timestamp.from(issued.toInstant()));
+    jdbcTemplate.update(SQL_UPDATE, parameters);
+  }
+
+  public int delete(Long id)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("id", id);
+    return jdbcTemplate.update(SQL_DELETE, parameters);
+  }
+
+  public List<OutboxItem> fetch(Long sequenceNumber)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("sequenceNumber", sequenceNumber);
+    return
+        jdbcTemplate.query(
+            SQL_QUERY,
+            parameters,
+            (resultSet, rowNumber) ->
+            {
+              return
+                  OutboxItem
+                      .builder()
+                      .sequenceNumber(resultSet.getLong(1))
+                      .key(resultSet.getString(2))
+                      .value(resultSet.getString(3))
+                      .build();
+            });
+  }
+}