KafkaTemplate<String, String> messageTemplate,
String infoTopic,
String dataTopic,
- ConsumerTaskRunner consumerTaskRunner)
+ ChannelRunner channelRunner)
{
send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
- consumerTaskRunner.executeConsumerTasks();
+ channelRunner.executeConsumerTasks();
}
private static void send(
new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
}
- public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException
+ public static void joinConsumerTasks(ChannelRunner channelRunner) throws InterruptedException
{
- consumerTaskRunner.joinConsumerTasks();
+ channelRunner.joinConsumerTasks();
}
}