X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Foutbox;a=blobdiff_plain;f=outbox%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2FOutboxRepository.java;fp=outbox%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2FOutboxRepository.java;h=03a68ef075a456ae8ba8d2143f3eca94c9e284a8;hp=0000000000000000000000000000000000000000;hb=996911bbed45e0211e48976e3cb3971631361e5b;hpb=64251b8eafa2534c359e8e2fc243c17b5a97a61a diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java new file mode 100644 index 0000000..03a68ef --- /dev/null +++ b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java @@ -0,0 +1,62 @@ +package de.juplo.kafka.outbox; + +import lombok.AllArgsConstructor; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Repository; + +import java.sql.Timestamp; +import java.time.ZonedDateTime; +import java.util.List; + + +@Repository +@AllArgsConstructor +public class OutboxRepository +{ + private static final String SQL_QUERY = + "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC"; + private static final String SQL_UPDATE = + "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)"; + private static final String SQL_DELETE = + "DELETE FROM outbox WHERE id = :id"; + + private final NamedParameterJdbcTemplate jdbcTemplate; + + + public void save(String key, String value, ZonedDateTime issued) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("key", key); + parameters.addValue("value", value); + parameters.addValue("issued", Timestamp.from(issued.toInstant())); + jdbcTemplate.update(SQL_UPDATE, parameters); + } + + public int delete(Long id) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("id", id); + return jdbcTemplate.update(SQL_DELETE, parameters); + } + + public List fetch(Long sequenceNumber) + { + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("sequenceNumber", sequenceNumber); + return + jdbcTemplate.query( + SQL_QUERY, + parameters, + (resultSet, rowNumber) -> + { + return + OutboxItem + .builder() + .sequenceNumber(resultSet.getLong(1)) + .key(resultSet.getString(2)) + .value(resultSet.getString(3)) + .build(); + }); + } +}