projects
/
demos
/
kafka
/
training
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Springify: Testfall repariert - Seltsames Verhalten von `@KafkaHandler`!
[demos/kafka/training]
/
src
/
test
/
java
/
de
/
juplo
/
kafka
/
ApplicationTests.java
diff --git
a/src/test/java/de/juplo/kafka/ApplicationTests.java
b/src/test/java/de/juplo/kafka/ApplicationTests.java
index
8a485b3
..
f13fdde
100644
(file)
--- a/
src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/
src/test/java/de/juplo/kafka/ApplicationTests.java
@@
-1,7
+1,6
@@
package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@
-17,6
+16,7
@@
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
@@
-27,7
+27,6
@@
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@
-71,11
+70,11
@@
class ApplicationTests
@Autowired
EndlessConsumer endlessConsumer;
@Autowired
@Autowired
EndlessConsumer endlessConsumer;
@Autowired
-
RecordHandler record
Handler;
+
ClientMessageHandler clientMessage
Handler;
Map<TopicPartition, Long> oldOffsets;
Map<TopicPartition, Long> newOffsets;
Map<TopicPartition, Long> oldOffsets;
Map<TopicPartition, Long> newOffsets;
- Set<C
onsumerRecord<String, ClientMessage>> receivedRecords
;
+ Set<C
lientMessage> received
;
/** Tests methods */
/** Tests methods */
@@
-87,7
+86,7
@@
class ApplicationTests
await("100 records received")
.atMost(Duration.ofSeconds(30))
await("100 records received")
.atMost(Duration.ofSeconds(30))
- .until(() -> received
Records
.size() == 100);
+ .until(() -> received.size() == 100);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
@@
-112,7
+111,7
@@
class ApplicationTests
await("99 records received")
.atMost(Duration.ofSeconds(30))
await("99 records received")
.atMost(Duration.ofSeconds(30))
- .until(() -> received
Records
.size() == 99);
+ .until(() -> received.size() == 99);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
@@
-137,17
+136,17
@@
class ApplicationTests
@Test
void commitsOffsetOnProgramLogicErrorFoo()
{
@Test
void commitsOffsetOnProgramLogicErrorFoo()
{
-
recordHandler.testHandler = (record
) ->
+
clientMessageHandler.testHandler = (clientMessage, metadata
) ->
{
{
- if (Integer.parseInt(
record.value()
.message)%10 ==0)
- throw new RuntimeException("BOOM: " +
record.value()
.message + "%10 == 0");
+ if (Integer.parseInt(
clientMessage
.message)%10 ==0)
+ throw new RuntimeException("BOOM: " +
clientMessage
.message + "%10 == 0");
};
send100Messages((key, counter) -> serialize(key, counter));
await("80 records received")
.atMost(Duration.ofSeconds(30))
};
send100Messages((key, counter) -> serialize(key, counter));
await("80 records received")
.atMost(Duration.ofSeconds(30))
- .until(() -> received
Records
.size() == 100);
+ .until(() -> received.size() == 100);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
@@
-272,11
+271,11
@@
class ApplicationTests
@BeforeEach
public void init()
{
@BeforeEach
public void init()
{
-
recordHandler.testHandler = (record
) -> {};
+
clientMessageHandler.testHandler = (clientMessage, metadata
) -> {};
oldOffsets = new HashMap<>();
newOffsets = new HashMap<>();
oldOffsets = new HashMap<>();
newOffsets = new HashMap<>();
- received
Records
= new HashSet<>();
+ received = new HashSet<>();
doForCurrentOffsets((tp, offset) ->
{
doForCurrentOffsets((tp, offset) ->
{
@@
-284,14
+283,13
@@
class ApplicationTests
newOffsets.put(tp, offset - 1);
});
newOffsets.put(tp, offset - 1);
});
-
record
Handler.captureOffsets =
-
record
->
+
clientMessage
Handler.captureOffsets =
+
(clientMessage, metadata)
->
{
{
- received
Records.add(record
);
- log.debug("TEST: Processing record #{}: {}", received
Records.size(), record.value()
);
+ received
.add(clientMessage
);
+ log.debug("TEST: Processing record #{}: {}", received
.size(), clientMessage
);
newOffsets.put(
newOffsets.put(
- new TopicPartition(record.topic(), record.partition()),
- record.offset());
+ new TopicPartition(metadata.topic(), metadata.partition()), metadata.offset());
};
endlessConsumer.start();
};
endlessConsumer.start();
@@
-310,18
+308,18
@@
class ApplicationTests
}
}
}
}
- public static class
RecordHandler implements Consumer<ConsumerRecord<String, ClientMessage>
>
+ public static class
ClientMessageHandler implements BiConsumer<ClientMessage, ConsumerRecordMetadata
>
{
{
-
Consumer<ConsumerRecord<String, ClientMessage>
> captureOffsets;
-
Consumer<ConsumerRecord<String, ClientMessage>
> testHandler;
+
BiConsumer<ClientMessage, ConsumerRecordMetadata
> captureOffsets;
+
BiConsumer<ClientMessage, ConsumerRecordMetadata
> testHandler;
@Override
@Override
- public void accept(C
onsumerRecord<String, ClientMessage> record
)
+ public void accept(C
lientMessage clientMessage, ConsumerRecordMetadata metadata
)
{
captureOffsets
.andThen(testHandler)
{
captureOffsets
.andThen(testHandler)
- .accept(
record
);
+ .accept(
clientMessage, metadata
);
}
}
}
}
@@
-331,9
+329,9
@@
class ApplicationTests
{
@Primary
@Bean
{
@Primary
@Bean
- public
Consumer<ConsumerRecord<String, ClientMessage>
> testHandler()
+ public
BiConsumer<ClientMessage, ConsumerRecordMetadata
> testHandler()
{
{
- return new
Record
Handler();
+ return new
ClientMessage
Handler();
}
@Bean
}
@Bean