summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
bfddb34)
* Das regelmäßige Speichern im Poll-Interval wird für die Übung nicht
benötigt.
* Damit entfällt auch das Interface
`PollIntervalAwareConsumerRebalanceListener`
* Die Vereinfachung hat eine Anpassung der Tests erfordert: Da in dem
Test, der überprüft, ob die Offsets korrekt committed werde, wenn kein
Fehler vorliegt, gar kein Rebalance auftritt, musste der Consumer
gestoppt werden, damit die Ergebnisse für die Überprüfung sichtbar
werden.
recordHandler,
adderResults,
stateRepository,
recordHandler,
adderResults,
stateRepository,
- properties.getClientId(),
- Clock.systemDefaultZone(),
- properties.getCommitInterval());
+ properties.getClientId());
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import java.time.Clock;
import org.apache.kafka.common.TopicPartition;
import java.time.Clock;
@RequiredArgsConstructor
@Slf4j
@RequiredArgsConstructor
@Slf4j
-public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+public class ApplicationRebalanceListener implements ConsumerRebalanceListener
{
private final ApplicationRecordHandler recordHandler;
private final AdderResults adderResults;
private final StateRepository stateRepository;
private final String id;
{
private final ApplicationRecordHandler recordHandler;
private final AdderResults adderResults;
private final StateRepository stateRepository;
private final String id;
- private final Clock clock;
- private final Duration commitInterval;
private final Set<Integer> partitions = new HashSet<>();
private final Set<Integer> partitions = new HashSet<>();
- private Instant lastCommit = Instant.EPOCH;
-
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
stateRepository.save(new StateDocument(partition, state, results));
});
}
stateRepository.save(new StateDocument(partition, state, results));
});
}
-
-
- @Override
- public void beforeNextPoll()
- {
- if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
- {
- log.debug("Storing data, last commit: {}", lastCommit);
- partitions.forEach(partition -> stateRepository.save(
- new StateDocument(
- partition,
- recordHandler.getState(partition).getState(),
- adderResults.getState(partition))));
- lastCommit = clock.instant();
- }
- }
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
- private final PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+ private final ConsumerRebalanceListener rebalanceListener;
private final RecordHandler<K, V> recordHandler;
private final Lock lock = new ReentrantLock();
private final RecordHandler<K, V> recordHandler;
private final Lock lock = new ReentrantLock();
-
- rebalanceListener.beforeNextPoll();
}
}
catch(WakeupException e)
}
}
catch(WakeupException e)
+++ /dev/null
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-
-
-public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener
-{
- default void beforeNextPoll() {}
-}
import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
@Autowired
MongoProperties mongoProperties;
@Autowired
@Autowired
MongoProperties mongoProperties;
@Autowired
- PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+ ConsumerRebalanceListener rebalanceListener;
@Autowired
RecordHandler<K, V> recordHandler;
@Autowired
RecordHandler<K, V> recordHandler;
/** Tests methods */
@Test
/** Tests methods */
@Test
- void commitsCurrentOffsetsOnSuccess()
+ void commitsCurrentOffsetsOnSuccess() throws Exception
{
int numberOfGeneratedMessages =
recordGenerator.generate(false, false, messageSender);
{
int numberOfGeneratedMessages =
recordGenerator.generate(false, false, messageSender);
.isThrownBy(() -> endlessConsumer.exitStatus())
.describedAs("Consumer should still be running");
.isThrownBy(() -> endlessConsumer.exitStatus())
.describedAs("Consumer should still be running");
+ endlessConsumer.stop();
recordGenerator.assertBusinessLogic();
}
recordGenerator.assertBusinessLogic();
}
- endlessConsumer.stop();
testRecordProducer.close();
offsetConsumer.close();
}
testRecordProducer.close();
offsetConsumer.close();
}