<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
- <dependency>
- <groupId>com.h2database</groupId>
- <artifactId>h2</artifactId>
- <scope>runtime</scope>
- </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
-
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
+
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <scope>runtime</scope>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.web.servlet.config.annotation.CorsRegistry;
-import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@SpringBootApplication
-@EnableConfigurationProperties(ApplicationProperties.class)
public class Application {
private final static Logger LOG = LoggerFactory.getLogger(Application.class);
- @Autowired
- ApplicationProperties properties;
-
-
- @Bean
- public String bootstrapServers() { return properties.bootstrapServers; }
-
- @Bean
- public String topic() {
- return properties.topic;
- }
-
- @Bean
- public String consumerGroup() {
- return properties.consumerGroup;
- }
-
- @Bean
- public WebMvcConfigurer corsConfigurer() {
- return new WebMvcConfigurer() {
- @Override
- public void addCorsMappings(CorsRegistry registry) {
- registry
- .addMapping("/**")
- .allowedOrigins("http://localhost:4200");
- }
- };
- }
-
-
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
+++ /dev/null
-package de.trion.kafka.outbox;
-
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-@ConfigurationProperties("outbox.polling")
-public class ApplicationProperties {
- public String bootstrapServers = "localhost:9092";
- public String topic = "outbox-polling";
- public String consumerGroup = "polling";
-
-
- public String getBootstrapServers() {
- return bootstrapServers;
- }
-
- public void setBootstrapServers(String bootstrapServers) {
- this.bootstrapServers = bootstrapServers;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-}
+++ /dev/null
-package de.trion.kafka.outbox;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PreDestroy;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
-
-
-@Component
-public class OutboxConsumer implements ApplicationRunner, Runnable {
-
- private final static Logger LOG = LoggerFactory.getLogger(OutboxConsumer.class);
-
- private final OutboxService service;
- private final OutboxProducer sender;
- private final ObjectMapper mapper;
- private final String topic;
- private final KafkaConsumer<Long, String> consumer;
- private final Thread thread;
-
- private long internalState = 1;
-
-
- public OutboxConsumer(
- OutboxService service,
- OutboxProducer sender,
- ObjectMapper mapper,
- String bootstrapServers,
- String consumerGroup,
- String topic) {
-
- this.service = service;
- this.sender = sender;
- this.mapper = mapper;
- this.topic = topic;
-
- Properties props = new Properties();
- props.put("bootstrap.servers", bootstrapServers);
- props.put("group.id", consumerGroup);
- props.put("auto.commit.interval.ms", 15000);
- props.put("metadata.max.age.ms", 1000);
- props.put("key.deserializer", LongDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
- consumer = new KafkaConsumer<>(props);
-
- thread = new Thread(this);
- }
-
-
- @Override
- public void run() {
- try
- {
- LOG.info("Subscribing to topic " + topic);
- consumer.subscribe(Arrays.asList(topic));
-
- while (true)
- {
- ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
- for (ConsumerRecord<Long, String> record : records) {
- LOG.debug("Ignoring command {}", record.value());
- }
- }
- }
- catch (WakeupException e) {}
- catch (Exception e) {
- LOG.error("Unexpected exception!", e);
- }
- finally
- {
- LOG.info("Closing the KafkaConsumer...");
- try {
- consumer.close(Duration.ofSeconds(5));
- LOG.debug("Successfully closed the KafkaConsumer");
- }
- catch (Exception e) {
- LOG.warn("Exception while closing the KafkaConsumer!", e);
- }
- }
- }
-
-
- @Override
- public void run(ApplicationArguments args) {
- thread.start();
- try {
- thread.join();
- LOG.info("Successfully joined the consumer-thread");
- }
- catch (InterruptedException e) {
- LOG.info("Main-thread was interrupted while joining the consumer-thread");
- }
- }
-
- @PreDestroy
- public void stop()
- {
- LOG.info("Stopping the KafkaConsumer...");
- consumer.wakeup();
- }
-}
+++ /dev/null
-package de.trion.kafka.outbox;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PreDestroy;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-@Component
-public class OutboxProducer {
-
- private final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
-
- private final ObjectMapper mapper;
- private final String topic;
- private final KafkaProducer<String, String> producer;
-
-
- public OutboxProducer(ObjectMapper mapper, String bootstrapServers, String topic) {
- this.mapper = mapper;
- this.topic = topic;
-
- Properties props = new Properties();
- props.put("bootstrap.servers", bootstrapServers);
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
- producer = new KafkaProducer<>(props);
- }
-
-
- public void send(UserEvent event) {
- try {
- String json = mapper.writeValueAsString(event);
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.user, json);
- producer.send(record, (metadata, e) -> {
- if (e != null) {
- LOG.error("Could not send event {}!", json, e);
- }
- else {
- LOG.debug(
- "{}: send event {} with offset {} to partition {}",
- event.user,
- event.id,
- metadata.offset(),
- metadata.partition());
- }
- });
- } catch (Exception e) {
- throw new RuntimeException("Fehler beim Senden des Events " + event.id, e);
- }
- }
-
-
- @PreDestroy
- public void stop(){
- LOG.info("Closing the KafkaProducer...");
- try {
- producer.close(5, TimeUnit.SECONDS);
- LOG.debug("Successfully closed the KafkaProducer");
- }
- catch (Exception e) {
- LOG.warn("Exception while closing the KafkaProducer!", e);
- }
- }
-}
+++ /dev/null
-package de.trion.kafka.outbox;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Component;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-@Component
-public class OutboxService {
-
- private static final Logger LOG = LoggerFactory.getLogger(OutboxService.class);
-
-
- private final Map<String, String> state = new HashMap<>();
- private final Map<String, DeferredResult> requests = new HashMap<>();
-
- private long counter = 1;
-
-
- public OutboxService() {}
-
-
- public String bearbeiteVorgang(String vorgangId, String vbId, String data) {
- if (vorgangId == null)
- throw new IllegalArgumentException("vorgangId must not be null!");
-
- // Simulate an occasional error while saving
- Random r = new Random();
- int i = r.nextInt(10);
- if (i == 0)
- throw new RuntimeException("ERROR!!!!!!");
-
- String result = vorgangId + "|vbId=" + vbId + "|" + counter++ + ", rand=" + i + ": " + data;
-
- if (state.containsKey(vorgangId))
- LOG.info("Processing Vorgang {}: old={}, new={}", vorgangId, state.get(vorgangId), data);
- else
- LOG.info("Processing Vorgang {}: new={}", vorgangId, data);
-
- process(vorgangId, result);
- return result;
- }
-
- public synchronized void process(String vorgangId, DeferredResult result) {
- String data = state.get(vorgangId);
- if (data != null) {
- result.setResult(ResponseEntity.ok(data));
- }
- else {
- requests.put(vorgangId, result);
- }
- }
-
- private synchronized void process(String vorgangId, String result) {
- state.put(vorgangId, result);
- DeferredResult request = requests.get(vorgangId);
- if (request != null)
- request.setResult(ResponseEntity.ok(result));
- }
-}