WIP - Ein Versuch (vielleicht Unsinn)
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / storage / files / FilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.storage.files;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
11 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
12 import lombok.RequiredArgsConstructor;
13 import lombok.extern.slf4j.Slf4j;
14 import reactor.core.publisher.Flux;
15
16 import java.io.IOException;
17 import java.nio.file.Files;
18 import java.nio.file.Path;
19 import java.util.UUID;
20
21 import static java.nio.file.StandardOpenOption.CREATE;
22 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
23
24
25 @RequiredArgsConstructor
26 @Slf4j
27 public class FilesStorageStrategy implements StorageStrategy
28 {
29   public static final String CHATROOMS_FILENAME = "chatrooms.json";
30
31
32   private final Path storagePath;
33   private final ShardingStrategy shardingStrategy;
34   private final ObjectMapper mapper;
35
36
37   @Override
38   public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
39   {
40     Path path = chatroomsPath();
41     log.info("Writing chatrooms to {}", path);
42     try
43     {
44       Files.createDirectories(storagePath);
45
46       JsonGenerator generator =
47           mapper
48               .getFactory()
49               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
50
51       chatRoomInfoFlux
52           .log()
53           .doFirst(() ->
54           {
55             try
56             {
57               generator.useDefaultPrettyPrinter();
58               generator.writeStartArray();
59             }
60             catch (IOException e)
61             {
62               throw new RuntimeException(e);
63             }
64           })
65           .doOnNext(chatRoomInfo ->
66           {
67             try
68             {
69               ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
70               generator.writeObject(chatRoomInfoTo);
71             }
72             catch (IOException e)
73             {
74               throw new RuntimeException(e);
75             }
76           })
77           .doOnTerminate(() ->
78           {
79             try
80             {
81               generator.writeEndArray();
82               generator.close();
83             }
84             catch (IOException e)
85             {
86               throw new RuntimeException(e);
87             }
88           });
89     }
90     catch (IOException e)
91     {
92       throw new RuntimeException(e);
93     }
94   }
95
96   @Override
97   public Flux<ChatRoomInfo> readChatRoomInfo()
98   {
99     JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
100     return Flux
101         .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
102         .log()
103         .map(chatRoomInfoTo ->
104         {
105           UUID chatRoomId = chatRoomInfoTo.getId();
106           int shard = shardingStrategy.selectShard(chatRoomId);
107
108           log.info(
109               "{} - old shard: {}, new shard:  {}",
110               chatRoomId,
111               chatRoomInfoTo.getShard(),
112               shard);
113
114           return new ChatRoomInfo(
115               chatRoomId,
116               chatRoomInfoTo.getName(),
117               shard);
118         });
119   }
120
121   @Override
122   public void writeChatRoomData(
123       UUID chatRoomId,
124       Flux<Message> messageFlux)
125   {
126     Path path = chatroomPath(chatRoomId);
127     log.info("Writing messages for {} to {}", chatRoomId, path);
128     try
129     {
130       Files.createDirectories(storagePath);
131
132       JsonGenerator generator =
133           mapper
134               .getFactory()
135               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
136
137       messageFlux
138           .log()
139           .doFirst(() ->
140           {
141             try
142             {
143               generator.useDefaultPrettyPrinter();
144               generator.writeStartArray();
145             }
146             catch (IOException e)
147             {
148               throw new RuntimeException(e);
149             }
150           })
151           .doOnTerminate(() ->
152           {
153             try
154             {
155               generator.writeEndArray();
156               generator.close();
157             }
158             catch (IOException e)
159             {
160               throw new RuntimeException(e);
161             }
162           })
163           .subscribe(message ->
164           {
165             try
166             {
167               MessageTo messageTo = MessageTo.from(message);
168               generator.writeObject(messageTo);
169             }
170             catch (IOException e)
171             {
172               throw new RuntimeException(e);
173             }
174           });
175     }
176     catch (IOException e)
177     {
178       throw new RuntimeException(e);
179     }
180   }
181
182   @Override
183   public Flux<Message> readChatRoomData(UUID chatRoomId)
184   {
185     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
186     return Flux
187         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
188         .log()
189         .map(MessageTo::toMessage);
190   }
191
192   Path chatroomsPath()
193   {
194     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
195   }
196
197   Path chatroomPath(UUID id)
198   {
199     return storagePath.resolve(Path.of(id.toString() + ".json"));
200   }
201 }