a95df543813f3223895b631d2b40e334309c9c1c
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.time.ZoneId;
14 import java.util.*;
15 import java.util.concurrent.ExecutorService;
16
17
18 @Slf4j
19 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
20 {
21   private final ExecutorService executorService;
22   private final Consumer<String, MessageTo> consumer;
23   private final Producer<String, MessageTo> producer;
24   private final String topic;
25   private final ZoneId zoneId;
26   // private final long[] offsets; Erst mal immer alles neu einlesen
27   private final ChatHomeLoader[] chatHomeLoaders;
28   private final Map<UUID, ChatRoom>[] chatRoomMaps;
29
30
31   public KafkaChatHomeService(
32     ExecutorService executorService,
33     Consumer<String, MessageTo> consumer,
34     Producer<String, MessageTo> producer,
35     String topic,
36     ZoneId zoneId,
37     int numShards)
38   {
39     log.debug("Creating KafkaChatHomeService");
40     this.executorService = executorService;
41     this.consumer = consumer;
42     this.producer = producer;
43     this.topic = topic;
44     this.zoneId = zoneId;
45     // this.offsets = new long[numShards];
46     // for (int i=0; i< numShards; i++)
47     // {
48     //   this.offsets[i] = 0l;
49     // }
50     this.chatHomeLoaders = new ChatHomeLoader[numShards];
51     this.chatRoomMaps = new Map[numShards];
52   }
53
54
55   @Override
56   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
57   {
58     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
59     {
60       if (!topicPartition.topic().equals(topic))
61       {
62         log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
63         return;
64       }
65
66       int partition = topicPartition.partition();
67       long unseenOffset = 0; // offsets[partition];
68
69       log.info(
70           "Loading messages from partition {}: start-offset={} -> current-offset={}",
71           partition,
72           unseenOffset,
73           currentOffset);
74
75       // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
76       consumer.seek(topicPartition, unseenOffset);
77       chatHomeLoaders[partition] = new ChatHomeLoader(
78           producer,
79           currentOffset,
80           zoneId);
81     });
82   }
83
84   @Override
85   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
86   {
87     partitions.forEach(topicPartition ->
88     {
89       if (!topicPartition.topic().equals(topic))
90       {
91         log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
92         return;
93       }
94
95       int partition = topicPartition.partition();
96       // long unseenOffset = offsets[partition]; TODO: Offset merken...?
97     });
98     log.info("Revoked partitions: {}", partitions);
99   }
100
101   @Override
102   public void onPartitionsLost(Collection<TopicPartition> partitions)
103   {
104     // TODO: Muss auf den Verlust anders reagiert werden?
105     onPartitionsRevoked(partitions);
106   }
107
108   @Override
109   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
110   {
111     return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
112   }
113
114   @Override
115   public Flux<ChatRoom> getChatRooms(int shard)
116   {
117     return Flux.fromStream(chatRoomMaps[shard].values().stream());
118   }
119 }