import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.Properties;
public RestGateway restGateway(
ApplicationProperties applicationProperties,
KafkaProperties kafkaProperties,
- Producer<String, Integer> kafkaProducer)
+ KafkaTemplate<String, Integer> kafkaTemplate)
{
return
new RestGateway(
kafkaProperties.getClientId(),
- applicationProperties.getTopic(),
applicationProperties.getPartition(),
- kafkaProducer);
+ kafkaTemplate);
}
@Bean(destroyMethod = "close")
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.http.HttpStatus;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
public class RestGateway
{
private final String id;
- private final String topic;
private final Integer partition;
- private final Producer<String, Integer> producer;
+ private final KafkaTemplate<String, Integer> kafkaTemplate;
private long produced = 0;
final long time = System.currentTimeMillis();
- final ProducerRecord<String, Integer> record = new ProducerRecord<>(
- topic, // Topic
- partition, // Partition - Uses default-algorithm, if null
- key, // Key
- value // Value
- );
-
- producer.send(record, (metadata, e) ->
- {
- long now = System.currentTimeMillis();
- if (e == null)
- {
- // HANDLE SUCCESS
- produced++;
- result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
- log.debug(
- "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
- id,
- record.key(),
- record.value(),
- metadata.partition(),
- metadata.offset(),
- metadata.timestamp(),
- now - time
- );
- }
- else
- {
- // HANDLE ERROR
- result.setErrorResult(new ProduceFailure(e));
- log.error(
- "{} - ERROR key={} timestamp={} latency={}ms: {}",
- id,
- record.key(),
- metadata == null ? -1 : metadata.timestamp(),
- now - time,
- e.toString()
- );
- }
- });
+ ListenableFuture<SendResult<String, Integer>> future =
+ kafkaTemplate.sendDefault(partition, key, value);
long now = System.currentTimeMillis();
+
+ future.addCallback(
+ sendResult ->
+ {
+ // HANDLE SUCCESS
+ produced++;
+ RecordMetadata metadata = sendResult.getRecordMetadata();
+ ProducerRecord<String, Integer> record = sendResult.getProducerRecord();
+ result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+ log.debug(
+ "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+ id,
+ record.key(),
+ record.value(),
+ metadata.partition(),
+ metadata.offset(),
+ metadata.timestamp(),
+ now - time
+ );
+ },
+ e->
+ {
+ // HANDLE ERROR
+ result.setErrorResult(new ProduceFailure(e));
+ log.error(
+ "{} - ERROR key={} latency={}ms: {}",
+ id,
+ key,
+ now - time,
+ e.toString()
+ );
+ });
+
log.trace(
"{} - Queued message with key={} latency={}ms",
id,
- record.key(),
+ key,
now - time
);
-sumup:
- gateway:
- topic: test
management:
endpoint:
shutdown:
group-id: ${spring.kafka.consumer.group-id}
auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
auto-commit-interval-ms: ${spring.kafka.consumer.properties.auto.commit.interval.ms}
- topic: ${sumup.gateway.topic}
+ topic: ${spring.kafka.template.default-topic}
acks: ${spring.kafka.producer.acks}
batch-size: ${spring.kafka.producer.batch-size}
linger-ms: ${spring.kafka.producer.properties.linger.ms}
linger.ms: 0
delivery.timeout.ms: 20000 # 20 seconds
request.timeout.ms: 10000 # 10 seconds
+ template:
+ default-topic: test
logging:
level:
root: INFO