String topic,
String clientId)
{
- // TODO:
- // Instanziierung des KafkaProducer aus Ihrer bisherigen Implementierung
- // Hier:
- // - Den Parameter "broker" des Konstruktors für "bootstrap.servers" verwenden
- // - Den Parameter "topic" des Konstruktors im Attribut "this.topic" merken
- // - Den erzeugten KafkaProducer in "this.producer" ablegen
+ Properties props = new Properties();
+ props.put("bootstrap.servers", broker);
+ props.put("client.id", clientId); // Nur zur Wiedererkennung
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
this.id = clientId;
this.topic = topic;
+ producer = new KafkaProducer<>(props);
}
public void run()
{
for (; running; i++)
{
- // Versenden der Nachrichten aus Ihrer bisherigen Implementierung
- // Hier:
- // - ACHTUNG: Schreiben Sie in das Topic "this.topic"
- // - Ergänzen Sie mit "log.info()" eine Log-Meldung, die gesendete Nachrichten ausgibt
- // - Zählen Sie außerdem die versendeten Nachrichten in "this.produced"
+ final ProducerRecord<String, String> record = new ProducerRecord<>(
+ topic, // Topic
+ Long.toString(i%10), // Key
+ Long.toString(i) // Value
+ );
+
+ producer.send(record);
+
+ produced++;
}
}
catch (Exception e)