projects
/
demos
/
kafka
/
chat
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
refactor: Moved `ShardingStrategy` into package `persistence` -- ALIGNE
[demos/kafka/chat]
/
src
/
main
/
java
/
de
/
juplo
/
kafka
/
chat
/
backend
/
persistence
/
storage
/
files
/
FilesStorageStrategy.java
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
index
025e3ae
..
9c79197
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
@@
-5,10
+5,10
@@
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@@
-16,7
+16,6
@@
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.time.Clock;
import java.util.UUID;
import static java.nio.file.StandardOpenOption.CREATE;
import java.util.UUID;
import static java.nio.file.StandardOpenOption.CREATE;
@@
-31,15
+30,12
@@
public class FilesStorageStrategy implements StorageStrategy
private final Path storagePath;
private final Path storagePath;
- private final Clock clock;
- private final int bufferSize;
private final ShardingStrategy shardingStrategy;
private final ShardingStrategy shardingStrategy;
- private final ChatRoomServiceFactory factory;
private final ObjectMapper mapper;
@Override
private final ObjectMapper mapper;
@Override
- public void write
(Flux<ChatRoom> chatroom
Flux)
+ public void write
ChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfo
Flux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
@@
-52,7
+48,7
@@
public class FilesStorageStrategy implements StorageStrategy
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- chat
room
Flux
+ chat
RoomInfo
Flux
.log()
.doFirst(() ->
{
.log()
.doFirst(() ->
{
@@
-78,13
+74,12
@@
public class FilesStorageStrategy implements StorageStrategy
throw new RuntimeException(e);
}
})
throw new RuntimeException(e);
}
})
- .subscribe(chat
room
->
+ .subscribe(chat
RoomInfo
->
{
try
{
{
try
{
- ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom);
- generator.writeObject(infoTo);
- writeMessages(infoTo, chatroom.getMessages());
+ ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
+ generator.writeObject(chatRoomInfoTo);
}
catch (IOException e)
{
}
catch (IOException e)
{
@@
-99,30
+94,37
@@
public class FilesStorageStrategy implements StorageStrategy
}
@Override
}
@Override
- public Flux<ChatRoom
> read
()
+ public Flux<ChatRoom
Info> readChatRoomInfo
()
{
JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
return Flux
.from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
.log()
{
JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
return Flux
.from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
.log()
- .map(
i
nfoTo ->
+ .map(
chatRoomI
nfoTo ->
{
{
- UUID chatRoomId =
i
nfoTo.getId();
+ UUID chatRoomId =
chatRoomI
nfoTo.getId();
int shard = shardingStrategy.selectShard(chatRoomId);
int shard = shardingStrategy.selectShard(chatRoomId);
- return new ChatRoom(
- infoTo.getId(),
- infoTo.getName(),
- shard,
- clock,
- factory.create(readMessages(infoTo)),
- bufferSize);
+
+ log.info(
+ "{} - old shard: {}, new shard: {}",
+ chatRoomId,
+ chatRoomInfoTo.getShard(),
+ shard);
+
+ return new ChatRoomInfo(
+ chatRoomId,
+ chatRoomInfoTo.getName(),
+ shard);
});
}
});
}
- public void writeMessages(ChatRoomInfoTo infoTo, Flux<Message> messageFlux)
+ @Override
+ public void writeChatRoomData(
+ UUID chatRoomId,
+ Flux<Message> messageFlux)
{
{
- Path path = chatroomPath(
infoTo
);
- log.info("Writing messages for {} to {}",
infoTo
, path);
+ Path path = chatroomPath(
chatRoomId
);
+ log.info("Writing messages for {} to {}",
chatRoomId
, path);
try
{
Files.createDirectories(storagePath);
try
{
Files.createDirectories(storagePath);
@@
-177,11
+179,12
@@
public class FilesStorageStrategy implements StorageStrategy
}
}
}
}
- public Flux<Message> readMessages(ChatRoomInfoTo infoTo)
+ @Override
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
{
JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
return Flux
{
JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
return Flux
- .from(new JsonFilePublisher<MessageTo>(chatroomPath(
infoTo
), mapper, type))
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(
chatRoomId
), mapper, type))
.log()
.map(MessageTo::toMessage);
}
.log()
.map(MessageTo::toMessage);
}
@@
-191,8
+194,8
@@
public class FilesStorageStrategy implements StorageStrategy
return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
}
return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
}
- Path chatroomPath(
ChatRoomInfoTo infoTo
)
+ Path chatroomPath(
UUID id
)
{
{
- return storagePath.resolve(Path.of(i
nfoTo.getId()
.toString() + ".json"));
+ return storagePath.resolve(Path.of(i
d
.toString() + ".json"));
}
}
}
}