import org.springframework.http.HttpStatusCode;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.test.context.EmbeddedKafka;
import java.util.HashMap;
properties = {
"juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
"juplo.consumer.topic=" + TOPIC,
+ "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
+ "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"logging.level.de.juplo.kafka=TRACE",
})
@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
@FieldSource("PARTITIONS")
void testExistingOffset(int partition) throws Exception
{
- String key = Integer.toString(partition);
- String value = "Hallo Welt! -- " + partition;
- RecordMetadata recordMetadata = send(partition, key, value);
+ SendResult<byte[], byte[]> result = send(partition);
+
+ RecordMetadata recordMetadata = result.getRecordMetadata();
ResponseEntity<String> response = restTemplate.getForEntity(
"/{partition}/{offset}",
String.class,
recordMetadata.partition(),
recordMetadata.offset());
+
+ String key = new String(result.getProducerRecord().key());
+ String value = new String(result.getProducerRecord().value());
+
assertThat(response.getStatusCode())
.isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value()));
assertThat(response.getHeaders())
@FieldSource("PARTITIONS")
void testAlreadyDeletedOffset(int partition) throws Exception
{
- RecordMetadata recordMetadata = send(partition, Integer.toString(partition), "Hallo Welt! -- " + partition);
+ RecordMetadata recordMetadata = send(partition).getRecordMetadata();
deleteAllRecords(adminClient);
ResponseEntity<String> response = restTemplate.getForEntity(
"/{partition}/{offset}",
static final int[] PARTITIONS = IntStream.range(0, NUM_PARTITIONS).toArray();
@Autowired
- KafkaTemplate<String, String> kafkaTemplate;
+ KafkaTemplate<byte[], byte[]> kafkaTemplate;
@Autowired
AdminClient adminClient;
@Autowired
final long[] currentOffsets = new long[NUM_PARTITIONS];
+ int message = 0;
+
@BeforeEach
void resetCurrentOffsets()
return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
}
- private RecordMetadata send(int partition, String key, String value) throws Exception
+ private SendResult<byte[], byte[]> send(int partition) throws Exception
{
+ String key = Integer.toString(partition);
+ String value = "Hällö Wöhrld!%? -- " + ++message;
return kafkaTemplate
- .send(TOPIC, partition, key, value)
+ .send(TOPIC, partition, key.getBytes(), value.getBytes())
.thenApply(result ->
{
RecordMetadata metadata = result.getRecordMetadata();
currentOffsets[metadata.partition()] = metadata.offset();
- return result.getRecordMetadata();
+ return result;
})
.get();
}