- @Bean
- KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
- {
- Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
- Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
- Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
-
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
- props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId());
- props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
- return new KafkaConsumer<>(props);
- }
-