2e3b42f6f8693339481c8f1f8bbfaf5ef5223433
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
11
12 import java.util.*;
13
14
15 @Slf4j
16 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
17 {
18   private final Consumer<String, MessageTo> consumer;
19   private final String topic;
20   private final long[] offsets;
21   private final MessageHandler[] handlers;
22   private final Map<UUID, ChatRoom>[] chatrooms;
23
24
25   public KafkaChatHomeService(
26     Consumer<String, MessageTo> consumer,
27     String topic,
28     int numShards)
29   {
30     log.debug("Creating KafkaChatHomeService");
31     this.consumer = consumer;
32     this.topic = topic;
33     this.offsets = new long[numShards];
34     this.handlers = new MessageHandler[numShards];
35     for (int i=0; i< numShards; i++)
36     {
37       this.offsets[i] = 0l;
38       this.handlers[i] = new MessageHandler(consumer, new TopicPartition(topic, i));
39     }
40     this.chatrooms = new Map[numShards];
41   }
42
43
44   @Override
45   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
46   {
47     consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
48     {
49       if (!tp.topic().equals(topic))
50       {
51         log.warn("Ignoring partition from unwanted topic: {}", tp);
52         return;
53       }
54
55       int partition = tp.partition();
56       long unseenOffset = offsets[partition];
57
58       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
59       handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
60     });
61   }
62
63   @Override
64   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
65   {
66     partitions.forEach(tp ->
67     {
68       if (!tp.topic().equals(topic))
69       {
70         log.warn("Ignoring partition from unwanted topic: {}", tp);
71         return;
72       }
73
74       int partition = tp.partition();
75       long unseenOffset = offsets[partition];
76
77       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
78     });
79     log.info("Revoked partitions: {}", partitions);
80   }
81
82   @Override
83   public void onPartitionsLost(Collection<TopicPartition> partitions)
84   {
85     log.info("Revoked partitions: {}", partitions);
86   }
87
88   private void foo()
89   {
90     Set<Integer> owned = Arrays
91       .stream(ownedShards)
92       .collect(
93         () -> new HashSet<>(),
94         (set, i) -> set.add(i),
95         (a, b) -> a.addAll(b));
96     for (int shard = 0; shard < numShards; shard++)
97     {
98       chatrooms[shard] = owned.contains(shard)
99         ? new HashMap<>()
100         : null;
101     }
102     chatroomFlux
103       .filter(chatRoom ->
104       {
105         if (owned.contains(chatRoom.getShard()))
106         {
107           return true;
108         }
109         else
110         {
111           log.info("Ignoring not owned chat-room {}", chatRoom);
112           return false;
113         }
114       })
115       .toStream()
116       .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
117   }
118
119   @Override
120   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
121   {
122     chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
123     return Mono.just(chatRoom);
124   }
125
126   @Override
127   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
128   {
129     return Mono.justOrEmpty(chatrooms[shard].get(id));
130   }
131
132   @Override
133   public Flux<ChatRoom> getChatRooms(int shard)
134   {
135     return Flux.fromStream(chatrooms[shard].values().stream());
136   }
137 }