Switched from single-node (assign) to multi-instance (subscribe)
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / persistence / InMemoryTransferRepository.java
1 package de.juplo.kafka.payment.transfer.persistence;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
8 import lombok.extern.slf4j.Slf4j;
9
10 import java.util.HashMap;
11 import java.util.Map;
12 import java.util.Optional;
13
14
15 @Slf4j
16 public class InMemoryTransferRepository implements TransferRepository
17 {
18   private final int numPartitions;
19   private final Map<Long, String> mappings[];
20   private final ObjectMapper mapper;
21
22
23   public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
24   {
25     this.numPartitions = numPartitions;
26     this.mappings = new HashMap[numPartitions];
27     for (int i = 0; i < numPartitions; i++)
28       this.mappings[i] = new HashMap<>();
29     this.mapper = mapper;
30   }
31
32
33   @Override
34   public void store(Transfer transfer)
35   {
36     try
37     {
38       int partition = partitionForId(transfer.getId());
39       mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
40     }
41     catch (JsonProcessingException e)
42     {
43       throw new RuntimeException(e);
44     }
45   }
46
47   @Override
48   public Optional<Transfer> get(Long id)
49   {
50     return
51         Optional
52             .ofNullable(this.mappings[partitionForId(id)])
53             .map(mapping -> mapping.get(id))
54             .map(json -> {
55               try
56               {
57                 return mapper.readValue(json, Transfer.class);
58               }
59               catch (JsonProcessingException e)
60               {
61                 throw new RuntimeException("Could not convert JSON: " + json, e);
62               }
63             });
64   }
65
66   @Override
67   public void remove(Long id)
68   {
69     mappings[partitionForId(id)].remove(id);
70   }
71
72   @Override
73   public void resetStorageForPartition(int partition)
74   {
75     log.info(
76         "resetting storage for partition {}: dropping {} entries",
77         partition,
78         mappings[partition].size());
79     mappings[partition].clear();
80   }
81
82   private int partitionForId(long id)
83   {
84     String key = Long.toString(id);
85     return TransferPartitioner.computeHashForKey(key, numPartitions);
86   }
87 }