import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
public class DriverController
{
private final EndlessConsumer consumer;
- private final SumRecordHandler wordcount;
+ private final SumRecordHandler sumRecordHandler;
@PostMapping("start")
@GetMapping("seen")
- public Map<Integer, Map<String, Map<String, Long>>> seen()
+ public Map<Integer, Map<String, List<Long>>> seen()
{
- return wordcount.getSeen();
+ return sumRecordHandler.getSeen();
}
@GetMapping("seen/{user}")
- public ResponseEntity<Map<String, Long>> seen(@PathVariable String user)
+ public ResponseEntity<List<Long>> seen(@PathVariable String user)
{
- for (Map<String, Map<String, Long>> users : wordcount.getSeen().values())
+ for (Map<String, List<Long>> users : sumRecordHandler.getSeen().values())
{
- Map<String, Long> words = users.get(user);
- if (words != null)
- return ResponseEntity.ok(words);
+ List<Long> results = users.get(user);
+ if (results != null)
+ return ResponseEntity.ok(results);
}
return ResponseEntity.notFound().build();
import java.util.Optional;
-public interface PartitionStatisticsRepository extends MongoRepository<StatisticsDocument, String>
+public interface PartitionStatisticsRepository extends MongoRepository<StateDocument, String>
{
- public Optional<StatisticsDocument> findById(String partition);
+ public Optional<StateDocument> findById(String partition);
}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+@Document(collection = "statistics")
+@ToString
+public class StateDocument
+{
+ @Id
+ public String id;
+ public long offset = -1l;
+ public Map<String, Long> state;
+ public Map<String, List<Long>> seen;
+
+ public StateDocument()
+ {
+ }
+
+ public StateDocument(Integer partition)
+ {
+ this.id = Integer.toString(partition);
+ this.state = new HashMap<>();
+ this.seen = new HashMap<>();
+ }
+
+ public StateDocument(
+ Integer partition,
+ Map<String, Long> state,
+ Map<String, List<Long>> seen)
+ {
+ this.id = Integer.toString(partition);
+ this.state = state;
+ this.seen = seen;
+ }
+}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.ToString;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Document(collection = "statistics")
-@ToString
-public class StatisticsDocument
-{
- @Id
- public String id;
- public long offset = -1l;
- public Map<String, Map<String, Long>> statistics;
-
- public StatisticsDocument()
- {
- }
-
- public StatisticsDocument(Integer partition)
- {
- this.id = Integer.toString(partition);
- this.statistics = new HashMap<>();
- }
-
- public StatisticsDocument(Integer partition, Map<String, Map<String, Long>> statistics, long offset)
- {
- this.id = Integer.toString(partition);
- this.statistics = statistics;
- this.offset = offset;
- }
-}
return state.get(user);
}
+
+ protected Map<String, Long> getState()
+ {
+ return state;
+ }
}
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
-import java.util.Map;
@RequiredArgsConstructor
Integer partition = tp.partition();
Long offset = consumer.position(tp);
log.info("{} - adding partition: {}, offset={}", id, partition, offset);
- StatisticsDocument document =
+ StateDocument document =
repository
.findById(Integer.toString(partition))
- .orElse(new StatisticsDocument(partition));
+ .orElse(new StateDocument(partition));
if (document.offset >= 0)
{
// Only seek, if a stored offset was found
// Otherwise: Use initial offset, generated by Kafka
consumer.seek(tp, document.offset);
}
- handler.addPartition(partition, document.statistics);
+ handler.addPartition(partition, document);
});
}
id,
partition,
newOffset);
- Map<String, Map<String, Long>> removed = handler.removePartition(partition);
- repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+ repository.save(handler.removePartition(partition));
});
}
{
log.debug("Storing data and offsets, last commit: {}", lastCommit);
handler.getSeen().forEach((partiton, statistics) -> repository.save(
- new StatisticsDocument(
+ new StateDocument(
partiton,
statistics,
consumer.position(new TopicPartition(topic, partiton)))));
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.regex.Pattern;
-
@Slf4j
public class SumRecordHandler implements RecordHandler<String, String>
{
- final static Pattern PATTERN = Pattern.compile("\\W+");
-
-
- private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
+ private final Map<Integer, Map<String, List<Long>>> seen = new HashMap<>();
+ private final Map<Integer, SumBusinessLogic> state = new HashMap<>();
@Override
{
Integer partition = record.partition();
String user = record.key();
- Map<String, Map<String, Long>> users = seen.get(partition);
-
- Map<String, Long> words = users.get(user);
- if (words == null)
- {
- words = new HashMap<>();
- users.put(user, words);
- }
-
- for (String word : PATTERN.split(record.value()))
+ String message = record.value();
+ switch (message)
{
- Long num = words.get(word);
- if (num == null)
- {
- num = 1l;
- }
- else
- {
- num++;
- }
- words.put(word, num);
+ case "START":
+ state.get(partition).startSum(user);
+ return;
+
+ case "END":
+ Long result = state.get(partition).endSum(user);
+ log.info("New result for {}: {}", user, result);
+ return;
+
+ default:
+ state.get(partition).addToSum(user, Integer.parseInt(message));
+ return;
}
}
- public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
+ protected void addPartition(Integer partition, StateDocument document)
{
- seen.put(partition, statistics);
+ this.seen.put(partition, document.seen);
+ this.state.put(partition, new SumBusinessLogic(document.state));
}
- public Map<String, Map<String, Long>> removePartition(Integer partition)
+ protected StateDocument removePartition(Integer partition)
{
- return seen.remove(partition);
+ return new StateDocument(
+ partition,
+ this.state.remove(partition).getState(),
+ this.seen.remove(partition));
}
- public Map<Integer, Map<String, Map<String, Long>>> getSeen()
+ public Map<Integer, Map<String, List<Long>>> getSeen()
{
return seen;
}
Long offset = offsetConsumer.position(tp);
log.info("New position for {}: {}", tp, offset);
Integer partition = tp.partition();
- StatisticsDocument document =
+ StateDocument document =
partitionStatisticsRepository
.findById(partition.toString())
- .orElse(new StatisticsDocument(partition));
+ .orElse(new StateDocument(partition));
document.offset = offset;
partitionStatisticsRepository.save(document);
});