WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
11
12 import java.util.*;
13
14
15 @Slf4j
16 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
17 {
18   private final Consumer<String, MessageTo> consumer;
19   private final String topic;
20   // private final long[] offsets; Erst mal immer alles neu einlesen
21   private final Map<UUID, KafkaChatRoomService>[] kafkaChatRoomServiceMaps;
22   private final Map<UUID, ChatRoom>[] chatRoomMaps;
23
24
25   public KafkaChatHomeService(
26     Consumer<String, MessageTo> consumer,
27     String topic,
28     int numShards)
29   {
30     log.debug("Creating KafkaChatHomeService");
31     this.consumer = consumer;
32     this.topic = topic;
33     // this.offsets = new long[numShards];
34     // for (int i=0; i< numShards; i++)
35     // {
36     //   this.offsets[i] = 0l;
37     // }
38     this.kafkaChatRoomServiceMaps = new Map[numShards];
39     this.chatRoomMaps = new Map[numShards];
40   }
41
42
43   @Override
44   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
45   {
46     consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
47     {
48       if (!tp.topic().equals(topic))
49       {
50         log.warn("Ignoring partition from unwanted topic: {}", tp);
51         return;
52       }
53
54       int partition = tp.partition();
55       kafkaChatRoomServiceMaps[partition] = new HashMap<>(); // TODO: reuse! Nicht immer alles neu laden
56       long unseenOffset = 0; // offsets[partition];
57
58       log.info(
59           "Loading messages from partition {}: start-offset={} -> current-offset={}",
60           partition,
61           unseenOffset,
62           currentOffset);
63
64       consumer.seek(tp, unseenOffset);
65       chatRoomMaps[partition]
66           .values()
67           .stream()
68       handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
69     });
70   }
71
72   @Override
73   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
74   {
75     partitions.forEach(tp ->
76     {
77       if (!tp.topic().equals(topic))
78       {
79         log.warn("Ignoring partition from unwanted topic: {}", tp);
80         return;
81       }
82
83       int partition = tp.partition();
84       long unseenOffset = offsets[partition];
85
86       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
87     });
88     log.info("Revoked partitions: {}", partitions);
89   }
90
91   @Override
92   public void onPartitionsLost(Collection<TopicPartition> partitions)
93   {
94     log.info("Revoked partitions: {}", partitions);
95   }
96
97   private void foo()
98   {
99     Set<Integer> owned = Arrays
100       .stream(ownedShards)
101       .collect(
102         () -> new HashSet<>(),
103         (set, i) -> set.add(i),
104         (a, b) -> a.addAll(b));
105     for (int shard = 0; shard < numShards; shard++)
106     {
107       chatRoomMaps[shard] = owned.contains(shard)
108         ? new HashMap<>()
109         : null;
110     }
111     chatroomFlux
112       .filter(chatRoom ->
113       {
114         if (owned.contains(chatRoom.getShard()))
115         {
116           return true;
117         }
118         else
119         {
120           log.info("Ignoring not owned chat-room {}", chatRoom);
121           return false;
122         }
123       })
124       .toStream()
125       .forEach(chatroom -> chatRoomMaps[chatroom.getShard()].put(chatroom.getId(), chatroom));
126   }
127
128   @Override
129   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
130   {
131     chatRoomMaps[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
132     return Mono.just(chatRoom);
133   }
134
135   @Override
136   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
137   {
138     return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
139   }
140
141   @Override
142   public Flux<ChatRoom> getChatRooms(int shard)
143   {
144     return Flux.fromStream(chatRoomMaps[shard].values().stream());
145   }
146 }