import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static de.juplo.kafka.ApplicationTests.NUM_PARTITIONS;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static de.juplo.kafka.ApplicationTests.*;
import static org.assertj.core.api.Assertions.assertThat;
properties = {
"juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
"juplo.consumer.topic=" + TOPIC,
+ "juplo.consumer.header-prefix=" + HEADER_PREFIX,
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"logging.level.de.juplo.kafka=TRACE",
.isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value()));
assertThat(response.getHeaders())
.containsEntry(
- deadLetterConsumer.prefixed(DeadLetterConsumer.KEY),
+ HEADER_PREFIX + DeadLetterConsumer.KEY,
List.of(key));
assertThat(response.getHeaders())
.containsEntry(
- deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP),
+ HEADER_PREFIX + DeadLetterConsumer.TIMESTAMP,
List.of(Long.toString(recordMetadata.timestamp())));
assertThat(response.getBody())
.isEqualTo(value);
static final String TOPIC = "ExampleConsumerTest_TEST";
static final int NUM_PARTITIONS = 7;
static final int[] PARTITIONS = IntStream.range(0, NUM_PARTITIONS).toArray();
+ static final String HEADER_PREFIX = "X-FOO--";
@Autowired
KafkaTemplate<byte[], byte[]> kafkaTemplate;
AdminClient adminClient;
@Autowired
TestRestTemplate restTemplate;
- @Autowired
- DeadLetterConsumer deadLetterConsumer;
final long[] currentOffsets = new long[NUM_PARTITIONS];