7bb6d4f594a30c86c97a8fa794fe4b430043c57f
[demos/kafka/outbox] / src / main / java / de / trion / kafka / outbox / OutboxConsumer.java
1 package de.trion.kafka.outbox;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import de.lvm.tx.Event;
5 import de.lvm.tx.Command;
6 import de.lvm.tx.Command.Action;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 import org.apache.kafka.common.errors.WakeupException;
11 import org.apache.kafka.common.serialization.LongDeserializer;
12 import org.apache.kafka.common.serialization.StringDeserializer;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15 import org.springframework.boot.ApplicationArguments;
16 import org.springframework.boot.ApplicationRunner;
17 import org.springframework.stereotype.Component;
18
19 import javax.annotation.PreDestroy;
20 import javax.swing.*;
21 import java.io.IOException;
22 import java.nio.charset.StandardCharsets;
23 import java.time.Duration;
24 import java.util.Arrays;
25 import java.util.Properties;
26
27 import static de.lvm.tx.Event.Type.*;
28
29 @Component
30 public class OutboxConsumer implements ApplicationRunner, Runnable {
31
32     private final static Logger LOG = LoggerFactory.getLogger(OutboxConsumer.class);
33
34     private final OutboxService service;
35     private final OutboxProducer sender;
36     private final ObjectMapper mapper;
37     private final String topic;
38     private final KafkaConsumer<Long, String> consumer;
39     private final Thread thread;
40
41     private long internalState = 1;
42
43
44     public OutboxConsumer(
45             OutboxService service,
46             OutboxProducer sender,
47             ObjectMapper mapper,
48             String bootstrapServers,
49             String consumerGroup,
50             String topic) {
51
52         this.service = service;
53         this.sender = sender;
54         this.mapper = mapper;
55         this.topic = topic;
56
57         Properties props = new Properties();
58         props.put("bootstrap.servers", bootstrapServers);
59         props.put("group.id", consumerGroup);
60         props.put("auto.commit.interval.ms", 15000);
61         props.put("metadata.max.age.ms", 1000);
62         props.put("key.deserializer", LongDeserializer.class.getName());
63         props.put("value.deserializer", StringDeserializer.class.getName());
64         consumer = new KafkaConsumer<>(props);
65
66         thread = new Thread(this);
67     }
68
69
70     @Override
71     public void run() {
72         try
73         {
74             LOG.info("Subscribing to topic " + topic);
75             consumer.subscribe(Arrays.asList(topic));
76
77             while (true)
78             {
79                 ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
80                 for (ConsumerRecord<Long, String> record : records) {
81                     byte code = record.headers().lastHeader("messageType").value()[0];
82                     Action action = Action.from(code);
83
84                     if (action == null)
85                     {
86                         LOG.debug("Ignoring unknown action {} for {}", code, record.value());
87                         continue;
88                     }
89
90                     switch(action) {
91                         case SAVE_DLZ:
92                             dlzSaveReceived(toCommand(record.value()));
93                             continue;
94                         default:
95                             LOG.debug("Ignoring message {}", record.value());
96                     }
97                     byte[] bytes = record.headers().lastHeader("messageType").value();
98                     String type = new String(bytes, StandardCharsets.UTF_8);
99
100                     if (type.endsWith("DlzAction")) {
101                         dlzSaveReceived(toCommand(record.value()));
102                         continue;
103                     }
104
105                     LOG.debug("Ignoring command {}", record.value());
106                 }
107             }
108         }
109         catch (WakeupException e) {}
110         catch (Exception e) {
111             LOG.error("Unexpected exception!", e);
112         }
113         finally
114         {
115             LOG.info("Closing the KafkaConsumer...");
116             try {
117                 consumer.close(Duration.ofSeconds(5));
118                 LOG.debug("Successfully closed the KafkaConsumer");
119             }
120             catch (Exception e) {
121                 LOG.warn("Exception while closing the KafkaConsumer!", e);
122             }
123         }
124     }
125
126     public Command toCommand(String message) throws IOException {
127         Command command = mapper.readValue(message, Command.class);
128         LOG.info("{}: {}", command.getAction(), command.getVorgangId());
129         return command;
130     }
131
132     public void dlzSaveReceived(Command command) throws InterruptedException {
133         try
134         {
135             String result =
136                     service.bearbeiteVorgang(
137                             command.getVorgangId(),
138                             command.getVbId(),
139                             command.getData());
140             reply(command, result);
141         }
142         catch (Exception e) {
143             LOG.error("Exception during processing!", e);
144         }
145     }
146
147     public void reply(Command command, String message) {
148         String vorgangId = command.getVorgangId();
149         String vbId = command.getVbId();
150         Event event = new Event(DLZ_SAVED, vorgangId, vbId);
151         event.getZustand().put(Event.DLZ, message);
152         sender.send(event);
153     }
154
155
156     @Override
157     public void run(ApplicationArguments args) {
158         thread.start();
159         try {
160             thread.join();
161             LOG.info("Successfully joined the consumer-thread");
162         }
163         catch (InterruptedException e) {
164             LOG.info("Main-thread was interrupted while joining the consumer-thread");
165         }
166     }
167
168     @PreDestroy
169     public void stop()
170     {
171         LOG.info("Stopping the KafkaConsumer...");
172         consumer.wakeup();
173     }
174 }