WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
11
12 import java.util.*;
13
14
15 @Slf4j
16 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
17 {
18   private final Consumer<String, MessageTo> consumer;
19   private final String topic;
20   // private final long[] offsets; Erst mal immer alles neu einlesen
21   private final Map<UUID, ChatRoom>[] chatrooms;
22
23
24   public KafkaChatHomeService(
25     Consumer<String, MessageTo> consumer,
26     String topic,
27     int numShards)
28   {
29     log.debug("Creating KafkaChatHomeService");
30     this.consumer = consumer;
31     this.topic = topic;
32     // this.offsets = new long[numShards];
33     // for (int i=0; i< numShards; i++)
34     // {
35     //   this.offsets[i] = 0l;
36     // }
37     this.chatrooms = new Map[numShards];
38   }
39
40
41   @Override
42   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
43   {
44     consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
45     {
46       if (!tp.topic().equals(topic))
47       {
48         log.warn("Ignoring partition from unwanted topic: {}", tp);
49         return;
50       }
51
52       int partition = tp.partition();
53       long unseenOffset = 0; // offsets[partition];
54
55       log.info(
56           "Loading messages from partition {}: start-offset={} -> current-offset={}",
57           partition,
58           unseenOffset,
59           currentOffset);
60
61       consumer.seek(tp, unseenOffset);
62       chatrooms[partition]
63           .values()
64           .stream()
65       handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
66     });
67   }
68
69   @Override
70   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
71   {
72     partitions.forEach(tp ->
73     {
74       if (!tp.topic().equals(topic))
75       {
76         log.warn("Ignoring partition from unwanted topic: {}", tp);
77         return;
78       }
79
80       int partition = tp.partition();
81       long unseenOffset = offsets[partition];
82
83       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
84     });
85     log.info("Revoked partitions: {}", partitions);
86   }
87
88   @Override
89   public void onPartitionsLost(Collection<TopicPartition> partitions)
90   {
91     log.info("Revoked partitions: {}", partitions);
92   }
93
94   private void foo()
95   {
96     Set<Integer> owned = Arrays
97       .stream(ownedShards)
98       .collect(
99         () -> new HashSet<>(),
100         (set, i) -> set.add(i),
101         (a, b) -> a.addAll(b));
102     for (int shard = 0; shard < numShards; shard++)
103     {
104       chatrooms[shard] = owned.contains(shard)
105         ? new HashMap<>()
106         : null;
107     }
108     chatroomFlux
109       .filter(chatRoom ->
110       {
111         if (owned.contains(chatRoom.getShard()))
112         {
113           return true;
114         }
115         else
116         {
117           log.info("Ignoring not owned chat-room {}", chatRoom);
118           return false;
119         }
120       })
121       .toStream()
122       .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
123   }
124
125   @Override
126   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
127   {
128     chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
129     return Mono.just(chatRoom);
130   }
131
132   @Override
133   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
134   {
135     return Mono.justOrEmpty(chatrooms[shard].get(id));
136   }
137
138   @Override
139   public Flux<ChatRoom> getChatRooms(int shard)
140   {
141     return Flux.fromStream(chatrooms[shard].values().stream());
142   }
143 }