--- /dev/null
+package de.juplo.kafka.outbox.polling;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.event.EventListener;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.event.TransactionPhase;
+import org.springframework.transaction.event.TransactionalEventListener;
+
+import java.util.Map;
+
+@Component
+public class UserEventListener
+{
+ private static final Logger LOG = LoggerFactory.getLogger(UserEventListener.class);
+
+ private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+ public UserEventListener(NamedParameterJdbcTemplate jdbcTemplate)
+ {
+ this.jdbcTemplate = jdbcTemplate;
+ }
+
+
+ @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
+ public void onUserEvent(UserEvent event)
+ {
+ LOG.info("{}: {}", event.type, event.user);
+ SqlParameterSource parameters =
+ new MapSqlParameterSource(Map.of("event", event.type.toInt(), "username", event.user));
+ jdbcTemplate.update("INSERT INTO events(event, username) VALUES(:event, :username)", parameters);
+ }
+}