@Slf4j
public class DeadLetterConsumer implements Runnable
{
+ public final static String KEY = "KEY";
+ public final static String TIMESTAMP = "TIMESTAMP";
+
+
private final String id;
private final String topic;
private final int numPartitions;
return Mono.fromFuture(future);
}
+ String prefixed(String headerName)
+ {
+ return headerName;
+ }
+
public void shutdown() throws InterruptedException
{
log.info("{} - Requesting shutdown", id);
import org.springframework.kafka.test.context.EmbeddedKafka;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
recordMetadata.offset());
assertThat(response.getStatusCode())
.isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value()));
+ assertThat(response.getHeaders())
+ .containsEntry(
+ deadLetterConsumer.prefixed(DeadLetterConsumer.KEY),
+ List.of(key));
+ assertThat(response.getHeaders())
+ .containsEntry(
+ deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP),
+ List.of(Long.toString(recordMetadata.timestamp())));
assertThat(response.getBody())
.isEqualTo(value);
}
AdminClient adminClient;
@Autowired
TestRestTemplate restTemplate;
+ @Autowired
+ DeadLetterConsumer deadLetterConsumer;
final long[] currentOffsets = new long[NUM_PARTITIONS];