WIP: remove Kafka producer/consumer plumbing and configuration properties
author     Kai Moritz <kai@juplo.de>
           Sun, 12 Jul 2020 12:30:05 +0000 (14:30 +0200)
committer  Kai Moritz <kai@juplo.de>
           Sun, 12 Jul 2020 12:30:05 +0000 (14:30 +0200)
pom.xml
src/main/java/de/trion/kafka/outbox/Application.java
src/main/java/de/trion/kafka/outbox/ApplicationProperties.java [deleted file]
src/main/java/de/trion/kafka/outbox/OutboxConsumer.java [deleted file]
src/main/java/de/trion/kafka/outbox/OutboxProducer.java [deleted file]
src/main/java/de/trion/kafka/outbox/OutboxService.java [deleted file]

diff --git a/pom.xml b/pom.xml
index e55601f..322ea86 100644
--- a/pom.xml
+++ b/pom.xml
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-data-jdbc</artifactId>
         </dependency>
-        <dependency>
-                       <groupId>com.h2database</groupId>
-                       <artifactId>h2</artifactId>
-                       <scope>runtime</scope>
-               </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-json</artifactId>
         </dependency>
-
         <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
+
         <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-        </dependency>
+                       <groupId>com.h2database</groupId>
+                       <artifactId>h2</artifactId>
+                       <scope>runtime</scope>
+               </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>
diff --git a/src/main/java/de/trion/kafka/outbox/Application.java b/src/main/java/de/trion/kafka/outbox/Application.java
index 878e75e..f6865f5 100644
--- a/src/main/java/de/trion/kafka/outbox/Application.java
+++ b/src/main/java/de/trion/kafka/outbox/Application.java
@@ -2,51 +2,15 @@ package de.trion.kafka.outbox;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.web.servlet.config.annotation.CorsRegistry;
-import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
 
 @SpringBootApplication
