WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
10 import org.apache.kafka.common.TopicPartition;
11 import reactor.core.publisher.Flux;
12 import reactor.core.publisher.Mono;
13
14 import java.util.*;
15
16
17 @Slf4j
18 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
19 {
20   private final Consumer<String, MessageTo> consumer;
21   private final String topic;
22   private final long[] offsets;
23   private final MessageHandler[] handlers;
24   private final Map<UUID, ChatRoom>[] chatrooms;
25
26
27   public KafkaChatHomeService(
28     Consumer<String, MessageTo> consumer,
29     String topic,
30     int numShards)
31   {
32     log.debug("Creating KafkaChatHomeService");
33     this.consumer = consumer;
34     this.topic = topic;
35     this.offsets = new long[numShards];
36     this.handlers = new MessageHandler[numShards];
37     for (int i=0; i< numShards; i++)
38     {
39       this.offsets[i] = 0l;
40       this.handlers[i] = new NoOpMessageHandler(i);
41     }
42     this.chatrooms = new Map[numShards];
43   }
44
45
46   @Override
47   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
48   {
49     consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
50     {
51       if (!tp.topic().equals(topic))
52       {
53         log.warn("Ignoring unwanted TopicPartition", tp);
54         return;
55       }
56
57       int partition = tp.partition();
58       long unseenOffset = offsets[partition];
59
60       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
61       handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset);
62     });
63   }
64
65   @Override
66   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
67   {
68     log.info("Revoked partitions: {}", partitions);
69   }
70
71   @Override
72   public void onPartitionsLost(Collection<TopicPartition> partitions)
73   {
74     log.info("Revoked partitions: {}", partitions);
75   }
76
77   private void foo()
78   {
79     Set<Integer> owned = Arrays
80       .stream(ownedShards)
81       .collect(
82         () -> new HashSet<>(),
83         (set, i) -> set.add(i),
84         (a, b) -> a.addAll(b));
85     for (int shard = 0; shard < numShards; shard++)
86     {
87       chatrooms[shard] = owned.contains(shard)
88         ? new HashMap<>()
89         : null;
90     }
91     chatroomFlux
92       .filter(chatRoom ->
93       {
94         if (owned.contains(chatRoom.getShard()))
95         {
96           return true;
97         }
98         else
99         {
100           log.info("Ignoring not owned chat-room {}", chatRoom);
101           return false;
102         }
103       })
104       .toStream()
105       .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
106   }
107
108   @Override
109   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
110   {
111     chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
112     return Mono.just(chatRoom);
113   }
114
115   @Override
116   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
117   {
118     return Mono.justOrEmpty(chatrooms[shard].get(id));
119   }
120
121   @Override
122   public Flux<ChatRoom> getChatRooms(int shard)
123   {
124     return Flux.fromStream(chatrooms[shard].values().stream());
125   }
126
127
128   interface MessageHandler
129   {
130     MessageHandler handleMessage(Message message);
131   }
132
133
134   @RequiredArgsConstructor
135   class NoOpMessageHandler implements MessageHandler
136   {
137     private final TopicPartition tp;
138
139     @Override
140     public MessageHandler handleMessage(Message message)
141     {
142       log.warn("Not handling message {} for partition {}", message, tp);
143       return this;
144     }
145   }
146
147   class ChatRoomLoadingMessageHandler implements MessageHandler
148   {
149     private final TopicPartition tp;
150     private final long currentOffset;
151     private final long unseenOffset;
152
153     ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset)
154     {
155       this.tp = tp;
156       this.currentOffset = currentOffset;
157       this.unseenOffset = unseenOffset;
158
159       consumer.seek(tp, unseenOffset);
160     }
161
162     @Override
163     public MessageHandler handleMessage(Message message)
164     {
165       // todo
166       return this;
167     }
168   }
169
170   @RequiredArgsConstructor
171   class DefaultMessageHandler implements MessageHandler
172   {
173     private final TopicPartition tp;
174
175     @Override
176     public MessageHandler handleMessage(Message message)
177     {
178       chatrooms[tp.partition()].put()
179       return this;
180     }
181   }
182 }