feat: Introduced config-parameters for the `io.projectreactor`-logging
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / storage / files / FilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.storage.files;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
9 import de.juplo.kafka.chat.backend.domain.Message;
10 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
11 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
12 import lombok.RequiredArgsConstructor;
13 import lombok.extern.slf4j.Slf4j;
14 import reactor.core.publisher.Flux;
15
16 import java.io.IOException;
17 import java.nio.file.Files;
18 import java.nio.file.Path;
19 import java.util.UUID;
20 import java.util.logging.Level;
21
22 import static java.nio.file.StandardOpenOption.CREATE;
23 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
24
25
26 @RequiredArgsConstructor
27 @Slf4j
28 public class FilesStorageStrategy implements StorageStrategy
29 {
30   public static final String CHATROOMS_FILENAME = "chatrooms.json";
31
32
33   private final Path storagePath;
34   private final ShardingStrategy shardingStrategy;
35   private final ObjectMapper mapper;
36   private final Level loggingLevel;
37   private final boolean showOperatorLine;
38
39
40   @Override
41   public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
42   {
43     Path path = chatroomsPath();
44     log.info("Writing chatrooms to {}", path);
45     try
46     {
47       Files.createDirectories(storagePath);
48
49       JsonGenerator generator =
50           mapper
51               .getFactory()
52               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
53
54       return chatRoomInfoFlux
55           .log(
56               FilesStorageStrategy.class.getSimpleName(),
57               loggingLevel,
58               showOperatorLine)
59           .doFirst(() ->
60           {
61             try
62             {
63               generator.useDefaultPrettyPrinter();
64               generator.writeStartArray();
65             }
66             catch (IOException e)
67             {
68               throw new RuntimeException(e);
69             }
70           })
71           .doOnTerminate(() ->
72           {
73             try
74             {
75               generator.writeEndArray();
76               generator.close();
77             }
78             catch (IOException e)
79             {
80               throw new RuntimeException(e);
81             }
82           })
83           .map(chatRoomInfo ->
84           {
85             try
86             {
87               ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
88               generator.writeObject(chatRoomInfoTo);
89               return chatRoomInfo;
90             }
91             catch (IOException e)
92             {
93               throw new RuntimeException(e);
94             }
95           });
96     }
97     catch (IOException e)
98     {
99       throw new RuntimeException(e);
100     }
101   }
102
103   @Override
104   public Flux<ChatRoomInfo> readChatRoomInfo()
105   {
106     JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
107     return Flux
108         .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
109         .log(
110             FilesStorageStrategy.class.getSimpleName(),
111             loggingLevel,
112             showOperatorLine)
113         .map(chatRoomInfoTo ->
114         {
115           UUID chatRoomId = chatRoomInfoTo.getId();
116           int shard = shardingStrategy.selectShard(chatRoomId);
117
118           log.info(
119               "{} - old shard: {}, new shard:  {}",
120               chatRoomId,
121               chatRoomInfoTo.getShard(),
122               shard);
123
124           return new ChatRoomInfo(
125               chatRoomId,
126               chatRoomInfoTo.getName(),
127               shard);
128         });
129   }
130
131   @Override
132   public Flux<Message> writeChatRoomData(
133       UUID chatRoomId,
134       Flux<Message> messageFlux)
135   {
136     Path path = chatroomPath(chatRoomId);
137     log.info("Writing messages for {} to {}", chatRoomId, path);
138     try
139     {
140       Files.createDirectories(storagePath);
141
142       JsonGenerator generator =
143           mapper
144               .getFactory()
145               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
146
147       return messageFlux
148           .log(
149               FilesStorageStrategy.class.getSimpleName(),
150               loggingLevel,
151               showOperatorLine)
152           .doFirst(() ->
153           {
154             try
155             {
156               generator.useDefaultPrettyPrinter();
157               generator.writeStartArray();
158             }
159             catch (IOException e)
160             {
161               throw new RuntimeException(e);
162             }
163           })
164           .doOnTerminate(() ->
165           {
166             try
167             {
168               generator.writeEndArray();
169               generator.close();
170             }
171             catch (IOException e)
172             {
173               throw new RuntimeException(e);
174             }
175           })
176           .map(message ->
177           {
178             try
179             {
180               MessageTo messageTo = MessageTo.from(message);
181               generator.writeObject(messageTo);
182               return message;
183             }
184             catch (IOException e)
185             {
186               throw new RuntimeException(e);
187             }
188           });
189     }
190     catch (IOException e)
191     {
192       throw new RuntimeException(e);
193     }
194   }
195
196   @Override
197   public Flux<Message> readChatRoomData(UUID chatRoomId)
198   {
199     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
200     return Flux
201         .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
202         .log(
203             FilesStorageStrategy.class.getSimpleName(),
204             loggingLevel,
205             showOperatorLine)
206         .map(MessageTo::toMessage);
207   }
208
209   Path chatroomsPath()
210   {
211     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
212   }
213
214   Path chatroomPath(UUID id)
215   {
216     return storagePath.resolve(Path.of(id.toString() + ".json"));
217   }
218 }