WIP
authorKai Moritz <kai@juplo.de>
Mon, 13 Jul 2020 18:54:46 +0000 (20:54 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 24 Oct 2020 08:59:33 +0000 (10:59 +0200)
20 files changed:
Dockerfile [new file with mode: 0644]
docker-compose.yml [new file with mode: 0644]
pom.xml
src/main/java/de/juplo/boot/data/jdbc/Application.java [deleted file]
src/main/java/de/juplo/boot/data/jdbc/User.java [deleted file]
src/main/java/de/juplo/boot/data/jdbc/UserController.java [deleted file]
src/main/java/de/juplo/boot/data/jdbc/UserEvent.java [deleted file]
src/main/java/de/juplo/boot/data/jdbc/UserEventListener.java [deleted file]
src/main/java/de/juplo/boot/data/jdbc/UserRepository.java [deleted file]
src/main/java/de/juplo/kafka/outbox/polling/Application.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/outbox/polling/ApplicationProperties.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/outbox/polling/User.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/outbox/polling/UserController.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/outbox/polling/UserEvent.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/outbox/polling/UserEventListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/outbox/polling/UserRepository.java [new file with mode: 0644]
src/main/resources/schema.sql
src/test/java/de/juplo/boot/data/jdbc/ApplicationTests.java [deleted file]
src/test/java/de/juplo/kafka/outbox/polling/ApplicationTests.java [new file with mode: 0644]

diff --git a/Dockerfile b/Dockerfile
new file mode 100644 (file)
index 0000000..89e2ebb
--- /dev/null
@@ -0,0 +1,5 @@
+FROM openjdk:8-jre-alpine
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "/usr/bin/java", "-jar", "/opt/app.jar" ]
+CMD []
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644 (file)
index 0000000..23d023f
--- /dev/null
@@ -0,0 +1,44 @@
+version: "2"
+
+services:
+  zookeeper:
+    image: "confluentinc/cp-zookeeper:latest"
+    hostname: zookeeper
+    networks:
+      - tx
+    ports:
+      - 2181:2181
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+
+  kafka:
+    image: "confluentinc/cp-kafka:latest"
+    hostname: kafka
+    networks:
+      - tx
+    depends_on:
+      - zookeeper
+    ports:
+      - 9092:9092
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
+      KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9092
+      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
+
+  outbox:
+    image: "outbox:latest"
+    networks:
+      - tx
+    ports:
+      - 8080:8080
+    environment:
+      GPS_BOOTSTRAP_SERVERS: kafka:9093
+    depends_on:
+      - kafka
+
+networks:
+  tx:
+    driver: bridge
diff --git a/pom.xml b/pom.xml
index 10293a1..221b4e3 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -8,14 +8,14 @@
      <version>2.1.5.RELEASE</version>
      <relativePath/> <!-- lookup parent from repository -->
    </parent>
-   <groupId>de.juplo.boot.data</groupId>
-   <artifactId>jdbc</artifactId>
+   <groupId>de.juplo.kafka.outbox</groupId>
+   <artifactId>polling</artifactId>
    <version>0.0.1-SNAPSHOT</version>
-   <name>data-jdbc</name>
-   <description>Simple web-app example for spring-boot-data-jdbc</description>
+   <name>Polling-Outbox-Pattern</name>
+   <description>Implementierung des Outbox-Patterns auf Basis von JDBC</description>
 
    <properties>
-     <java.version>1.8</java.version>
+     <java.version>1.9</java.version>
    </properties>
 
    <dependencies>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
      </dependency>
+     <dependency>
+       <groupId>org.apache.kafka</groupId>
+       <artifactId>kafka-clients</artifactId>
+     </dependency>
 
      <dependency>
        <groupId>com.h2database</groupId>
