props.put("linger.ms", properties.getLingerMs());
props.put("compression.type", properties.getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", JsonSerializer.class.getName());
- props.put(JsonSerializer.TYPE_MAPPINGS,
- "ADD:" + AddNumberMessage.class.getName() + "," +
- "CALC:" + CalculateSumMessage.class.getName());
-
+ props.put("value.serializer", "TODO: JsonSerializer konfigurieren");
return new KafkaProducer<>(props);
}
String key = record.key();
int number = record.value();
- for (int i = 1; i <= number; i++)
- {
- send(key, new AddNumberMessage(number, i));
- }
- send(key, new CalculateSumMessage(number));
+ // TODO: JSON-Nachrichten verschicken
}
private void send(String key, Object value)