+++ /dev/null
-package de.juplo.kafka.outbox;
-
-import lombok.AllArgsConstructor;
-import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
-import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
-import org.springframework.stereotype.Repository;
-
-import java.sql.Timestamp;
-import java.time.ZonedDateTime;
-import java.util.List;
-
-
-@Repository
-@AllArgsConstructor
-public class OutboxRepository
-{
- private static final String SQL_QUERY =
- "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
- private static final String SQL_UPDATE =
- "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
- private static final String SQL_DELETE =
- "DELETE FROM outbox WHERE id = :id";
-
- private final NamedParameterJdbcTemplate jdbcTemplate;
-
-
- public void save(String key, String value, ZonedDateTime issued)
- {
- MapSqlParameterSource parameters = new MapSqlParameterSource();
- parameters.addValue("key", key);
- parameters.addValue("value", value);
- parameters.addValue("issued", Timestamp.from(issued.toInstant()));
- jdbcTemplate.update(SQL_UPDATE, parameters);
- }
-
- public int delete(Long id)
- {
- MapSqlParameterSource parameters = new MapSqlParameterSource();
- parameters.addValue("id", id);
- return jdbcTemplate.update(SQL_DELETE, parameters);
- }
-
- public List<OutboxItem> fetch(Long sequenceNumber)
- {
- MapSqlParameterSource parameters = new MapSqlParameterSource();
- parameters.addValue("sequenceNumber", sequenceNumber);
- return
- jdbcTemplate.query(
- SQL_QUERY,
- parameters,
- (resultSet, rowNumber) ->
- {
- return
- OutboxItem
- .builder()
- .sequenceNumber(resultSet.getLong(1))
- .key(resultSet.getString(2))
- .value(resultSet.getString(3))
- .build();
- });
- }
-}