From: Kai Moritz Date: Sat, 13 Jul 2024 09:35:24 +0000 (+0200) Subject: Aligned log messages with the join-demos X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=892cd1979190f6ec96f91b4cce908d1e111b5a36;p=demos%2Fkafka%2Fwordcount Aligned log messages with the join-demos --- diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java index c6bbb4e..d21ad47 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java @@ -3,16 +3,11 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.EmitStrategy; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.Test; -import java.time.Duration; import java.time.Instant; -import java.time.LocalTime; -import java.time.ZoneId; import java.util.Properties; import java.util.stream.Stream; @@ -109,9 +104,9 @@ public class AggregationTopologyTest { TestRecord record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second)); log.info( - "Sending at {} ({}): {}", + "Sending @ {}: {} = {}", record.getRecordTime().toEpochMilli(), - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), + record.key(), record.value()); in.pipeInput(record); } @@ -127,9 +122,8 @@ public class AggregationTopologyTest .readRecordsToList() .stream() .peek(record -> log.info( - "Received for {} ({}): {}={}", + "Received @ {}: {} = {}", record.getRecordTime().toEpochMilli(), - LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()), record.key(), record.value())) .map(record -> record.value());