TMP:Holzweg so, Refaktorisierung nötig
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / files / FilesStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence.storage.files;
2
3 import com.fasterxml.jackson.core.JsonGenerator;
4 import com.fasterxml.jackson.databind.JavaType;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
7 import de.juplo.kafka.chat.backend.api.MessageTo;
8 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
9 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
10 import de.juplo.kafka.chat.backend.domain.ChatRoom;
11 import de.juplo.kafka.chat.backend.domain.Message;
12 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
13 import lombok.RequiredArgsConstructor;
14 import lombok.extern.slf4j.Slf4j;
15 import reactor.core.publisher.Flux;
16
17 import java.io.IOException;
18 import java.nio.file.Files;
19 import java.nio.file.Path;
20 import java.time.Clock;
21 import java.util.UUID;
22
23 import static java.nio.file.StandardOpenOption.CREATE;
24 import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
25
26
27 @RequiredArgsConstructor
28 @Slf4j
29 public class FilesStorageStrategy implements StorageStrategy
30 {
31   public static final String CHATROOMS_FILENAME = "chatrooms.json";
32
33
34   private final Path storagePath;
35   private final Clock clock;
36   private final int bufferSize;
37   private final ShardingStrategy shardingStrategy;
38   private final ChatRoomServiceFactory factory;
39   private final ObjectMapper mapper;
40
41
42   @Override
43   public void write(Flux<ChatRoomInfo> chatroomFlux)
44   {
45     Path path = chatroomsPath();
46     log.info("Writing chatrooms to {}", path);
47     try
48     {
49       Files.createDirectories(storagePath);
50
51       JsonGenerator generator =
52           mapper
53               .getFactory()
54               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
55
56       chatroomFlux
57           .log()
58           .doFirst(() ->
59           {
60             try
61             {
62               generator.useDefaultPrettyPrinter();
63               generator.writeStartArray();
64             }
65             catch (IOException e)
66             {
67               throw new RuntimeException(e);
68             }
69           })
70           .doOnTerminate(() ->
71           {
72             try
73             {
74               generator.writeEndArray();
75               generator.close();
76             }
77             catch (IOException e)
78             {
79               throw new RuntimeException(e);
80             }
81           })
82           .subscribe(chatroom ->
83           {
84             try
85             {
86               ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom);
87               generator.writeObject(infoTo);
88               writeMessages(infoTo, chatroom.getMessages());
89             }
90             catch (IOException e)
91             {
92               throw new RuntimeException(e);
93             }
94           });
95     }
96     catch (IOException e)
97     {
98       throw new RuntimeException(e);
99     }
100   }
101
102   @Override
103   public Flux<ChatRoom> read()
104   {
105     JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
106     return Flux
107         .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
108         .log()
109         .map(infoTo ->
110         {
111           UUID chatRoomId = infoTo.getId();
112           int shard = shardingStrategy.selectShard(chatRoomId);
113           return new ChatRoom(
114               infoTo.getId(),
115               infoTo.getName(),
116               shard,
117               clock,
118               factory.create(readMessages(infoTo)),
119               bufferSize);
120         });
121   }
122
123   public void writeMessages(ChatRoomInfoTo infoTo, Flux<Message> messageFlux)
124   {
125     Path path = chatroomPath(infoTo);
126     log.info("Writing messages for {} to {}", infoTo, path);
127     try
128     {
129       Files.createDirectories(storagePath);
130
131       JsonGenerator generator =
132           mapper
133               .getFactory()
134               .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
135
136       messageFlux
137           .log()
138           .doFirst(() ->
139           {
140             try
141             {
142               generator.useDefaultPrettyPrinter();
143               generator.writeStartArray();
144             }
145             catch (IOException e)
146             {
147               throw new RuntimeException(e);
148             }
149           })
150           .doOnTerminate(() ->
151           {
152             try
153             {
154               generator.writeEndArray();
155               generator.close();
156             }
157             catch (IOException e)
158             {
159               throw new RuntimeException(e);
160             }
161           })
162           .subscribe(message ->
163           {
164             try
165             {
166               MessageTo messageTo = MessageTo.from(message);
167               generator.writeObject(messageTo);
168             }
169             catch (IOException e)
170             {
171               throw new RuntimeException(e);
172             }
173           });
174     }
175     catch (IOException e)
176     {
177       throw new RuntimeException(e);
178     }
179   }
180
181   public Flux<Message> readMessages(ChatRoomInfoTo infoTo)
182   {
183     JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
184     return Flux
185         .from(new JsonFilePublisher<MessageTo>(chatroomPath(infoTo), mapper, type))
186         .log()
187         .map(MessageTo::toMessage);
188   }
189
190   Path chatroomsPath()
191   {
192     return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
193   }
194
195   Path chatroomPath(ChatRoomInfoTo infoTo)
196   {
197     return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json"));
198   }
199 }