-@EnableConfigurationProperties(ApplicationProperties.class)
 public class Application {
 
     private final static Logger LOG = LoggerFactory.getLogger(Application.class);
 
 
-    @Autowired
-    ApplicationProperties properties;
-
-
-    @Bean
-    public String bootstrapServers() { return properties.bootstrapServers; }
-
-    @Bean
-    public String topic() {
-        return properties.topic;
-    }
-
-    @Bean
-    public String consumerGroup() {
-        return properties.consumerGroup;
-    }
-
-    @Bean
-    public WebMvcConfigurer corsConfigurer() {
-        return new WebMvcConfigurer() {
-            @Override
-            public void addCorsMappings(CorsRegistry registry) {
-                registry
-                        .addMapping("/**")
-                        .allowedOrigins("http://localhost:4200");
-            }
-        };
-    }
-
-
     public static void main(String[] args) {
         SpringApplication.run(Application.class, args);
     }
diff --git a/src/main/java/de/trion/kafka/outbox/ApplicationProperties.java b/src/main/java/de/trion/kafka/outbox/ApplicationProperties.java
deleted file mode 100644
index 7ba4c06..0000000
--- a/src/main/java/de/trion/kafka/outbox/ApplicationProperties.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package de.trion.kafka.outbox;
-
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-@ConfigurationProperties("outbox.polling")
-public class ApplicationProperties {
-    public String bootstrapServers = "localhost:9092";
-    public String topic = "outbox-polling";
-    public String consumerGroup = "polling";
-
-
-    public String getBootstrapServers() {
-        return bootstrapServers;
-    }
-
-    public void setBootstrapServers(String bootstrapServers) {
-        this.bootstrapServers = bootstrapServers;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public String getConsumerGroup() {
-        return consumerGroup;
-    }
-
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
-    }
-}
diff --git a/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java b/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java
deleted file mode 100644
index e33a28b..0000000
--- a/src/main/java/de/trion/kafka/outbox/OutboxConsumer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package de.trion.kafka.outbox;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PreDestroy;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
-
-
-@Component
-public class OutboxConsumer implements ApplicationRunner, Runnable {
-
-    private final static Logger LOG = LoggerFactory.getLogger(OutboxConsumer.class);
-
-    private final OutboxService service;
-    private final OutboxProducer sender;
-    private final ObjectMapper mapper;
-    private final String topic;
-    private final KafkaConsumer<Long, String> consumer;
-    private final Thread thread;
-
-    private long internalState = 1;
-
-
-    public OutboxConsumer(
-            OutboxService service,
-            OutboxProducer sender,
-            ObjectMapper mapper,
-            String bootstrapServers,
-            String consumerGroup,
-            String topic) {
-
-        this.service = service;
-        this.sender = sender;
-        this.mapper = mapper;
-        this.topic = topic;
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", bootstrapServers);
-        props.put("group.id", consumerGroup);
-        props.put("auto.commit.interval.ms", 15000);
-        props.put("metadata.max.age.ms", 1000);
-        props.put("key.deserializer", LongDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-        consumer = new KafkaConsumer<>(props);
-
-        thread = new Thread(this);
-    }
-
-
-    @Override
-    public void run() {
-        try
-        {
-            LOG.info("Subscribing to topic " + topic);
-            consumer.subscribe(Arrays.asList(topic));
-
-            while (true)
-            {
-                ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
-                for (ConsumerRecord<Long, String> record : records) {
-                    LOG.debug("Ignoring command {}", record.value());
-                }
-            }
-        }
-        catch (WakeupException e) {}
-        catch (Exception e) {
-            LOG.error("Unexpected exception!", e);
-        }
-        finally
-        {
-            LOG.info("Closing the KafkaConsumer...");
-            try {
-                consumer.close(Duration.ofSeconds(5));
-                LOG.debug("Successfully closed the KafkaConsumer");
-            }
-            catch (Exception e) {
-                LOG.warn("Exception while closing the KafkaConsumer!", e);
-            }
-        }
-    }
-
-
-    @Override
-    public void run(ApplicationArguments args) {
-        thread.start();
-        try {
-            thread.join();
-            LOG.info("Successfully joined the consumer-thread");
-        }
-        catch (InterruptedException e) {
-            LOG.info("Main-thread was interrupted while joining the consumer-thread");
-        }
-    }
-
-    @PreDestroy
-    public void stop()
-    {
-        LOG.info("Stopping the KafkaConsumer...");
-        consumer.wakeup();
-    }
-}
diff --git a/src/main/java/de/trion/kafka/outbox/OutboxProducer.java b/src/main/java/de/trion/kafka/outbox/OutboxProducer.java
deleted file mode 100644
index d493aee..0000000
--- a/src/main/java/de/trion/kafka/outbox/OutboxProducer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package de.trion.kafka.outbox;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PreDestroy;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-@Component
-public class OutboxProducer {
-
-    private final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
-
-    private final ObjectMapper mapper;
-    private final String topic;
-    private final KafkaProducer<String, String> producer;
-
-
-    public OutboxProducer(ObjectMapper mapper, String bootstrapServers, String topic) {
-        this.mapper = mapper;
-        this.topic = topic;
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", bootstrapServers);
-        props.put("key.serializer", StringSerializer.class.getName());
-        props.put("value.serializer", StringSerializer.class.getName());
-        producer = new KafkaProducer<>(props);
-    }
-
-
-    public void send(UserEvent event) {
-        try {
-            String json = mapper.writeValueAsString(event);
-            ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.user, json);
-            producer.send(record, (metadata, e) -> {
-                if (e != null) {
-                    LOG.error("Could not send event {}!", json, e);
-                }
-                else {
-                    LOG.debug(
-                            "{}: send event {} with offset {} to partition {}",
-                            event.user,
-                            event.id,
-                            metadata.offset(),
-                            metadata.partition());
-                }
-            });
-        } catch (Exception e) {
-            throw new RuntimeException("Fehler beim Senden des Events " + event.id, e);
-        }
-    }
-
-
-    @PreDestroy
-    public void stop(){
-        LOG.info("Closing the KafkaProducer...");
-        try {
-            producer.close(5, TimeUnit.SECONDS);
-            LOG.debug("Successfully closed the KafkaProducer");
-        }
-        catch (Exception e) {
-            LOG.warn("Exception while closing the KafkaProducer!", e);
-        }
-    }
-}
diff --git a/src/main/java/de/trion/kafka/outbox/OutboxService.java b/src/main/java/de/trion/kafka/outbox/OutboxService.java
deleted file mode 100644
index e003839..0000000
--- a/src/main/java/de/trion/kafka/outbox/OutboxService.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package de.trion.kafka.outbox;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Component;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-@Component
-public class OutboxService {
-
-    private static final Logger LOG = LoggerFactory.getLogger(OutboxService.class);
-
-
-    private final Map<String, String> state = new HashMap<>();
-    private final Map<String, DeferredResult> requests = new HashMap<>();
-
-    private long counter = 1;
-
-
-    public OutboxService() {}
-
-
-    public String bearbeiteVorgang(String vorgangId, String vbId, String data) {
-        if (vorgangId == null)
-            throw new IllegalArgumentException("vorgangId must not be null!");
-
-        // simulate a failure while saving
-        Random r = new Random();
-        int i = r.nextInt(10);
-        if (i == 0)
-            throw new RuntimeException("ERROR!!!!!!");
-
-        String result = vorgangId + "|vbId=" + vbId + "|" + counter++ + ", rand=" + i + ": " + data;
-
-        if (state.containsKey(vorgangId))
-            LOG.info("Bearbeite Vorgang {}: alt={}, neu={}", vorgangId, state.get(vorgangId), data);
-        else
-            LOG.info("Bearbeite Vorgang {}: neu={}", vorgangId, data);
-
-        process(vorgangId, result);
-        return result;
-    }
-
-    public synchronized void process(String vorgangId, DeferredResult result) {
-        String data = state.get(vorgangId);
-        if (data != null) {
-            result.setResult(ResponseEntity.ok(data));
-        }
-        else {
-            requests.put(vorgangId, result);
-        }
-    }
-
-    private synchronized void process(String vorgangId, String result) {
-        state.put(vorgangId, result);
-        DeferredResult request = requests.get(vorgangId);
-        if (request != null)
-          request.setResult(ResponseEntity.ok(result));
-    }
-}