projects
/
demos
/
kafka
/
chat
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
b10219b
)
WIP:mongodb map vs subscribe - subscribe rausgezogen
author
Kai Moritz
<kai@juplo.de>
Mon, 19 Feb 2024 14:01:58 +0000
(15:01 +0100)
committer
Kai Moritz
<kai@juplo.de>
Tue, 20 Feb 2024 09:28:35 +0000
(10:28 +0100)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
index
1eaa88c
..
76debbe
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java
@@
-32,7
+32,9
@@
public class ChatBackendApplication implements WebFluxConfigurer
@PreDestroy
public void onExit()
{
@PreDestroy
public void onExit()
{
- storageStrategy.write(chatHomeService);
+ storageStrategy
+ .write(chatHomeService)
+ .subscribe();
}
public static void main(String[] args)
}
public static void main(String[] args)
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
index
f3efe79
..
acb84e6
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
@@
-137,6
+137,6
@@
public class ChatBackendController
@PostMapping("/store")
public void store()
{
@PostMapping("/store")
public void store()
{
- storageStrategy.write(chatHomeService);
+ storageStrategy.write(chatHomeService)
.subscribe()
;
}
}
}
}
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
index
019db65
..
990d001
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
@@
-16,20
+16,20
@@
public interface StorageStrategy
{
Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
{
Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
- default
void
write(ChatHomeService chatHomeService)
+ default
Flux<ChatRoomInfo>
write(ChatHomeService chatHomeService)
{
{
- write(
+
return
write(
chatHomeService,
this::logSuccessChatHomeService,
this::logFailureChatHomeService);
}
chatHomeService,
this::logSuccessChatHomeService,
this::logFailureChatHomeService);
}
- default
void
write(
+ default
Flux<ChatRoomInfo>
write(
ChatHomeService chatHomeService,
ChatHomeServiceWrittenSuccessCallback successCallback,
ChatHomeServiceWrittenFailureCallback failureCallback)
{
ChatHomeService chatHomeService,
ChatHomeServiceWrittenSuccessCallback successCallback,
ChatHomeServiceWrittenFailureCallback failureCallback)
{
- writeChatRoomInfo(
+
return
writeChatRoomInfo(
chatHomeService
.getChatRoomInfo()
.doOnComplete(() -> successCallback.accept(chatHomeService))
chatHomeService
.getChatRoomInfo()
.doOnComplete(() -> successCallback.accept(chatHomeService))
@@
-40,25
+40,26
@@
public interface StorageStrategy
chatHomeService
.getChatRoomData(chatRoomInfo.getId())
.flatMapMany(chatRoomData -> chatRoomData.getMessages()),
chatHomeService
.getChatRoomData(chatRoomInfo.getId())
.flatMapMany(chatRoomData -> chatRoomData.getMessages()),
+
this::logSuccessChatRoom,
this::logSuccessChatRoom,
- this::logFailureChatRoom)));
+ this::logFailureChatRoom)
.subscribe()
));
}
}
-
void
writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+
Flux<ChatRoomInfo>
writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
Flux<ChatRoomInfo> readChatRoomInfo();
Flux<ChatRoomInfo> readChatRoomInfo();
- default
void
writeChatRoomData(
+ default
Flux<Message>
writeChatRoomData(
UUID chatRoomId,
Flux<Message> messageFlux,
ChatRoomWrittenSuccessCallback successCallback,
ChatRoomWrittenFailureCallback failureCallback)
{
UUID chatRoomId,
Flux<Message> messageFlux,
ChatRoomWrittenSuccessCallback successCallback,
ChatRoomWrittenFailureCallback failureCallback)
{
- writeChatRoomData(
+
return
writeChatRoomData(
chatRoomId,
messageFlux
.doOnComplete(() -> successCallback.accept(chatRoomId))
.doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
}
chatRoomId,
messageFlux
.doOnComplete(() -> successCallback.accept(chatRoomId))
.doOnError(throwable -> failureCallback.accept(chatRoomId, throwable)));
}
-
void
writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+
Flux<Message>
writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
Flux<Message> readChatRoomData(UUID chatRoomId);
interface ChatHomeServiceWrittenSuccessCallback extends Consumer<ChatHomeService> {}
Flux<Message> readChatRoomData(UUID chatRoomId);
interface ChatHomeServiceWrittenSuccessCallback extends Consumer<ChatHomeService> {}
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java
b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java
index
7e04a96
..
cdb4f0d
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java
@@
-35,7
+35,7
@@
public class FilesStorageStrategy implements StorageStrategy
@Override
@Override
- public
void
writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ public
Flux<ChatRoomInfo>
writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
@@
-48,7
+48,7
@@
public class FilesStorageStrategy implements StorageStrategy
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- chatRoomInfoFlux
+
return
chatRoomInfoFlux
.log()
.doFirst(() ->
{
.log()
.doFirst(() ->
{
@@
-86,8
+86,7
@@
public class FilesStorageStrategy implements StorageStrategy
{
throw new RuntimeException(e);
}
{
throw new RuntimeException(e);
}
- })
- .subscribe();
+ });
}
catch (IOException e)
{
}
catch (IOException e)
{
@@
-121,7
+120,7
@@
public class FilesStorageStrategy implements StorageStrategy
}
@Override
}
@Override
- public
void
writeChatRoomData(
+ public
Flux<Message>
writeChatRoomData(
UUID chatRoomId,
Flux<Message> messageFlux)
{
UUID chatRoomId,
Flux<Message> messageFlux)
{
@@
-136,7
+135,7
@@
public class FilesStorageStrategy implements StorageStrategy
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- messageFlux
+
return
messageFlux
.log()
.doFirst(() ->
{
.log()
.doFirst(() ->
{
@@
-174,8
+173,7
@@
public class FilesStorageStrategy implements StorageStrategy
{
throw new RuntimeException(e);
}
{
throw new RuntimeException(e);
}
- })
- .subscribe();
+ });
}
catch (IOException e)
{
}
catch (IOException e)
{
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
index
853ee1c
..
47596a2
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
@@
-5,6
+5,8
@@
import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
+import java.util.UUID;
+
@AllArgsConstructor
@NoArgsConstructor
@AllArgsConstructor
@NoArgsConstructor
@@
-19,6
+21,13
@@
public class ChatRoomTo
private String id;
private String name;
private String id;
private String name;
+ public ChatRoomInfo toChatRoomInfo()
+ {
+ return new ChatRoomInfo(
+ UUID.fromString(id),
+ name);
+ }
+
public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
{
return new ChatRoomTo(
public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
{
return new ChatRoomTo(
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
index
780d64b
..
1428119
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
@@
-21,12
+21,12
@@
public class MongoDbStorageStrategy implements StorageStrategy
@Override
@Override
- public
void
writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ public
Flux<ChatRoomInfo>
writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
{
- chatRoomInfoFlux
+
return
chatRoomInfoFlux
.map(ChatRoomTo::from)
.map(chatroomTo -> chatRoomRepository.save(chatroomTo))
.map(ChatRoomTo::from)
.map(chatroomTo -> chatRoomRepository.save(chatroomTo))
- .
subscribe(
);
+ .
map(ChatRoomTo::toChatRoomInfo
);
}
@Override
}
@Override
@@
-42,12
+42,12
@@
public class MongoDbStorageStrategy implements StorageStrategy
}
@Override
}
@Override
- public
void
writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ public
Flux<Message>
writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
{
{
- messageFlux
+
return
messageFlux
.map(message -> MessageTo.from(chatRoomId, message))
.map(messageTo -> messageRepository.save(messageTo))
.map(message -> MessageTo.from(chatRoomId, message))
.map(messageTo -> messageRepository.save(messageTo))
- .
subscribe(
);
+ .
map(MessageTo::toMessage
);
}
@Override
}
@Override
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java
b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java
index
6ca08e2
..
5902742
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageStrategy.java
@@
-13,14
+13,18
@@
import java.util.UUID;
@Slf4j
public class NoStorageStorageStrategy implements StorageStrategy
{
@Slf4j
public class NoStorageStorageStrategy implements StorageStrategy
{
- @Override
- public void write(ChatHomeService chatHomeService)
+ public Flux<ChatRoomInfo> write(ChatHomeService chatHomeService)
{
{
- log.info("Storage is disabled: Not storing {}", chatHomeService);
+ return Flux
+ .<ChatRoomInfo>empty()
+ .doOnComplete(() -> log.info("Storage is disabled: Not storing {}", chatHomeService));
+
}
}
- @Override
- public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
+ public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ {
+ return chatRoomInfoFlux;
+ }
@Override
public Flux<ChatRoomInfo> readChatRoomInfo()
@Override
public Flux<ChatRoomInfo> readChatRoomInfo()
@@
-29,7
+33,10
@@
public class NoStorageStorageStrategy implements StorageStrategy
}
@Override
}
@Override
- public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
+ public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ {
+ return messageFlux;
+ }
@Override
public Flux<Message> readChatRoomData(UUID chatRoomId)
@Override
public Flux<Message> readChatRoomData(UUID chatRoomId)
diff --git
a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java
b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java
index
5eaf541
..
41e80ed
100644
(file)
--- a/
src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java
+++ b/
src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java
@@
-28,7
+28,9
@@
public abstract class AbstractStorageStrategyIT
protected void stop()
{
protected void stop()
{
- getStorageStrategy().write(chathome);
+ getStorageStrategy()
+ .write(chathome)
+ .subscribe();
}
@Test
}
@Test