- private long consumed = 0;
- private KafkaConsumer<String, String> consumer;
- private Lock lock = new ReentrantLock();
- private Condition stopped = lock.newCondition();
-
-
- public SimpleConsumer()
- {
- // tag::create[]
- Properties props = new Properties();
- props.put("bootstrap.servers", ":9092");
- props.put("group.id", "my-consumer"); // << Used for Offset-Commits
- // end::create[]
- props.put("auto.offset.reset", "earliest");
- // tag::create[]
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
+ private final String id;
+ private final String topic;
+ private final Consumer<String, Message> consumer;
+ private final MessageHandler messageHandler;