+ KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
+ {
+ Assert.hasText(properties.getBootstrapServers(), "juplo.transfer.bootstrap-servers must be set");
+ Assert.hasText(properties.getGroupId(), "juplo.transfer.group-id must be set");
+ Assert.hasText(properties.getGroupInstanceId(), "juplo.transfer.group-instance-id must be set");
+
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
+ props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, properties.getGroupInstanceId());
+ props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getCanonicalName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ return new KafkaConsumer<>(props);
+ }
+
+ @Bean(destroyMethod = "shutdown")
+ TransferConsumer transferConsumer(
+ TransferServiceProperties properties,
+ KafkaConsumer<String, String> consumer,
+ AdminClient adminClient,
+ TransferRepository repository,
+ LocalStateStoreSettings localStateStoreSettings,
+ ObjectMapper mapper,
+ TransferService productionTransferService,
+ TransferService restoreTransferService)
+ {
+ return
+ new TransferConsumer(
+ properties.getTopic(),
+ properties.getNumPartitions(),
+ properties.getInstanceIdUriMapping(),
+ consumer,
+ adminClient,
+ repository,
+ Clock.systemDefaultZone(),
+ localStateStoreSettings.interval,
+ mapper,
+ new TransferConsumer.ConsumerUseCases() {
+ @Override
+ public TransferStateChangedEvent create(Long id, Long payer, Long payee, Integer amount)
+ {
+ return productionTransferService.create(id, payer, payee, amount);
+ }
+
+ @Override
+ public Optional<Transfer> get(Long id)
+ {
+ return productionTransferService.get(id);
+ }
+
+ @Override
+ public TransferStateChangedEvent handleStateChange(
+ TransferStateChangedEvent stateChangedEvent)
+ {
+ return productionTransferService.handleStateChange(stateChangedEvent);
+ }
+ },
+ new TransferConsumer.ConsumerUseCases() {
+ @Override
+ public TransferStateChangedEvent create(Long id, Long payer, Long payee, Integer amount)
+ {
+ return restoreTransferService.create(id, payer, payee, amount);
+ }
+
+ @Override
+ public Optional<Transfer> get(Long id)
+ {
+ return restoreTransferService.get(id);
+ }
+
+ @Override
+ public TransferStateChangedEvent handleStateChange(
+ TransferStateChangedEvent stateChangedEvent)
+ {
+ return restoreTransferService.handleStateChange(stateChangedEvent);
+ }
+ });
+ }
+
+ @Bean
+ KafkaMessagingService kafkaMessagingService(