From 892cd1979190f6ec96f91b4cce908d1e111b5a36 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 13 Jul 2024 11:35:24 +0200 Subject: [PATCH] Aligned log messages with the join-demos --- .../wordcount/counter/AggregationTopologyTest.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index c6bbb4e..d21ad47 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -3,16 +3,11 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.EmitStrategy; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; -import java.time.Duration; import java.time.Instant; -import java.time.LocalTime; -import java.time.ZoneId; import java.util.Properties; import java.util.stream.Stream; @@ -109,9 +104,9 @@ public class AggregationTopologyTest { TestRecord record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second)); log.info( - "Sending at {} ({}): {}", + "Sending @ {}: {} = {}", record.getRecordTime().toEpochMilli(), - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), + record.key(), record.value()); in.pipeInput(record); } @@ -127,9 +122,8 @@ public class AggregationTopologyTest .readRecordsToList() .stream() .peek(record -> log.info( - "Received for {} ({}): {}={}", + "Received @ {}: {} = {}", record.getRecordTime().toEpochMilli(), - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), record.key(), record.value())) .map(record -> record.value()); -- 2.20.1