WIP
[demos/kafka/outbox] / src / main / java / de / juplo / kafka / outbox / polling / UserEventListener.java
diff --git a/src/main/java/de/juplo/kafka/outbox/polling/UserEventListener.java b/src/main/java/de/juplo/kafka/outbox/polling/UserEventListener.java
new file mode 100644 (file)
index 0000000..b61fcca
--- /dev/null
@@ -0,0 +1,37 @@
+package de.juplo.kafka.outbox.polling;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.event.EventListener;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.event.TransactionPhase;
+import org.springframework.transaction.event.TransactionalEventListener;
+
+import java.util.Map;
+
+@Component
+public class UserEventListener
+{
+    private static final Logger LOG = LoggerFactory.getLogger(UserEventListener.class);
+
+    private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+    public UserEventListener(NamedParameterJdbcTemplate jdbcTemplate)
+    {
+      this.jdbcTemplate = jdbcTemplate;
+    }
+
+
+    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
+    public void onUserEvent(UserEvent event)
+    {
+      LOG.info("{}: {}", event.type, event.user);
+      SqlParameterSource parameters =
+          new MapSqlParameterSource(Map.of("event", event.type.toInt(), "username", event.user));
+      jdbcTemplate.update("INSERT INTO events(event, username) VALUES(:event, :username)", parameters);
+    }
+}