Kai Moritz [Wed, 6 Mar 2024 09:07:53 +0000 (10:07 +0100)]
refactor: Simplified shutdown - channel-tasks were joined multiple times
* `KafkaServicesApplicationRunner` does not have to join the channel-tasks.
* The channel-tasks are already joined by `ChannelTaskExecutor.join()`
automatically, because the method is annotated with `@PreDestroy`.
* Simplified the test-configuration accordingly.
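A minimal sketch of the mechanism (the task-handle is an assumption, not the actual field): Spring invokes `join()` during context-shutdown, so no other bean has to join the channel-tasks itself.

```java
import jakarta.annotation.PreDestroy;
import java.util.concurrent.CompletableFuture;

public class ChannelTaskExecutor
{
  // Hypothetical handle to the asynchronously running channel-task
  private CompletableFuture<Void> channelTask;

  @PreDestroy
  public void join()
  {
    // Called automatically on context-shutdown - joining the task again
    // in KafkaServicesApplicationRunner would be redundant
    channelTask.join();
  }
}
```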
Kai Moritz [Wed, 6 Mar 2024 07:26:03 +0000 (08:26 +0100)]
fix: The shutdown of the application was blocked
* The auto-configured bean `applicationTaskExecutor` must not block while
it is shutting down, because otherwise it waits infinitely for the
completion of the channel-tasks, which are stopped in a _later_ phase
of the "smart" lifecycle.
* The bean is destroyed first, because it is associated with the highest
lifecycle-phase (`Integer.MAX_VALUE`), which apparently cannot be
overruled by `@DependsOn` (although this is suggested by the Spring
documentation).
* Joining the channel-tasks was blocking infinitely, because the tasks were
waiting for the Kafka consumers to be closed, which only happens _after_
the joining completes.
* Renamed attributes and method-names according to the class-renames.
* Introduced interface `Channel` and `enum ChannelState`.
* `Data`- and `InfoChannel` maintain a `ChannelState`, instead of just a
plain boolean that only reflects the loading-state.
* The `ChannelTaskRunner` waits until both channels have entered the state
`ChannelState.SHUTTING_DOWN`.
* Renamed and moved `LoadInProgressException`
** Moved exception into implementation-specific package
** Renamed exception to `ChannelNotReadyException`
* Renamed `ConsumerTaskExecutor` into `ChannelTaskExecutor`
* Renamed `ConsumerTaskRunner` into `ChannelTaskRunner`
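A sketch of the introduced types (only `SHUTTING_DOWN` is named above; the other states and the waiting-loop are assumptions):

```java
enum ChannelState
{
  STARTING, LOAD_IN_PROGRESS, READY, SHUTTING_DOWN
}

interface Channel
{
  ChannelState getChannelState();
}

class ChannelTaskRunner
{
  // Hypothetical waiting-logic: the joining may only start, once both
  // channels have entered the state SHUTTING_DOWN
  void waitForShutdown(Channel infoChannel, Channel dataChannel)
      throws InterruptedException
  {
    while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN ||
           dataChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
    {
      Thread.sleep(100);
    }
  }
}
```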
Kai Moritz [Sun, 3 Mar 2024 09:08:36 +0000 (10:08 +0100)]
test: HandoverIT-POC - Working fix: using `delayElements()`
* Switched from `Flux.flatMap(Mono.delay()..)` to
`Flux.from(..).delayElements()`.
* This delays each element of the `Flux` by the same amount.
* The requests are made when the corresponding element of the flux is
emitted - not when the `Flux` is created, as before.
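A minimal sketch of the working pattern (the message-values and the `sendRequest()`-helper are assumptions, not the actual code of the POC):

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DelayElementsSketch
{
  // Hypothetical stand-in for the actual request made by the HandoverIT-POC
  static Mono<String> sendRequest(String message)
  {
    return Mono.just("sent: " + message);
  }

  public static void main(String[] args)
  {
    Flux
        .just("a", "b", "c")
        .delayElements(Duration.ofMillis(500)) // delays each element by the same amount
        .concatMap(DelayElementsSketch::sendRequest) // the request is made when the element is emitted
        .doOnNext(System.out::println)
        .blockLast(); // blocking is fine here: a plain main-thread, not a reactive flow
  }
}
```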
Kai Moritz [Sat, 2 Mar 2024 17:21:44 +0000 (18:21 +0100)]
test: HandoverIT-POC - Not working fix: using `delay()`
* Switched from `Mono.from(..).delayElement()` to `Mono.delay().then()`.
* This does _not_ solve the problem that all delays are calculated and
scheduled when the `Flux` is created.
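A sketch of the problematic pattern, reusing the imports and the hypothetical `sendRequest()` from the sketch above: `flatMap` subscribes to its inner publishers eagerly, so all `Mono.delay()`-timers start at (roughly) the same time and the requests fire in one burst.

```java
Flux
    .just("a", "b", "c")
    .flatMap(message -> Mono
        .delay(Duration.ofMillis(500)) // all three timers start right away
        .then(sendRequest(message))) // hence, all requests fire together after ~500ms
    .blockLast();
```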
Kai Moritz [Sun, 3 Mar 2024 09:20:11 +0000 (10:20 +0100)]
WIP:test: HandoverIT-POC - Executing Flux...
* Dropped the waiting for `TestListener` altogether.
* The waiting can be dropped, because waiting for the `TestWriter`-instances
ensures that all messages are sent (and therefore, very likely, received)
Kai Moritz [Wed, 28 Feb 2024 10:50:11 +0000 (11:50 +0100)]
fix: Fixed `ConcurrentModificationException` when accessing a chat-room
* If a new chat-room was created, `InfoChannel` only reacted by creating
the corresponding `ChatRoomInfo`-instance.
* The creation of the accompanying `ChatRoomData`-instance through
`DataChannel` was postponed until the new chat-room was accessed for the
first time.
* That way, `InfoChannel` did not need to know `DataChannel`, so that a
cyclic dependency could be avoided.
* As a downside, this approach was open to a race condition: if several
accesses to the newly created chat-room happened in parallel, a
`ConcurrentModificationException` was thrown, because the instance of
`ChatRoomData` was created multiple times in parallel.
* To circumvent the locking that would be necessary to evade this race
condition, the approach was refactored, so that `InfoChannel` now
explicitly triggers the creation of the `ChatRoomData`-instance.
* To do so without introducing a cyclic dependency, the class
`ChannelMediator` was introduced, so that `InfoChannel` and `DataChannel`
do not need to know each other.
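A sketch of the new flow (the method-names `chatRoomCreated()` and `createChatRoomData()` are assumptions): `InfoChannel` notifies the mediator once per created chat-room, so the `ChatRoomData`-instance is created exactly once, before the first access.

```java
class ChannelMediator
{
  private DataChannel dataChannel;

  // Hypothetical callout, invoked by InfoChannel when it observes the
  // creation of a new chat-room
  void chatRoomCreated(ChatRoomInfo chatRoomInfo)
  {
    // Triggered exactly once per chat-room - no racy lazy creation on
    // the first access any more
    dataChannel.createChatRoomData(chatRoomInfo);
  }
}
```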
Kai Moritz [Wed, 28 Feb 2024 10:14:32 +0000 (11:14 +0100)]
refactor: Introduced `ChannelMediator`
* `InfoChannel` and `DataChannel` must not know each other directly.
* This is necessary to prevent the cyclic dependency that would otherwise
be introduced, if `InfoChannel` also has to communicate with
`DataChannel`.
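How the mediator breaks the cycle might look like this (the wiring-details are assumptions): both channels only depend on the mediator, which is completed after construction.

```java
ChannelMediator channelMediator = new ChannelMediator();
InfoChannel infoChannel = new InfoChannel(channelMediator);
DataChannel dataChannel = new DataChannel(channelMediator);
// The mediator is wired up afterwards, so no constructor-cycle arises
channelMediator.setInfoChannel(infoChannel);
channelMediator.setDataChannel(dataChannel);
```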
Kai Moritz [Mon, 26 Feb 2024 18:55:19 +0000 (19:55 +0100)]
fix: Errors during shard-publishing should not kill the instance
* `HaproxyShardingPublisherStrategy` has to transform any exception into
a `Mono.error()`.
* `DataChannel.onPartitionsAssigned(..)` has to log and swallow errors
during the propagation of the shard-ownership.
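A sketch of both fixes (the logger-usage and the helper `sendHaproxyCommand()` are assumptions): `Mono.fromCallable()` turns a thrown exception into a `Mono.error()`, and the subscribing side logs and swallows it.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class ShardPublishingSketch
{
  private static final Logger log =
      LoggerFactory.getLogger(ShardPublishingSketch.class);

  // Hypothetical blocking call to the HAProxy Runtime API
  static String sendHaproxyCommand(int shard)
  {
    throw new RuntimeException("connection refused");
  }

  // As in HaproxyShardingPublisherStrategy: exceptions become Mono.error()
  static Mono<String> publishOwnership(int shard)
  {
    return Mono.fromCallable(() -> sendHaproxyCommand(shard));
  }

  public static void main(String[] args)
  {
    // As in DataChannel.onPartitionsAssigned(..): log and swallow the error,
    // so that a failed propagation does not kill the instance
    publishOwnership(4711)
        .doOnError(e -> log.error("could not publish ownership of shard 4711", e))
        .onErrorResume(e -> Mono.empty())
        .subscribe();
  }
}
```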
Kai Moritz [Thu, 22 Feb 2024 15:03:08 +0000 (16:03 +0100)]
fix: GREEN - Fixed the restore-mechanism
* The code of a reactive flow _must not_ call blocking functions.
* In order to solve this, the restore-process is triggered explicitly
after the creation of the classes.
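A sketch of the pattern (method- and field-names are assumptions): instead of blocking in the constructor, the restore-process is started through an explicit, non-blocking call after the instances have been created.

```java
import reactor.core.publisher.Flux;

public class RestoreSketch
{
  private volatile boolean loadInProgress = true;

  // Called explicitly after construction - no blocking call is needed,
  // because the flow completes asynchronously
  public void restore(Flux<String> storedEvents)
  {
    storedEvents
        .doOnNext(event -> { /* re-apply the stored event */ })
        .doOnComplete(() -> loadInProgress = false)
        .subscribe();
  }
}
```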
Kai Moritz [Tue, 20 Feb 2024 15:12:00 +0000 (16:12 +0100)]
refactor: RED - Refined success/error-handling for restore-operations
* This innocent little change discloses a severe misconception in the
implementation of the storage strategies.
* The call to `Mono.block()`, though not really changing the behaviour
during the restore-process, triggers a sanity-check from
io.projectreactor.
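The sanity-check can be reproduced with a few lines: Reactor refuses to `block()` on one of its own non-blocking threads (the values here are made up for the demonstration).

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BlockingCheckSketch
{
  public static void main(String[] args)
  {
    Flux
        .just("a", "b")
        .delayElements(Duration.ofMillis(10)) // execution moves to a reactor-thread
        .map(s -> Mono.just(s.toUpperCase()).block()) // blocking inside the flow
        .blockLast();
    // throws: java.lang.IllegalStateException: block()/blockFirst()/blockLast()
    // are blocking, which is not supported in thread parallel-1
  }
}
```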
Kai Moritz [Tue, 20 Feb 2024 10:28:22 +0000 (11:28 +0100)]
refactor: Refined stream-definition in `StorageStrategy#write`
* Changed the stream-definition to a more natural order.
* As a result, the stored `ChatRoomInfo`-instances do not have to be
handed clumsily to the following stream.
Kai Moritz [Tue, 20 Feb 2024 06:47:22 +0000 (07:47 +0100)]
refactor: Simplified `StorageStrategy`
* Reconfigurable success/error-logging was introduced for
`NoStorageStorageStrategy`.
* But as it turns out, this strategy can simply apply its logging in the
overridden method that disables the whole storing-logic.
* Hence, the interface was greatly simplified again by removing this
unused mechanism.
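A sketch of the simplification (the interface-shape and parameter-type are assumptions): the override that disables the storing is also the natural place for the logging.

```java
public class NoStorageStorageStrategy implements StorageStrategy
{
  @Override
  public void write(ChatHomeService chatHomeService)
  {
    // Storing is disabled: the only thing left to do is the logging
    log.info("Storing is disabled - skipping {}", chatHomeService);
  }
}
```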
Kai Moritz [Mon, 19 Feb 2024 14:01:58 +0000 (15:01 +0100)]
refactor: Extracted the `subscribe()`-call from `StorageStrategy`
* The subscription no longer happens inside the implementations of the
interface `StorageStrategy`.
* Instead, the methods that are defined in `StorageStrategy` return the
created `Flux`.
* The call to `subscribe()` happens in the code that uses these methods.
* This facilitates future refinements concerning the asynchronous handling
of success- and error-cases and so forth.
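A sketch of the new contract (signatures and logging are assumptions): the strategy only defines the stream, the caller decides how to subscribe.

```java
// The strategy returns the Flux it defines...
public interface StorageStrategy
{
  Flux<ChatRoomInfo> write(Flux<ChatRoomInfo> chatRoomInfoFlux);
}

// ...and the calling code subscribes, so that success- and error-handling
// can be refined independently of the strategy:
storageStrategy
    .write(chatRoomInfoFlux)
    .subscribe(
        info -> log.info("stored {}", info),
        e -> log.error("storing failed", e));
```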
Kai Moritz [Sun, 18 Feb 2024 19:12:19 +0000 (20:12 +0100)]
fix: Without `@DirtiesContext` the app is not torn down correctly
* `StorageStrategy` depends on Spring to call the method
`ChatBackendApplication#onExit()`, which is annotated with `@PreDestroy`.
* If this method is not called, the strategy is not applied, which leads
to errors in the integration-tests.
* This happens if all tests are run through Maven, because Spring
recycles the application-context and only tears down the context after
all tests have run.
* The addition of `@DirtiesContext` to those tests forces Spring to tear
down the application completely after each test annotated this way.
* Hence, the method is called as expected, which fixes the described
errors.
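For illustration (the test-class name is made up), the annotation goes directly on the affected integration-tests:

```java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;

@SpringBootTest
@DirtiesContext // tear down the context after this test-class, so that
                // @PreDestroy-methods like onExit() are actually called
class StorageStrategyIT
{
  // ...
}
```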
Kai Moritz [Sat, 3 Feb 2024 16:25:45 +0000 (17:25 +0100)]
fix: `getChatRoomInfo()` throws `LoadInProgressException` when loading
- The method `InfoChannel.getChatRoomInfo(UUID)` has to check if loading
is in progress.
- Otherwise, an existing chat-room might erroneously not be found if it is
requested while `InfoChannel` is loading, because it is not yet loaded.
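A sketch of the guard (field-names are assumptions): while loading, the channel cannot know yet whether a chat-room exists, so it has to signal an error instead of answering with an empty result.

```java
Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
  if (loadInProgress)
    return Mono.error(new LoadInProgressException());

  return Mono.justOrEmpty(chatRoomInfos.get(id));
}
```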
Kai Moritz [Sun, 24 Sep 2023 19:39:01 +0000 (21:39 +0200)]
fix: The actual position has to be requested from the consumer
* If the last seen offset and the current offset differ, although the
partition did not contain any messages between these offsets, the loading
process got stuck, because the position never advanced.
* Therefore, the actual position that is compared against the read
end-offset has to be requested from the consumer (see the sketch after
this list).
* Implemented a first simple `ShardingPublisherStrategy` that uses the
https://www.haproxy.com/documentation/haproxy-runtime-api/[HAProxy Runtime API]
to publish changed ownerships.
* Added configuration-properties `kafka.haproxyRuntimeApi` and
`kafka.haproxyMap` to configure the strategy.
* The interface is used by `DataChannel` to publish the changed ownership
each time a new partition is assigned to the consumer-group.
* Added a dummy-implementation in `KafkaServicesConfiguration`.
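A sketch of the position-fix referenced above (the surrounding loop-structure is an assumption): the consumer itself knows its position, which also advances across offset-gaps that contain no consumable messages (e.g. transaction-markers).

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class LoadingPositionSketch
{
  boolean isLoadingCompleted(
      KafkaConsumer<?, ?> consumer,
      TopicPartition partition,
      long endOffset)
  {
    // Ask the consumer for its actual position, instead of relying on
    // the offset of the last seen record
    return consumer.position(partition) >= endOffset;
  }
}
```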
Kai Moritz [Sat, 16 Sep 2023 19:40:45 +0000 (21:40 +0200)]
feat: Introduced events that are sent if a shard is assigned/revoked
* In order to redirect requests to the appropriate instances, each
instance must know the mapping from shard-IDs to instance-URIs.
** For the static `in-memory`-implementation `ShardedChatHomeService`,
this is a static mapping that can be configured and read on start-up.
** For the dynamic `KafkaChatHomeService`, this mapping has to be
propagated to all instances each time the partition-assignment of the
consumer-group changes.
* Changes for `ShardedChatHomeService`
** Introduced `ChatBackendProperties.shardOwners` (of type `URI[]`).
** Each instance reads the static mapping from shard-ID to instance-URI
on start-up.
* Changes for `KafkaChatHomeService`
** Introduced `KafkaServicesProperties.instanceUri` (of type `URI`).
** Each instance reads its URI on start-up.
** `DataChannel` sends an event for each assigned/revoked partition,
when the partition-assignment of the consumer-group changes.
** These events propagate the changed mapping from partition-ID to
instance-URI.
** `InfoChannel` receives these events and updates the dynamic mapping
from partition-ID (aka shard-ID) to instance-URI.
* The shards that are owned by an instance can be queried via `/shards`.
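A sketch of the event-publishing (the event- and helper-names are assumptions; the rebalance-callback itself is the standard Kafka API):

```java
import java.net.URI;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class DataChannelRebalanceSketch implements ConsumerRebalanceListener
{
  // Would be read from KafkaServicesProperties.instanceUri on start-up
  private final URI instanceUri = URI.create("http://chat-backend-1:8080");

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
  {
    partitions.forEach(partition ->
        sendShardAssignedEvent(partition.partition(), instanceUri));
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
  {
    partitions.forEach(partition ->
        sendShardRevokedEvent(partition.partition(), instanceUri));
  }

  // Hypothetical helpers, that would produce the events to the info-topic
  void sendShardAssignedEvent(int shard, URI uri) { /* ... */ }
  void sendShardRevokedEvent(int shard, URI uri) { /* ... */ }
}
```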