4460432254379ca6ca0a420db8e4d795f03aa4d7
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
9 import org.apache.kafka.clients.consumer.ConsumerRecord;
10 import org.apache.kafka.clients.consumer.ConsumerRecords;
11 import org.apache.kafka.clients.producer.Producer;
12 import org.apache.kafka.common.TopicPartition;
13 import reactor.core.publisher.Flux;
14 import reactor.core.publisher.Mono;
15
16 import java.time.Duration;
17 import java.time.ZoneId;
18 import java.util.*;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import java.util.concurrent.locks.Lock;
23 import java.util.concurrent.locks.ReadWriteLock;
24 import java.util.concurrent.locks.ReentrantLock;
25 import java.util.concurrent.locks.ReentrantReadWriteLock;
26
27
28 @Slf4j
29 public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
30 {
31   private final ExecutorService executorService;
32   private final Consumer<String, MessageTo> consumer;
33   private final Producer<String, MessageTo> producer;
34   private final String topic;
35   private final ZoneId zoneId;
36   // private final long[] offsets; Erst mal immer alles neu einlesen
37   private final boolean[] isShardOwned;
38   private final Map<UUID, ChatRoom>[] chatRoomMaps;
39   private final ReadWriteLock lock = new ReentrantReadWriteLock();
40
41   private boolean running;
42
43
44   public KafkaChatHomeService(
45     ExecutorService executorService,
46     Consumer<String, MessageTo> consumer,
47     Producer<String, MessageTo> producer,
48     String topic,
49     ZoneId zoneId,
50     int numShards)
51   {
52     log.debug("Creating KafkaChatHomeService");
53     this.executorService = executorService;
54     this.consumer = consumer;
55     this.producer = producer;
56     this.topic = topic;
57     this.zoneId = zoneId;
58     // this.offsets = new long[numShards];
59     // for (int i=0; i< numShards; i++)
60     // {
61     //   this.offsets[i] = 0l;
62     // }
63     this.isShardOwned = new boolean[numShards];
64     this.chatRoomMaps = new Map[numShards];
65   }
66
67
68   @Override
69   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
70   {
71     try
72     {
73       lock.writeLock().lock();
74
75       consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
76       {
77         if (!topicPartition.topic().equals(topic))
78         {
79           log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
80           return;
81         }
82
83         int partition = topicPartition.partition();
84         long unseenOffset = 0; // offsets[partition];
85
86         log.info(
87             "Loading messages from partition {}: start-offset={} -> current-offset={}",
88             partition,
89             unseenOffset,
90             currentOffset);
91
92         // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
93         consumer.seek(topicPartition, unseenOffset);
94       });
95
96       consumer.resume(partitions);
97     }
98     finally
99     {
100       lock.writeLock().unlock();
101     }
102   }
103
104   @Override
105   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
106   {
107     partitions.forEach(topicPartition ->
108     {
109       if (!topicPartition.topic().equals(topic))
110       {
111         log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
112         return;
113       }
114
115       int partition = topicPartition.partition();
116       // long unseenOffset = offsets[partition]; TODO: Offset merken...?
117     });
118     log.info("Revoked partitions: {}", partitions);
119   }
120
121   @Override
122   public void onPartitionsLost(Collection<TopicPartition> partitions)
123   {
124     // TODO: Muss auf den Verlust anders reagiert werden?
125     onPartitionsRevoked(partitions);
126   }
127
128   @Override
129   public void run()
130   {
131     consumer.subscribe(List.of(topic));
132
133     running = true;
134
135     try
136     {
137       while (running)
138       {
139         ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
140         log.info("Fetched {} messages", records.count());
141
142         for (ConsumerRecord<String, MessageTo> record : records)
143         {
144
145         }
146       }
147     }
148   }
149
150   @Override
151   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
152   {
153     if (lock.readLock().tryLock())
154     {
155       try
156       {
157         return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
158       }
159       finally
160       {
161         lock.readLock().unlock();
162       }
163     }
164     else
165     {
166       throw new ShardNotOwnedException(shard);
167     }
168   }
169
170   @Override
171   public Flux<ChatRoom> getChatRooms(int shard)
172   {
173     if (lock.readLock().tryLock())
174     {
175       try
176       {
177         return Flux.fromStream(chatRoomMaps[shard].values().stream());
178       }
179       finally
180       {
181         lock.readLock().unlock();
182       }
183     }
184     else
185     {
186       throw new ShardNotOwnedException(shard);
187     }
188   }
189 }