import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongSerializer;
import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
MockRecordHandler mockHandler;
@Autowired
AtomicBoolean isTerminatedExceptionally;
+ @Autowired
+ ExampleConsumer exampleConsumer;
@BeforeEach
- void resetTopic() {
+ void setUp()
+ {
+ if (!exampleConsumer.isRunning())
+ {
+ exampleConsumer.start();
+ }
+ }
+
+ @AfterEach
+ void tearDown() throws InterruptedException
+ {
adminClient.deleteRecords(Map.of(
new TopicPartition(TOPIC, 0), deleteAllRecordsByPartition(0),
new TopicPartition(TOPIC, 1), deleteAllRecordsByPartition(1)));
nextMessage = 1;
mockHandler.clear();
isTerminatedExceptionally.set(false);
+ exampleConsumer.stop();
}
private RecordsToDelete deleteAllRecordsByPartition(int x)