projects
/
demos
/
kafka
/
training
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Springify: Can send multiple types of messages
[demos/kafka/training]
/
src
/
main
/
java
/
de
/
juplo
/
kafka
/
RestProducer.java
diff --git
a/src/main/java/de/juplo/kafka/RestProducer.java
b/src/main/java/de/juplo/kafka/RestProducer.java
index
7d9bf12
..
6c852ce
100644
(file)
--- a/
src/main/java/de/juplo/kafka/RestProducer.java
+++ b/
src/main/java/de/juplo/kafka/RestProducer.java
@@
-5,14
+5,13
@@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.http.HttpStatus;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.http.HttpStatus;
-import org.springframework.
http.MediaType
;
+import org.springframework.
kafka.support.serializer.JsonSerializer
;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import javax.annotation.PreDestroy;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import javax.annotation.PreDestroy;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
@Slf4j
@Slf4j
@@
-21,7
+20,7
@@
public class RestProducer
{
private final String id;
private final String topic;
{
private final String id;
private final String topic;
- private final KafkaProducer<String,
String
> producer;
+ private final KafkaProducer<String,
Object
> producer;
private long produced = 0;
private long produced = 0;
@@
-40,26
+39,47
@@
public class RestProducer
props.put("linger.ms", properties.getLingerMs());
props.put("compression.type", properties.getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
props.put("linger.ms", properties.getLingerMs());
props.put("compression.type", properties.getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", JsonSerializer.class.getName());
+ props.put(JsonSerializer.TYPE_MAPPINGS,
+ "message:" + ClientMessage.class.getName() + "," +
+ "greeting:" + Greeting.class.getName());
this.producer = new KafkaProducer<>(props);
}
@PostMapping(path = "{key}")
this.producer = new KafkaProducer<>(props);
}
@PostMapping(path = "{key}")
- public DeferredResult<ProduceResult>
send
(
+ public DeferredResult<ProduceResult>
message
(
@PathVariable String key,
@RequestBody String value)
{
@PathVariable String key,
@RequestBody String value)
{
- DeferredResult<ProduceResult> result = new DeferredResult<>();
+ final ProducerRecord<String, Object> record = new ProducerRecord<>(
+ topic, // Topic
+ key, // Key
+ new ClientMessage(key, value) // Value
+ );
- final long time = System.currentTimeMillis();
+ return send(record);
+ }
- final ProducerRecord<String, String> record = new ProducerRecord<>(
+ @PostMapping(path = "/")
+ public DeferredResult<ProduceResult> greeting(
+ @RequestBody String name)
+ {
+ final ProducerRecord<String, Object> record = new ProducerRecord<>(
topic, // Topic
topic, // Topic
-
key
, // Key
-
value
// Value
+
name
, // Key
+
new Greeting(name)
// Value
);
);
+ return send(record);
+ }
+
+ private DeferredResult<ProduceResult> send(ProducerRecord<String, Object> record)
+ {
+ DeferredResult<ProduceResult> result = new DeferredResult<>();
+
+ final long time = System.currentTimeMillis();
+
producer.send(record, (metadata, e) ->
{
long now = System.currentTimeMillis();
producer.send(record, (metadata, e) ->
{
long now = System.currentTimeMillis();
@@
-96,9
+116,8
@@
public class RestProducer
long now = System.currentTimeMillis();
log.trace(
long now = System.currentTimeMillis();
log.trace(
- "{} - Queued
#{}
key={} latency={}ms",
+ "{} - Queued key={} latency={}ms",
id,
id,
- value,
record.key(),
now - time
);
record.key(),
now - time
);