sleep 1
done;
+docker-compose exec postgres psql -Uoutbox -c'SELECT * FROM outbox;' -Ppager=0 outbox
docker-compose stop
import java.time.Clock;
-@SpringBootApplication
+import java.time.Clock;
+
+
+@SpringBootApplication(scanBasePackages = {
+ "de.juplo.boot.data.jdbc",
+ "de.juplo.kafka.outbox"
+ })
public class Application {
private final static Logger LOG = LoggerFactory.getLogger(Application.class);
package de.juplo.boot.data.jdbc;
+import de.juplo.kafka.outbox.OutboxEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
repository.delete(user);
publisher.publishEvent(
- new UserEvent(
+ new OutboxEvent(
this,
user.getUsername(),
DELETED,
package de.juplo.boot.data.jdbc;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.ToString;
-import org.springframework.context.ApplicationEvent;
+import de.juplo.kafka.outbox.OutboxEvent;
import java.time.ZonedDateTime;
-@Getter
-@EqualsAndHashCode
-@ToString
-public class UserEvent extends ApplicationEvent
+public class UserEvent extends OutboxEvent
{
- private final String key;
- private final UserStatus status;
- private final ZonedDateTime time;
-
-
public UserEvent(Object source, String key, UserStatus status, ZonedDateTime time)
{
- super(source);
- this.key = key;
- this.status = status;
- this.time = time;
+ super(source, key, status, time);
}
}
@TransactionalEventListener
public void onUserEvent(UserEvent event)
{
- LOG.info("{}: {} - {}", event.getTime(), event.getStatus(), event.getKey());
+ LOG.info("{}: {} - {}", event.getTime(), event.getValue(), event.getKey());
}
}
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.springframework.context.ApplicationEvent;
+
+import java.time.ZonedDateTime;
+
+
+@ToString
+@EqualsAndHashCode
+public class OutboxEvent extends ApplicationEvent
+{
+ @Getter
+ private final String key;
+ @Getter
+ private final Object value;
+ @Getter
+ private final ZonedDateTime time;
+
+
+ public OutboxEvent(Object source, String key, Object value, ZonedDateTime time)
+ {
+ super(source);
+ this.key = key;
+ this.value = value;
+ this.time = time;
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.AllArgsConstructor;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.event.TransactionPhase;
+import org.springframework.transaction.event.TransactionalEventListener;
+
+
+@Component
+@AllArgsConstructor
+public class OutboxListener
+{
+ private final OutboxRepository repository;
+ private final ObjectMapper mapper;
+
+
+ @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
+ public void onUserEvent(OutboxEvent event)
+ {
+ try
+ {
+ repository.save(
+ event.getKey(),
+ mapper.writeValueAsString(event.getValue()),
+ event.getTime());
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox;
+
+import lombok.AllArgsConstructor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+
+
+@Repository
+@AllArgsConstructor
+public class OutboxRepository
+{
+ private static final String SQL_UPDATE =
+ "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
+
+ private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+ public void save(String key, String value, ZonedDateTime issued)
+ {
+ MapSqlParameterSource parameters = new MapSqlParameterSource();
+ parameters.addValue("key", key);
+ parameters.addValue("value", value);
+ parameters.addValue("issued", Timestamp.from(issued.toInstant()));
+ jdbcTemplate.update(SQL_UPDATE, parameters);
+ }
+}
--- /dev/null
+CREATE TABLE outbox(id BIGINT PRIMARY KEY AUTO_INCREMENT, key VARCHAR(127), value varchar(1023), issued timestamp);
--- /dev/null
+CREATE SEQUENCE outbox_id_seq;
+CREATE TABLE outbox(id BIGINT PRIMARY KEY NOT NULL DEFAULT NEXTVAL('outbox_id_seq'), key VARCHAR(127), value varchar(1023), issued timestamp);
+ALTER SEQUENCE outbox_id_seq OWNED BY outbox.id;