WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaTestUtils.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.TopicPartition;
8 import org.springframework.context.annotation.Bean;
9 import org.springframework.kafka.core.KafkaTemplate;
10 import org.springframework.kafka.support.SendResult;
11 import reactor.core.publisher.Mono;
12
13 import java.util.List;
14
15
16 @Slf4j
17 public abstract class KafkaTestUtils
18 {
19   public static class KafkaTestConfiguration
20   {
21     @Bean
22     public ShardingPublisherStrategy shardingPublisherStrategy()
23     {
24       return shard -> Mono.just("MOCKED!");
25     }
26
27     @Bean
28     public WorkAssignor dataChannelWorkAssignor(
29         ChatBackendProperties properties,
30         DataChannel dataChannel)
31     {
32       return consumer ->
33       {
34         List<TopicPartition> assignedPartitions =
35             List.of(new TopicPartition(properties.getKafka().getDataChannelTopic(), 2));
36         consumer.assign(assignedPartitions);
37         dataChannel.onPartitionsAssigned(assignedPartitions);
38       };
39     }
40   }
41
42
43   public static void sendAndLoadStoredData(
44       KafkaTemplate<String, String> messageTemplate,
45       String infoTopic,
46       String dataTopic,
47       ChannelTaskRunner channelTaskRunner)
48   {
49     send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
50     send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
51     send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
52     send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
53     send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
54
55     channelTaskRunner.executeChannel();
56   }
57
58   private static void send(
59       KafkaTemplate<String, String> kafkaTemplate,
60       String topic,
61       String key,
62       String value,
63       String typeId)
64   {
65     ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
66     record.headers().add("__TypeId__", typeId.getBytes());
67     SendResult<String, String> result = kafkaTemplate.send(record).join();
68     log.info(
69         "Sent {}={} to {}",
70         key,
71         value,
72         new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
73   }
74 }