b2ed284221e6fc99bdf829b0518307297fa2a934
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
11
12 import java.util.*;
13
14
15 @Slf4j
16 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
17 {
18   private final Consumer<String, MessageTo> consumer;
19   private final String topic;
20   private final long[] offsets;
21   private final Map<UUID, ChatRoom>[] chatrooms;
22
23
24   public KafkaChatHomeService(
25     Consumer<String, MessageTo> consumer,
26     String topic,
27     int numShards)
28   {
29     log.debug("Creating KafkaChatHomeService");
30     this.consumer = consumer;
31     this.topic = topic;
32     this.offsets = new long[numShards];
33     this.chatrooms = new Map<>[numShards];
34     for (int i=0; i< numShards; i++)
35     {
36       this.offsets[i] = 0l;
37       this.chatrooms[i] = chat
38     }
39     this.chatrooms = new Map[numShards];
40     this.chatrooms = new Map[numShards];
41   }
42
43
44   @Override
45   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
46   {
47     consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
48     {
49       if (!tp.topic().equals(topic))
50       {
51         log.warn("Ignoring partition from unwanted topic: {}", tp);
52         return;
53       }
54
55       int partition = tp.partition();
56       long unseenOffset = offsets[partition];
57
58       log.info(
59           "Loading messages from partition {}: start-offset={} -> current-offset={}",
60           partition,
61           unseenOffset,
62           currentOffset);
63       handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
64     });
65   }
66
67   @Override
68   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
69   {
70     partitions.forEach(tp ->
71     {
72       if (!tp.topic().equals(topic))
73       {
74         log.warn("Ignoring partition from unwanted topic: {}", tp);
75         return;
76       }
77
78       int partition = tp.partition();
79       long unseenOffset = offsets[partition];
80
81       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
82     });
83     log.info("Revoked partitions: {}", partitions);
84   }
85
86   @Override
87   public void onPartitionsLost(Collection<TopicPartition> partitions)
88   {
89     log.info("Revoked partitions: {}", partitions);
90   }
91
92   private void foo()
93   {
94     Set<Integer> owned = Arrays
95       .stream(ownedShards)
96       .collect(
97         () -> new HashSet<>(),
98         (set, i) -> set.add(i),
99         (a, b) -> a.addAll(b));
100     for (int shard = 0; shard < numShards; shard++)
101     {
102       chatrooms[shard] = owned.contains(shard)
103         ? new HashMap<>()
104         : null;
105     }
106     chatroomFlux
107       .filter(chatRoom ->
108       {
109         if (owned.contains(chatRoom.getShard()))
110         {
111           return true;
112         }
113         else
114         {
115           log.info("Ignoring not owned chat-room {}", chatRoom);
116           return false;
117         }
118       })
119       .toStream()
120       .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
121   }
122
123   @Override
124   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
125   {
126     chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
127     return Mono.just(chatRoom);
128   }
129
130   @Override
131   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
132   {
133     return Mono.justOrEmpty(chatrooms[shard].get(id));
134   }
135
136   @Override
137   public Flux<ChatRoom> getChatRooms(int shard)
138   {
139     return Flux.fromStream(chatrooms[shard].values().stream());
140   }
141 }