--- /dev/null
+FROM openjdk:8-jre-alpine
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "/usr/bin/java", "-jar", "/opt/app.jar" ]
+CMD []
--- /dev/null
+version: "2"
+
+services:
+ zookeeper:
+ image: "confluentinc/cp-zookeeper:latest"
+ hostname: zookeeper
+ networks:
+ - tx
+ ports:
+ - 2181:2181
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+
+ kafka:
+ image: "confluentinc/cp-kafka:latest"
+ hostname: kafka
+ networks:
+ - tx
+ depends_on:
+ - zookeeper
+ ports:
+ - 9092:9092
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
+ KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9092
+ KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
+
+ outbox:
+ image: "outbox:latest"
+ networks:
+ - tx
+ ports:
+ - 8080:8080
+ environment:
+ GPS_BOOTSTRAP_SERVERS: kafka:9093
+ depends_on:
+ - kafka
+
+networks:
+ tx:
+ driver: bridge
<version>2.1.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
- <groupId>de.juplo.boot.data</groupId>
- <artifactId>jdbc</artifactId>
+ <groupId>de.juplo.kafka.outbox</groupId>
+ <artifactId>polling</artifactId>
<version>0.0.1-SNAPSHOT</version>
- <name>data-jdbc</name>
- <description>Simple web-app example for spring-boot-data-jdbc</description>
+ <name>Polling-Outbox-Pattern</name>
+ <description>Implementierung des Outbox-Patterns auf Basis von JDBC</description>
<properties>
- <java.version>1.8</java.version>
+ <java.version>1.9</java.version>
</properties>
<dependencies>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
<dependency>
<groupId>com.h2database</groupId>
+++ /dev/null
-package de.juplo.boot.data.jdbc;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-@SpringBootApplication
-public class Application {
-
- private final static Logger LOG = LoggerFactory.getLogger(Application.class);
-
-
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
-}
+++ /dev/null
-package de.juplo.boot.data.jdbc;
-
-import lombok.*;
-import org.springframework.data.annotation.Id;
-
-import java.time.LocalDateTime;
-
-public class User {
- @Id
- Long id;
- @Getter
- @Setter
- String username;
- @Getter
- @Setter
- LocalDateTime created;
- @Getter
- @Setter
- boolean loggedIn;
-
- public User(String username, LocalDateTime created, boolean loggedIn) {
- this.username = username;
- this.created = created;
- this.loggedIn = loggedIn;
- }
-}
+++ /dev/null
-package de.juplo.boot.data.jdbc;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.ApplicationEventPublisher;
-import org.springframework.dao.IncorrectResultSizeDataAccessException;
-import org.springframework.http.ResponseEntity;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.util.StreamUtils;
-import org.springframework.web.bind.annotation.*;
-import org.springframework.web.servlet.support.ServletUriComponentsBuilder;
-import org.springframework.web.util.UriComponents;
-
-import javax.servlet.http.HttpServletRequest;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.time.LocalDateTime;
-
-import static de.juplo.boot.data.jdbc.UserEvent.Type.CREATED;
-import static de.juplo.boot.data.jdbc.UserEvent.Type.DELETED;
-
-@RestController
-@Transactional
-@RequestMapping("/users")
-public class UserController {
-
- private static final Logger LOG = LoggerFactory.getLogger(UserController.class);
-
-
- private final UserRepository repository;
- private final ApplicationEventPublisher publisher;
-
-
- public UserController(
- UserRepository repository,
- ApplicationEventPublisher publisher)
- {
- this.repository = repository;
- this.publisher = publisher;
- }
-
-
- @PostMapping
- public ResponseEntity<Void> createUser(
- ServletUriComponentsBuilder builder,
- @RequestBody String username) {
- String sanitizedUsername = UserController.sanitize(username);
- User user = new User(sanitizedUsername, LocalDateTime.now(), false);
-
- // Triggering a unique-error for username prevents persistence
- repository.save(user);
- publisher.publishEvent(new UserEvent(this, CREATED, sanitizedUsername));
- user = repository.findByUsername(sanitizedUsername);
-
- UriComponents uri =
- builder
- .fromCurrentRequest()
- .pathSegment("{username}")
- .buildAndExpand(sanitizedUsername);
- return ResponseEntity.created(uri.toUri()).build();
- }
-
- @GetMapping("{username}")
- public ResponseEntity<User> getUser(@PathVariable String username) {
- User user = repository.findByUsername(UserController.sanitize(username));
-
- if (user == null)
- return ResponseEntity.notFound().build();
-
- return ResponseEntity.ok(user);
- }
-
- @DeleteMapping("{username}")
- public ResponseEntity<User> removeUser(@PathVariable String username) {
- User user = repository.findByUsername(UserController.sanitize(username));
-
- if (user == null)
- return ResponseEntity.notFound().build();
-
- repository.delete(user);
- publisher.publishEvent(new UserEvent(this, DELETED, username));
-
- return ResponseEntity.ok(user);
- }
-
- @GetMapping()
- public ResponseEntity<Iterable<User>> getUsers() {
- return ResponseEntity.ok(repository.findAll());
- }
-
-
- private static String sanitize(String string) {
- if (string == null)
- return "";
-
- return string.trim().toLowerCase();
- }
-
- @ExceptionHandler
- public ResponseEntity<?> incorrectResultSizeDataAccessException(
- HttpServletRequest request,
- IncorrectResultSizeDataAccessException e
- )
- {
- String username;
- try {
- username = StreamUtils.copyToString(request.getInputStream(), Charset.defaultCharset());
- }
- catch (IOException ioe)
- {
- username = e.getMessage() + " -> " + ioe.getMessage();
- }
- LOG.info("User {} already exists!", username);
- return ResponseEntity.badRequest().build();
- }
-}
+++ /dev/null
-package de.juplo.boot.data.jdbc;
-
-import org.springframework.context.ApplicationEvent;
-
-
-public class UserEvent extends ApplicationEvent
-{
- public enum Type { CREATED, LOGIN, LOGOUT, DELETED }
-
- final Type type;
- final String user;
-
-
- public UserEvent(Object source, Type type, String user)
- {
- super(source);
- this.type = type;
- this.user = user;
- }
-}
+++ /dev/null
-package de.juplo.boot.data.jdbc;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.event.TransactionalEventListener;
-
-@Component
-public class UserEventListener
-{
- private static final Logger LOG = LoggerFactory.getLogger(UserEventListener.class);
-
-
- @TransactionalEventListener
- public void onUserEvent(UserEvent event)
- {
- LOG.info("{}: {}", event.type, event.user);
- }
-}
+++ /dev/null
-package de.juplo.boot.data.jdbc;
-
-import org.springframework.data.jdbc.repository.query.Query;
-import org.springframework.data.repository.CrudRepository;
-import org.springframework.data.repository.query.Param;
-
-public interface UserRepository extends CrudRepository<User, Long> {
- @Query("select * from User u where u.username = :username")
- User findByUsername(@Param("username") String username);
-}
--- /dev/null
+package de.juplo.kafka.outbox.polling;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.web.servlet.config.annotation.CorsRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application {
+
+ private final static Logger LOG = LoggerFactory.getLogger(Application.class);
+
+
+ @Autowired
+ ApplicationProperties properties;
+
+
+ @Bean
+ public String bootstrapServers() { return properties.bootstrapServers; }
+
+ @Bean
+ public String topic() {
+ return properties.topic;
+ }
+
+ @Bean
+ public String consumerGroup() {
+ return properties.consumerGroup;
+ }
+
+ @Bean
+ public WebMvcConfigurer corsConfigurer() {
+ return new WebMvcConfigurer() {
+ @Override
+ public void addCorsMappings(CorsRegistry registry) {
+ registry
+ .addMapping("/**")
+ .allowedOrigins("http://localhost:4200");
+ }
+ };
+ }
+
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox.polling;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("outbox.polling")
+public class ApplicationProperties {
+ public String bootstrapServers = "localhost:9092";
+ public String topic = "outbox-polling";
+ public String consumerGroup = "polling";
+
+
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox.polling;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.event.TransactionalEventListener;
+
+import javax.annotation.PreDestroy;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class OutboxListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OutboxListener.class);
+
+
+ private final JdbcTemplate jdbcTemplate;
+ private final String topic;
+ private final KafkaProducer<Long, String> producer;
+
+
+ public OutboxListener(
+ JdbcTemplate jdbcTemplate,
+ String bootstrapServers,
+ String topic)
+ {
+ this.jdbcTemplate = jdbcTemplate;
+ this.topic = topic;
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", bootstrapServers);
+ props.put("key.serializer", LongSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+ producer = new KafkaProducer<>(props);
+ }
+
+
+ @TransactionalEventListener
+ @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
+ public void onOutboxEvent(UserEvent userEvent)
+ {
+ List<Map<String, Object>> result =
+ jdbcTemplate.queryForList("SELECT id, event, username FROM events ORDER BY id ASC");
+
+ try {
+ for (Map<String, Object> entry : result)
+ {
+ Long id = (Long)entry.get("id");
+ UserEvent.Type type = UserEvent.Type.ofInt((Short)entry.get("event"));
+ String username = (String)entry.get("username");
+ String event = username + ":" + type.name();
+
+ ProducerRecord<Long, String> record = new ProducerRecord<>(topic, id, event);
+ producer.send(record, (metadata, e) -> {
+ if (e != null) {
+ LOG.error("Could not send event {} ({}): ", id, event, e);
+ }
+ else {
+ LOG.debug(
+ "Send event {} ({}) with offset {} to partition {}",
+ id,
+ event,
+ metadata.offset(),
+ metadata.partition());
+ deleteOutboxEntry(id);
+ }
+ });
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException("Fehler beim Senden des Events", e);
+ }
+ }
+
+ @Transactional
+ void deleteOutboxEntry(Long id)
+ {
+ try
+ {
+ int result = jdbcTemplate.update("DELETE FROM events WHERE id = ?", id);
+ LOG.debug("entry {} {} from outbox", id, result == 1 ? "deleted" : "has already been deleted");
+ }
+ catch (DataAccessException e)
+ {
+ LOG.error("Execption while deleting row from outbox: {}!", id, e);
+ }
+ }
+
+ @PreDestroy
+ public void stop(){
+ LOG.info("Closing the KafkaProducer...");
+ try {
+ producer.close(5, TimeUnit.SECONDS);
+ LOG.debug("Successfully closed the KafkaProducer");
+ }
+ catch (Exception e) {
+ LOG.warn("Exception while closing the KafkaProducer!", e);
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox.polling;
+
+import lombok.*;
+import org.springframework.data.annotation.Id;
+
+import java.time.LocalDateTime;
+
+public class User {
+ @Id
+ Long id;
+ @Getter
+ @Setter
+ String username;
+ @Getter
+ @Setter
+ LocalDateTime created;
+ @Getter
+ @Setter
+ boolean loggedIn;
+
+ public User(String username, LocalDateTime created, boolean loggedIn) {
+ this.username = username;
+ this.created = created;
+ this.loggedIn = loggedIn;
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox.polling;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.dao.IncorrectResultSizeDataAccessException;
+import org.springframework.http.ResponseEntity;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.StreamUtils;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.servlet.support.ServletUriComponentsBuilder;
+import org.springframework.web.util.UriComponents;
+
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.time.LocalDateTime;
+
+import static de.juplo.kafka.outbox.polling.UserEvent.Type.CREATED;
+import static de.juplo.kafka.outbox.polling.UserEvent.Type.DELETED;
+
+
+@RestController
+@Transactional
+@RequestMapping("/users")
+public class UserController {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UserController.class);
+
+
+ private final UserRepository repository;
+ private final ApplicationEventPublisher publisher;
+
+
+ public UserController(
+ UserRepository repository,
+ ApplicationEventPublisher publisher)
+ {
+ this.repository = repository;
+ this.publisher = publisher;
+ }
+
+
+ @PostMapping
+ public ResponseEntity<Void> createUser(
+ ServletUriComponentsBuilder builder,
+ @RequestBody String username) {
+ String sanitizedUsername = UserController.sanitize(username);
+ User user = new User(sanitizedUsername, LocalDateTime.now(), false);
+
+ // Triggering a unique-error for username prevents persistence
+ repository.save(user);
+ publisher.publishEvent(new UserEvent(this, CREATED, sanitizedUsername));
+ user = repository.findByUsername(sanitizedUsername);
+
+ UriComponents uri =
+ builder
+ .fromCurrentRequest()
+ .pathSegment("{username}")
+ .buildAndExpand(sanitizedUsername);
+ return ResponseEntity.created(uri.toUri()).build();
+ }
+
+ @GetMapping("{username}")
+ public ResponseEntity<User> getUser(@PathVariable String username) {
+ User user = repository.findByUsername(UserController.sanitize(username));
+
+ if (user == null)
+ return ResponseEntity.notFound().build();
+
+ return ResponseEntity.ok(user);
+ }
+
+ @DeleteMapping("{username}")
+ public ResponseEntity<User> removeUser(@PathVariable String username) {
+ User user = repository.findByUsername(UserController.sanitize(username));
+
+ if (user == null)
+ return ResponseEntity.notFound().build();
+
+ repository.delete(user);
+ publisher.publishEvent(new UserEvent(this, DELETED, username));
+
+ return ResponseEntity.ok(user);
+ }
+
+ @GetMapping()
+ public ResponseEntity<Iterable<User>> getUsers() {
+ return ResponseEntity.ok(repository.findAll());
+ }
+
+
+ private static String sanitize(String string) {
+ if (string == null)
+ return "";
+
+ return string.trim().toLowerCase();
+ }
+
+ @ExceptionHandler
+ public ResponseEntity<?> incorrectResultSizeDataAccessException(
+ HttpServletRequest request,
+ IncorrectResultSizeDataAccessException e
+ )
+ {
+ String username;
+ try {
+ username = StreamUtils.copyToString(request.getInputStream(), Charset.defaultCharset());
+ }
+ catch (IOException ioe)
+ {
+ username = e.getMessage() + " -> " + ioe.getMessage();
+ }
+ LOG.info("User {} already exists!", username);
+ return ResponseEntity.badRequest().build();
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox.polling;
+
+import org.springframework.context.ApplicationEvent;
+
+
+public class UserEvent extends ApplicationEvent
+{
+ public enum Type
+ {
+ CREATED(1),
+ LOGIN(2),
+ LOGOUT(3),
+ DELETED(4);
+
+ public final int num;
+
+
+ Type(int num)
+ {
+ this.num = num;
+ }
+
+
+ public static Type ofInt(int ordinal)
+ {
+ switch (ordinal)
+ {
+ case 1: return CREATED;
+ case 2: return LOGIN;
+ case 3: return LOGOUT;
+ case 4: return DELETED;
+ default:
+ throw new RuntimeException("Unknown ordinal: " + ordinal);
+ }
+ }
+
+ public static int toInt(Type type)
+ {
+ return type.toInt();
+ }
+
+ public int toInt()
+ {
+ return this.num;
+ }
+ }
+
+
+ final Type type;
+ final String user;
+
+
+ public UserEvent(Object source, Type type, String user)
+ {
+ super(source);
+ this.type = type;
+ this.user = user;
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox.polling;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.event.EventListener;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.event.TransactionPhase;
+import org.springframework.transaction.event.TransactionalEventListener;
+
+import java.util.Map;
+
+@Component
+public class UserEventListener
+{
+ private static final Logger LOG = LoggerFactory.getLogger(UserEventListener.class);
+
+ private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+ public UserEventListener(NamedParameterJdbcTemplate jdbcTemplate)
+ {
+ this.jdbcTemplate = jdbcTemplate;
+ }
+
+
+ @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
+ public void onUserEvent(UserEvent event)
+ {
+ LOG.info("{}: {}", event.type, event.user);
+ SqlParameterSource parameters =
+ new MapSqlParameterSource(Map.of("event", event.type.toInt(), "username", event.user));
+ jdbcTemplate.update("INSERT INTO events(event, username) VALUES(:event, :username)", parameters);
+ }
+}
--- /dev/null
+package de.juplo.kafka.outbox.polling;
+
+import org.springframework.data.jdbc.repository.query.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+
+public interface UserRepository extends CrudRepository<User, Long> {
+ @Query("select * from User u where u.username = :username")
+ User findByUsername(@Param("username") String username);
+}
CREATE TABLE user(id BIGINT PRIMARY KEY AUTO_INCREMENT, username VARCHAR(255), created TIMESTAMP, logged_in BIT)
+CREATE TABLE events(id BIGINT PRIMARY KEY AUTO_INCREMENT, event SMALLINT, username varchar(255))
+++ /dev/null
-package de.juplo.boot.data.jdbc;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest
-public class ApplicationTests {
-
- @Test
- public void contextLoads() {
- }
-
-}
--- /dev/null
+package de.juplo.kafka.outbox.polling;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ApplicationTests {
+
+ @Test
+ public void contextLoads() {
+ }
+
+}