912295d657db00fd0be4c38e06db7db8f6216968
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
7 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerRecord;
15 import org.apache.kafka.common.TopicPartition;
16 import reactor.core.publisher.Flux;
17 import reactor.core.publisher.Mono;
18
19 import java.time.*;
20 import java.util.*;
21 import java.util.concurrent.ExecutorService;
22
23
24 @Slf4j
25 public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
26 {
27   private final ExecutorService executorService;
28   private final Consumer<String, MessageTo> consumer;
29   private final Producer<String, MessageTo> producer;
30   private final String topic;
31   private final ZoneId zoneId;
32   private final int numShards;
33   private final boolean[] isShardOwned;
34   private final long[] currentOffset;
35   private final long[] nextOffset;
36   private final Map<UUID, ChatRoom>[] chatRoomMaps;
37   private final KafkaLikeShardingStrategy shardingStrategy;
38
39   private boolean running;
40   private volatile boolean loadInProgress;
41
42
43   public KafkaChatHomeService(
44     ExecutorService executorService,
45     Consumer<String, MessageTo> consumer,
46     Producer<String, MessageTo> producer,
47     String topic,
48     ZoneId zoneId,
49     int numShards)
50   {
51     log.debug("Creating KafkaChatHomeService");
52     this.executorService = executorService;
53     this.consumer = consumer;
54     this.producer = producer;
55     this.topic = topic;
56     this.zoneId = zoneId;
57     this.numShards = numShards;
58     this.isShardOwned = new boolean[numShards];
59     this.currentOffset = new long[numShards];
60     this.nextOffset = new long[numShards];
61     this.chatRoomMaps = new Map[numShards];
62     this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
63   }
64
65
66   @Override
67   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
68   {
69     loadInProgress = true;
70
71     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
72     {
73       int partition = topicPartition.partition();
74       isShardOwned[partition] =  true;
75       this.currentOffset[partition] = currentOffset;
76
77       log.info(
78           "Partition assigned: {} - loading messages: next={} -> current={}",
79           partition,
80           nextOffset[partition],
81           currentOffset);
82
83       consumer.seek(topicPartition, nextOffset[partition]);
84     });
85
86     consumer.resume(partitions);
87   }
88
89   @Override
90   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
91   {
92     partitions.forEach(topicPartition ->
93     {
94       int partition = topicPartition.partition();
95       isShardOwned[partition] = false;
96       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
97     });
98   }
99
100   @Override
101   public void onPartitionsLost(Collection<TopicPartition> partitions)
102   {
103     log.warn("Lost partitions: {}, partitions");
104     // TODO: Muss auf den Verlust anders reagiert werden?
105     onPartitionsRevoked(partitions);
106   }
107
108   @Override
109   public void run()
110   {
111     consumer.subscribe(List.of(topic));
112
113     running = true;
114
115     try
116     {
117       while (running)
118       {
119         ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
120         log.info("Fetched {} messages", records.count());
121
122         if (loadInProgress)
123         {
124           for (ConsumerRecord<String, MessageTo> record : records)
125           {
126             UUID chatRoomId = UUID.fromString(record.key());
127             MessageTo messageTo = record.value();
128
129             Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
130
131             Instant instant = Instant.ofEpochSecond(record.timestamp());
132             LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
133
134             Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
135
136             ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
137             KafkaChatRoomService kafkaChatRoomService =
138                 (KafkaChatRoomService) chatRoom.getChatRoomService();
139
140             kafkaChatRoomService.persistMessage(message);
141           }
142         }
143         else
144         {
145           if (!records.isEmpty())
146           {
147             throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
148           }
149         }
150       }
151     }
152   }
153
154   Mono<Message> sendMessage(
155       UUID chatRoomId,
156       Message.MessageKey key,
157       LocalDateTime timestamp,
158       String text)
159   {
160     int shard = this.shardingStrategy.selectShard(chatRoomId);
161     TopicPartition tp = new TopicPartition(topic, shard);
162     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
163     return Mono.create(sink ->
164     {
165       ProducerRecord<String, MessageTo> record =
166           new ProducerRecord<>(
167               tp.topic(),
168               tp.partition(),
169               zdt.toEpochSecond(),
170               chatRoomId.toString(),
171               MessageTo.of(key.getUsername(), key.getMessageId(), text));
172
173       producer.send(record, ((metadata, exception) ->
174       {
175         if (metadata != null)
176         {
177           // On successful send
178           Message message = new Message(key, metadata.offset(), timestamp, text);
179           log.info("Successfully send message {}", message);
180           sink.success(message);
181         }
182         else
183         {
184           // On send-failure
185           log.error(
186               "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
187               chatRoomId,
188               key,
189               timestamp,
190               text,
191               exception);
192           sink.error(exception);
193         }
194       }));
195     });
196   }
197
198
199   @Override
200   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
201   {
202     if (loadInProgress)
203     {
204       throw new ShardNotOwnedException(shard);
205     }
206     else
207     {
208       return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
209     }
210   }
211
212   @Override
213   public Flux<ChatRoom> getChatRooms(int shard)
214   {
215     if (loadInProgress)
216     {
217       throw new ShardNotOwnedException(shard);
218     }
219     else
220     {
221       return Flux.fromStream(chatRoomMaps[shard].values().stream());
222     }
223   }
224 }