diff --git a/src/main/java/de/juplo/boot/data/jdbc/Application.java b/src/main/java/de/juplo/boot/data/jdbc/Application.java
deleted file mode 100644 (file)
index d7301e1..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-package de.juplo.boot.data.jdbc;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-@SpringBootApplication
-public class Application {
-
-    private final static Logger LOG = LoggerFactory.getLogger(Application.class);
-
-
-    public static void main(String[] args) {
-        SpringApplication.run(Application.class, args);
-    }
-}
diff --git a/src/main/java/de/juplo/boot/data/jdbc/User.java b/src/main/java/de/juplo/boot/data/jdbc/User.java
deleted file mode 100644 (file)
index 6d4d552..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-package de.juplo.boot.data.jdbc;
-
-import lombok.*;
-import org.springframework.data.annotation.Id;
-
-import java.time.LocalDateTime;
-
-public class User {
-    @Id
-    Long id;
-    @Getter
-    @Setter
-    String username;
-    @Getter
-    @Setter
-    LocalDateTime created;
-    @Getter
-    @Setter
-    boolean loggedIn;
-
-    public User(String username, LocalDateTime created, boolean loggedIn) {
-        this.username = username;
-        this.created = created;
-        this.loggedIn = loggedIn;
-    }
-}
diff --git a/src/main/java/de/juplo/boot/data/jdbc/UserController.java b/src/main/java/de/juplo/boot/data/jdbc/UserController.java
deleted file mode 100644 (file)
index 0a4a62f..0000000
+++ /dev/null
@@ -1,116 +0,0 @@
-package de.juplo.boot.data.jdbc;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.ApplicationEventPublisher;
-import org.springframework.dao.IncorrectResultSizeDataAccessException;
-import org.springframework.http.ResponseEntity;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.util.StreamUtils;
-import org.springframework.web.bind.annotation.*;
-import org.springframework.web.servlet.support.ServletUriComponentsBuilder;
-import org.springframework.web.util.UriComponents;
-
-import javax.servlet.http.HttpServletRequest;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.time.LocalDateTime;
-
-import static de.juplo.boot.data.jdbc.UserEvent.Type.CREATED;
-import static de.juplo.boot.data.jdbc.UserEvent.Type.DELETED;
-
-@RestController
-@Transactional
-@RequestMapping("/users")
-public class UserController {
-
-    private static final Logger LOG = LoggerFactory.getLogger(UserController.class);
-
-
-    private final UserRepository repository;
-    private final ApplicationEventPublisher publisher;
-
-
-    public UserController(
-            UserRepository repository,
-            ApplicationEventPublisher publisher)
-    {
-        this.repository = repository;
-        this.publisher = publisher;
-    }
-
-
-    @PostMapping
-    public ResponseEntity<Void> createUser(
-            ServletUriComponentsBuilder builder,
-            @RequestBody String username) {
-        String sanitizedUsername = UserController.sanitize(username);
-        User user = new User(sanitizedUsername, LocalDateTime.now(), false);
-
-        // Triggering a unique-error for username prevents persistence
-        repository.save(user);
-        publisher.publishEvent(new UserEvent(this, CREATED, sanitizedUsername));
-        user = repository.findByUsername(sanitizedUsername);
-
-        UriComponents uri =
-            builder
-                .fromCurrentRequest()
-                .pathSegment("{username}")
-                .buildAndExpand(sanitizedUsername);
-        return ResponseEntity.created(uri.toUri()).build();
-    }
-
-    @GetMapping("{username}")
-    public ResponseEntity<User> getUser(@PathVariable String username) {
-        User user = repository.findByUsername(UserController.sanitize(username));
-
-        if (user == null)
-            return ResponseEntity.notFound().build();
-
-        return ResponseEntity.ok(user);
-    }
-
-    @DeleteMapping("{username}")
-    public ResponseEntity<User> removeUser(@PathVariable String username) {
-        User user = repository.findByUsername(UserController.sanitize(username));
-
-        if (user == null)
-            return ResponseEntity.notFound().build();
-
-        repository.delete(user);
-        publisher.publishEvent(new UserEvent(this, DELETED, username));
-
-        return ResponseEntity.ok(user);
-    }
-
-    @GetMapping()
-    public ResponseEntity<Iterable<User>> getUsers() {
-        return ResponseEntity.ok(repository.findAll());
-    }
-
-
-    private static String sanitize(String string) {
-        if (string == null)
-            return "";
-
-        return string.trim().toLowerCase();
-    }
-
-    @ExceptionHandler
-    public ResponseEntity<?> incorrectResultSizeDataAccessException(
-        HttpServletRequest request,
-        IncorrectResultSizeDataAccessException e
-        )
-    {
-      String username;
-      try {
-          username = StreamUtils.copyToString(request.getInputStream(), Charset.defaultCharset());
-      }
-      catch (IOException ioe)
-      {
-        username = e.getMessage() + " -> " + ioe.getMessage();
-      }
-      LOG.info("User {} already exists!", username);
-      return ResponseEntity.badRequest().build();
-    }
-}
diff --git a/src/main/java/de/juplo/boot/data/jdbc/UserEvent.java b/src/main/java/de/juplo/boot/data/jdbc/UserEvent.java
deleted file mode 100644 (file)
index fc8877c..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-package de.juplo.boot.data.jdbc;
-
-import org.springframework.context.ApplicationEvent;
-
-
-public class UserEvent extends ApplicationEvent
-{
-    public enum Type { CREATED, LOGIN, LOGOUT, DELETED }
-
-    final Type type;
-    final String user;
-
-
-    public UserEvent(Object source, Type type, String user)
-    {
-        super(source);
-        this.type = type;
-        this.user = user;
-    }
-}
diff --git a/src/main/java/de/juplo/boot/data/jdbc/UserEventListener.java b/src/main/java/de/juplo/boot/data/jdbc/UserEventListener.java
deleted file mode 100644 (file)
index 5f22e0f..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-package de.juplo.boot.data.jdbc;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.event.TransactionalEventListener;
-
-@Component
-public class UserEventListener
-{
-    private static final Logger LOG = LoggerFactory.getLogger(UserEventListener.class);
-
-
-    @TransactionalEventListener
-    public void onUserEvent(UserEvent event)
-    {
-        LOG.info("{}: {}", event.type, event.user);
-    }
-}
diff --git a/src/main/java/de/juplo/boot/data/jdbc/UserRepository.java b/src/main/java/de/juplo/boot/data/jdbc/UserRepository.java
deleted file mode 100644 (file)
index 1d07359..0000000
+++ /dev/null
@@ -1,10 +0,0 @@
-package de.juplo.boot.data.jdbc;
-
-import org.springframework.data.jdbc.repository.query.Query;
-import org.springframework.data.repository.CrudRepository;
-import org.springframework.data.repository.query.Param;
-
-public interface UserRepository extends CrudRepository<User, Long> {
-    @Query("select * from User u where u.username = :username")
-    User findByUsername(@Param("username") String username);
-}
diff --git a/src/main/java/de/juplo/kafka/outbox/polling/Application.java b/src/main/java/de/juplo/kafka/outbox/polling/Application.java
new file mode 100644 (file)
index 0000000..7605374
--- /dev/null
@@ -0,0 +1,53 @@
+package de.juplo.kafka.outbox.polling;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.web.servlet.config.annotation.CorsRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application {
+
+    private final static Logger LOG = LoggerFactory.getLogger(Application.class);
+
+
+    @Autowired
+    ApplicationProperties properties;
+
+
+    @Bean
+    public String bootstrapServers() { return properties.bootstrapServers; }
+
+    @Bean
+    public String topic() {
+        return properties.topic;
+    }
+
+    @Bean
+    public String consumerGroup() {
+        return properties.consumerGroup;
+    }
+
+    @Bean
+    public WebMvcConfigurer corsConfigurer() {
+        return new WebMvcConfigurer() {
+            @Override
+            public void addCorsMappings(CorsRegistry registry) {
+                registry
+                        .addMapping("/**")
+                        .allowedOrigins("http://localhost:4200");
+            }
+        };
+    }
+
+
+    public static void main(String[] args) {
+        SpringApplication.run(Application.class, args);
+    }
+}
diff --git a/src/main/java/de/juplo/kafka/outbox/polling/ApplicationProperties.java b/src/main/java/de/juplo/kafka/outbox/polling/ApplicationProperties.java
new file mode 100644 (file)
index 0000000..8b0bbff
--- /dev/null
@@ -0,0 +1,35 @@
+package de.juplo.kafka.outbox.polling;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("outbox.polling")
+public class ApplicationProperties {
+    public String bootstrapServers = "localhost:9092";
+    public String topic = "outbox-polling";
+    public String consumerGroup = "polling";
+
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+}
diff --git a/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java b/src/main/java/de/juplo/kafka/outbox/polling/OutboxListener.java
new file mode 100644 (file)
index 0000000..d8f9643
--- /dev/null
@@ -0,0 +1,110 @@
+package de.juplo.kafka.outbox.polling;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.event.TransactionalEventListener;
+
+import javax.annotation.PreDestroy;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class OutboxListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OutboxListener.class);
+
+
+    private final JdbcTemplate jdbcTemplate;
+    private final String topic;
+    private final KafkaProducer<Long, String> producer;
+
+
+    public OutboxListener(
+            JdbcTemplate jdbcTemplate,
+            String bootstrapServers,
+            String topic)
+    {
+        this.jdbcTemplate = jdbcTemplate;
+        this.topic = topic;
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", bootstrapServers);
+        props.put("key.serializer", LongSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        producer = new KafkaProducer<>(props);
+    }
+
+
+    @TransactionalEventListener
+    @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
+    public void onOutboxEvent(UserEvent userEvent)
+    {
+        List<Map<String, Object>> result =
+            jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC");
+
+        try {
+            for (Map<String, Object> entry : result)
+            {
+                Long id = (Long)entry.get("id");
+                UserEvent.Type type = UserEvent.Type.ofInt((Short)entry.get("event"));
+                String username = (String)entry.get("username");
+                String event = username + ":" + type.name();
+
+                ProducerRecord<Long, String> record = new ProducerRecord<>(topic, id, event);
+                producer.send(record, (metadata, e) -> {
+                    if (e != null) {
+                        LOG.error("Could not send event {} ({}): ", id, event, e);
+                    }
+                    else {
+                        LOG.debug(
+                                "Send event {} ({}) with offset {} to partition {}",
+                                id,
+                                event,
+                                metadata.offset(),
+                                metadata.partition());
+                        deleteOutboxEntry(id);
+                    }
+                });
+            }
+
+        } catch (Exception e) {
+            throw new RuntimeException("Fehler beim Senden des Events", e);
+        }
+    }
+
+    @Transactional
+    void deleteOutboxEntry(Long id)
+    {
+        try
+        {
+            int result = jdbcTemplate.update("DELETE FROM events WHERE id = ?", id);
+            LOG.debug("entry {} {} from outbox", id, result == 1 ? "deleted" : "has already been deleted");
+        }
+        catch (DataAccessException e)
+        {
+            LOG.error("Execption while deleting row from outbox: {}!", id, e);
+        }
+    }
+
+    @PreDestroy
+    public void stop(){
+        LOG.info("Closing the KafkaProducer...");
+        try {
+            producer.close(5, TimeUnit.SECONDS);
+            LOG.debug("Successfully closed the KafkaProducer");
+        }
+        catch (Exception e) {
+            LOG.warn("Exception while closing the KafkaProducer!", e);
+        }
+    }
+}
diff --git a/src/main/java/de/juplo/kafka/outbox/polling/User.java b/src/main/java/de/juplo/kafka/outbox/polling/User.java
new file mode 100644 (file)
index 0000000..5181175
--- /dev/null
@@ -0,0 +1,26 @@
+package de.juplo.kafka.outbox.polling;
+
+import lombok.*;
+import org.springframework.data.annotation.Id;
+
+import java.time.LocalDateTime;
+
+public class User {
+    @Id
+    Long id;
+    @Getter
+    @Setter
+    String username;
+    @Getter
+    @Setter
+    LocalDateTime created;
+    @Getter
+    @Setter
+    boolean loggedIn;
+
+    public User(String username, LocalDateTime created, boolean loggedIn) {
+        this.username = username;
+        this.created = created;
+        this.loggedIn = loggedIn;
+    }
+}
diff --git a/src/main/java/de/juplo/kafka/outbox/polling/UserController.java b/src/main/java/de/juplo/kafka/outbox/polling/UserController.java
new file mode 100644 (file)
index 0000000..798ebe7
--- /dev/null
@@ -0,0 +1,117 @@
+package de.juplo.kafka.outbox.polling;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.dao.IncorrectResultSizeDataAccessException;
+import org.springframework.http.ResponseEntity;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.StreamUtils;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.servlet.support.ServletUriComponentsBuilder;
+import org.springframework.web.util.UriComponents;
+
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.time.LocalDateTime;
+
+import static de.juplo.kafka.outbox.polling.UserEvent.Type.CREATED;
+import static de.juplo.kafka.outbox.polling.UserEvent.Type.DELETED;
+
+
+@RestController
+@Transactional
+@RequestMapping("/users")
+public class UserController {
+
+    private static final Logger LOG = LoggerFactory.getLogger(UserController.class);
+
+
+    private final UserRepository repository;
+    private final ApplicationEventPublisher publisher;
+
+
+    public UserController(
+            UserRepository repository,
+            ApplicationEventPublisher publisher)
+    {
+        this.repository = repository;
+        this.publisher = publisher;
+    }
+
+
+    @PostMapping
+    public ResponseEntity<Void> createUser(
+            ServletUriComponentsBuilder builder,
+            @RequestBody String username) {
+        String sanitizedUsername = UserController.sanitize(username);
+        User user = new User(sanitizedUsername, LocalDateTime.now(), false);
+
+        // Triggering a unique-error for username prevents persistence
+        repository.save(user);
+        publisher.publishEvent(new UserEvent(this, CREATED, sanitizedUsername));
+        user = repository.findByUsername(sanitizedUsername);
+
+        UriComponents uri =
+            builder
+                .fromCurrentRequest()
+                .pathSegment("{username}")
+                .buildAndExpand(sanitizedUsername);
+        return ResponseEntity.created(uri.toUri()).build();
+    }
+
+    @GetMapping("{username}")
+    public ResponseEntity<User> getUser(@PathVariable String username) {
+        User user = repository.findByUsername(UserController.sanitize(username));
+
+        if (user == null)
+            return ResponseEntity.notFound().build();
+
+        return ResponseEntity.ok(user);
+    }
+
+    @DeleteMapping("{username}")
+    public ResponseEntity<User> removeUser(@PathVariable String username) {
+        User user = repository.findByUsername(UserController.sanitize(username));
+
+        if (user == null)
+            return ResponseEntity.notFound().build();
+
+        repository.delete(user);
+        publisher.publishEvent(new UserEvent(this, DELETED, username));
+
+        return ResponseEntity.ok(user);
+    }
+
+    @GetMapping()
+    public ResponseEntity<Iterable<User>> getUsers() {
+        return ResponseEntity.ok(repository.findAll());
+    }
+
+
+    private static String sanitize(String string) {
+        if (string == null)
+            return "";
+
+        return string.trim().toLowerCase();
+    }
+
+    @ExceptionHandler
+    public ResponseEntity<?> incorrectResultSizeDataAccessException(
+        HttpServletRequest request,
+        IncorrectResultSizeDataAccessException e
+        )
+    {
+      String username;
+      try {
+          username = StreamUtils.copyToString(request.getInputStream(), Charset.defaultCharset());
+      }
+      catch (IOException ioe)
+      {
+        username = e.getMessage() + " -> " + ioe.getMessage();
+      }
+      LOG.info("User {} already exists!", username);
+      return ResponseEntity.badRequest().build();
+    }
+}
diff --git a/src/main/java/de/juplo/kafka/outbox/polling/UserEvent.java b/src/main/java/de/juplo/kafka/outbox/polling/UserEvent.java
new file mode 100644 (file)
index 0000000..15db4b7
--- /dev/null
@@ -0,0 +1,59 @@
+package de.juplo.kafka.outbox.polling;
+
+import org.springframework.context.ApplicationEvent;
+
+
+public class UserEvent extends ApplicationEvent
+{
+    public enum Type
+    {
+        CREATED(1),
+        LOGIN(2),
+        LOGOUT(3),
+        DELETED(4);
+
+        public final int num;
+
+
+        Type(int num)
+        {
+            this.num = num;
+        }
+
+
+        public static Type ofInt(int ordinal)
+        {
+            switch (ordinal)
+            {
+                case 1: return CREATED;
+                case 2: return LOGIN;
+                case 3: return LOGOUT;
+                case 4: return DELETED;
+                default:
+                    throw new RuntimeException("Unknown ordinal: " + ordinal);
+            }
+        }
+
+        public static int toInt(Type type)
+        {
+            return type.toInt();
+        }
+
+        public int toInt()
+        {
+            return this.num;
+        }
+    }
+
+
+    final Type type;
+    final String user;
+
+
+    public UserEvent(Object source, Type type, String user)
+    {
+        super(source);
+        this.type = type;
+        this.user = user;
+    }
+}
diff --git a/src/main/java/de/juplo/kafka/outbox/polling/UserEventListener.java b/src/main/java/de/juplo/kafka/outbox/polling/UserEventListener.java
new file mode 100644 (file)
index 0000000..b61fcca
--- /dev/null
@@ -0,0 +1,37 @@
+package de.juplo.kafka.outbox.polling;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.event.EventListener;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.event.TransactionPhase;
+import org.springframework.transaction.event.TransactionalEventListener;
+
+import java.util.Map;
+
+@Component
+public class UserEventListener
+{
+    private static final Logger LOG = LoggerFactory.getLogger(UserEventListener.class);
+
+    private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+    public UserEventListener(NamedParameterJdbcTemplate jdbcTemplate)
+    {
+      this.jdbcTemplate = jdbcTemplate;
+    }
+
+
+    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
+    public void onUserEvent(UserEvent event)
+    {
+      LOG.info("{}: {}", event.type, event.user);
+      SqlParameterSource parameters =
+          new MapSqlParameterSource(Map.of("event", event.type.toInt(), "username", event.user));
+      jdbcTemplate.update("INSERT INTO events(event, username) VALUES(:event, :username)", parameters);
+    }
+}
diff --git a/src/main/java/de/juplo/kafka/outbox/polling/UserRepository.java b/src/main/java/de/juplo/kafka/outbox/polling/UserRepository.java
new file mode 100644 (file)
index 0000000..73b3f9e
--- /dev/null
@@ -0,0 +1,10 @@
+package de.juplo.kafka.outbox.polling;
+
+import org.springframework.data.jdbc.repository.query.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+
+public interface UserRepository extends CrudRepository<User, Long> {
+    @Query("select * from User u where u.username = :username")
+    User findByUsername(@Param("username") String username);
+}
index 353c36b..b2e329f 100644 (file)
@@ -1 +1,2 @@
 CREATE TABLE user(id BIGINT PRIMARY KEY AUTO_INCREMENT, username VARCHAR(255), created TIMESTAMP, logged_in BIT)
+CREATE TABLE events(id BIGINT PRIMARY KEY AUTO_INCREMENT, event SMALLINT, username varchar(255))
diff --git a/src/test/java/de/juplo/boot/data/jdbc/ApplicationTests.java b/src/test/java/de/juplo/boot/data/jdbc/ApplicationTests.java
deleted file mode 100644 (file)
index c77d5b7..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.boot.data.jdbc;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest
-public class ApplicationTests {
-
-    @Test
-    public void contextLoads() {
-    }
-
-}
diff --git a/src/test/java/de/juplo/kafka/outbox/polling/ApplicationTests.java b/src/test/java/de/juplo/kafka/outbox/polling/ApplicationTests.java
new file mode 100644 (file)
index 0000000..52aed2e
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.outbox.polling;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ApplicationTests {
+
+    @Test
+    public void contextLoads() {
+    }
+
+}