Aligned log messages with the join-demos
authorKai Moritz <kai@juplo.de>
Sat, 13 Jul 2024 09:35:24 +0000 (11:35 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 17 Jul 2024 05:47:59 +0000 (07:47 +0200)
src/test/java/de/juplo/kafka/wordcount/counter/AggregationTopologyTest.java

index c6bbb4e..d21ad47 100644 (file)
@@ -3,16 +3,11 @@ package de.juplo.kafka.wordcount.counter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.test.TestRecord;
 import org.junit.jupiter.api.Test;
 
-import java.time.Duration;
 import java.time.Instant;
-import java.time.LocalTime;
-import java.time.ZoneId;
 import java.util.Properties;
 import java.util.stream.Stream;
 
@@ -109,9 +104,9 @@ public class AggregationTopologyTest
   {
     TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
     log.info(
-        "Sending at {} ({}): {}",
+        "Sending  @ {}: {} = {}",
         record.getRecordTime().toEpochMilli(),
-        LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+        record.key(),
         record.value());
     in.pipeInput(record);
   }
@@ -127,9 +122,8 @@ public class AggregationTopologyTest
         .readRecordsToList()
         .stream()
         .peek(record -> log.info(
-            "Received for {} ({}): {}={}",
+            "Received @ {}: {} = {}",
             record.getRecordTime().toEpochMilli(),
-            LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
             record.key(),
             record.value()))
         .map(record -> record.value());