import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Test;
-import java.time.Duration;
import java.time.Instant;
-import java.time.LocalTime;
-import java.time.ZoneId;
import java.util.Properties;
import java.util.stream.Stream;
{
TestRecord<String, String> record = new TestRecord<>(KEY, value, Instant.ofEpochSecond(second));
log.info(
- "Sending at {} ({}): {}",
+ "Sending @ {}: {} = {}",
record.getRecordTime().toEpochMilli(),
- LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
+ record.key(),
record.value());
in.pipeInput(record);
}
.readRecordsToList()
.stream()
.peek(record -> log.info(
- "Received for {} ({}): {}={}",
+ "Received @ {}: {} = {}",
record.getRecordTime().toEpochMilli(),
- LocalTime.ofInstant(record.getRecordTime(), ZoneId.systemDefault()),
record.key(),
record.value()))
.map(record -> record.value());