return
new RestGateway(
kafkaProperties.getClientId(),
+ applicationProperties.getTopic(),
applicationProperties.getPartition(),
kafkaTemplate);
}
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+
@ConfigurationProperties(prefix = "sumup.gateway")
@Getter
@Setter
public class ApplicationProperties
{
+ @NotNull
+ @NotEmpty
+ private String topic;
private Integer partition;
}
public class RestGateway
{
private final String id;
+ private final String topic;
private final Integer partition;
private final KafkaTemplate<String, Integer> kafkaTemplate;
final long time = System.currentTimeMillis();
ListenableFuture<SendResult<String, Integer>> future =
- kafkaTemplate.send(null, partition, key, value);
+ kafkaTemplate.send(topic, partition, key, value);
long now = System.currentTimeMillis();
@SpringBootTest(
properties = {
- "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "sumup.gateway.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"sumup.gateway.topic=" + TOPIC})
@AutoConfigureMockMvc
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)