echo "Resultate für klaus von adder-2"
http :8092/results/klaus | jq .[].sum | uniq
-docker-compose stop adder-1
-until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
-until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done
-
-echo "Resultate für adder-2"
-http -v --pretty none -S :8092/results
-echo
-
-echo "Resultate für peter von adder-2"
-http :8092/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-2"
-http :8092/results/klaus | jq .[].sum | uniq
-
-docker-compose kill -s 9 adder-2
-docker-compose start adder-1
docker-compose kill -s 9 peter klaus
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
-
-echo "Resultate für adder-1"
-http -v --pretty none -S :8091/results
-echo
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
-
-sleep 5
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
package de.juplo.kafka;
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
ApplicationRecordHandler recordHandler,
AdderResults adderResults,
StateRepository stateRepository,
- Consumer<String, String> consumer,
ApplicationProperties properties)
{
return new ApplicationRebalanceListener(
recordHandler,
adderResults,
stateRepository,
- properties.getClientId(),
- consumer);
+ properties.getClientId());
}
@Bean
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
private final AdderResults adderResults;
private final StateRepository stateRepository;
private final String id;
- private final Consumer consumer;
private final Set<Integer> partitions = new HashSet<>();
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
- log.info("{} - Commiting offsets for all previously assigned partitions", id);
- try
- {
- consumer.commitSync();
- }
- catch (Exception e)
- {
- log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
- }
-
partitions.forEach(tp ->
{
Integer partition = tp.partition();