import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
{
@Bean
public DeadLetterConsumer deadLetterConsumer(
- Consumer<String, String> kafkaConsumer,
+ Consumer<byte[], byte[]> kafkaConsumer,
ApplicationProperties properties,
ConfigurableApplicationContext applicationContext)
{
}
@Bean(destroyMethod = "")
- public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
+ public KafkaConsumer<byte[], byte[]> kafkaConsumer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("client.id", properties.getClientId());
props.put("group.id", properties.getConsumer().getGroupId());
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
+ props.put("key.deserializer", ByteArrayDeserializer.class.getName());
+ props.put("value.deserializer", ByteArrayDeserializer.class.getName());
props.put("auto.offset.reset", "none");
return new KafkaConsumer<>(props);
private final int numPartitions;
private final Queue<FetchRequest>[] pendingFetchRequests;
private final FetchRequest[] currentFetchRequest;
- private final Consumer<String, String> consumer;
+ private final Consumer<byte[], byte[]> consumer;
private final Thread workerThread;
private final Runnable closeCallback;
String clientId,
String topic,
String headerPrefix,
- Consumer<String, String> consumer,
+ Consumer<byte[], byte[]> consumer,
Runnable closeCallback)
{
this.id = clientId;
{
try
{
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMinutes(1));
log.info("{} - Received {} messages", id, records.count());
List<TopicPartition> partitionsToPause = new LinkedList<>();
for (TopicPartition partition : records.partitions())
{
- for (ConsumerRecord<String, String> record : records.records(partition))
+ for (ConsumerRecord<byte[], byte[]> record : records.records(partition))
{
log.info(
"{} - fetched partition={}-{}, offset={}: {}",
currentFetchRequest[fetchRequest.partition().partition()] = fetchRequest;
consumer.seek(fetchRequest.partition(), fetchRequest.offset());
}
- Mono<ConsumerRecord<String, String>> requestRecord(int partition, long offset)
+ Mono<ConsumerRecord<byte[], byte[]>> requestRecord(int partition, long offset)
{
if (partition >= numPartitions || partition < 0)
{
throw new NonExistentPartitionException(topic, partition);
}
- CompletableFuture<ConsumerRecord<String, String>> future = new CompletableFuture<>();
+ CompletableFuture<ConsumerRecord<byte[], byte[]>> future = new CompletableFuture<>();
FetchRequest fetchRequest = new FetchRequest(
new TopicPartition(topic, partition),
@GetMapping(path = "/{partition}/{offset}")
- public Mono<ResponseEntity<String>> recordAtOffset(
+ public Mono<ResponseEntity<byte[]>> recordAtOffset(
@PathVariable int partition,
@PathVariable long offset)
{
.contentType(mediaType)
.header(
deadLetterConsumer.prefixed(DeadLetterConsumer.KEY),
- UriUtils.encodePathSegment(record.key(), StandardCharsets.UTF_8))
+ UriUtils.encodePathSegment(new String(record.key(), StandardCharsets.UTF_8), StandardCharsets.UTF_8))
.header(
deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP),
Long.toString(record.timestamp()))
public record FetchRequest(
TopicPartition partition,
long offset,
- CompletableFuture<ConsumerRecord<String, String>> future)
+ CompletableFuture<ConsumerRecord<byte[], byte[]>> future)
{
@Override
public String toString()