package de.trion.kafka.outbox;
import com.fasterxml.jackson.databind.ObjectMapper;
-import de.lvm.tx.Event;
-import de.lvm.tx.Command;
-import de.lvm.tx.Command.Action;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
-import javax.swing.*;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
-import static de.lvm.tx.Event.Type.*;
@Component
public class OutboxConsumer implements ApplicationRunner, Runnable {
{
ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<Long, String> record : records) {
- byte code = record.headers().lastHeader("messageType").value()[0];
- Action action = Action.from(code);
-
- if (action == null)
- {
- LOG.debug("Ignoring unknown action {} for {}", code, record.value());
- continue;
- }
-
- switch(action) {
- case SAVE_DLZ:
- dlzSaveReceived(toCommand(record.value()));
- continue;
- default:
- LOG.debug("Ignoring message {}", record.value());
- }
- byte[] bytes = record.headers().lastHeader("messageType").value();
- String type = new String(bytes, StandardCharsets.UTF_8);
-
- if (type.endsWith("DlzAction")) {
- dlzSaveReceived(toCommand(record.value()));
- continue;
- }
-
LOG.debug("Ignoring command {}", record.value());
}
}
}
}
- public Command toCommand(String message) throws IOException {
- Command command = mapper.readValue(message, Command.class);
- LOG.info("{}: {}", command.getAction(), command.getVorgangId());
- return command;
- }
-
- public void dlzSaveReceived(Command command) throws InterruptedException {
- try
- {
- String result =
- service.bearbeiteVorgang(
- command.getVorgangId(),
- command.getVbId(),
- command.getData());
- reply(command, result);
- }
- catch (Exception e) {
- LOG.error("Exception during processing!", e);
- }
- }
-
- public void reply(Command command, String message) {
- String vorgangId = command.getVorgangId();
- String vbId = command.getVbId();
- Event event = new Event(DLZ_SAVED, vorgangId, vbId);
- event.getZustand().put(Event.DLZ, message);
- sender.send(event);
- }
-
@Override
public void run(ApplicationArguments args) {