X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSimpleConsumer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSimpleConsumer.java;h=a5554c1c89371c3cf646bb3fc652a900608bffda;hb=253b29d99f9aa8edabc5fcd28f6fe28644b18b2c;hp=913ffd52cfe5aaa0ebc140452b974e237469c2b4;hpb=2e829aa88ce1dc203d2daad12140e5eb95794f1a;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 913ffd5..a5554c1 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -27,19 +27,7 @@ public class SimpleConsumer public SimpleConsumer() { - // tag::create[] - Properties props = new Properties(); - props.put("bootstrap.servers", ":9092"); - props.put("group.id", "my-consumer"); // << Used for Offset-Management - // end::create[] - props.put("auto.offset.reset", "earliest"); - // tag::create[] - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - Consumer consumer = new KafkaConsumer<>(props); - // end::create[] - this.consumer = consumer; + this.consumer = null; // TODO: Eine Instanz von KafkaConsumer erzeugen } @@ -52,31 +40,12 @@ public class SimpleConsumer log.info("{} - Subscribing to topic test", id); consumer.subscribe(Arrays.asList("test")); - // tag::loop[] while (true) { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - // Do something with the data... - // end::loop[] - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - consumed++; - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - } - // tag::loop[] + // TODO: + // Über consumer.poll() Nachrichten abrufen und diese + // über log.info() ausgeben. } - // end::loop[] } catch(WakeupException e) {