+ KTable<Long, User> usersTable =
+ builder
+ .table(
+ users,
+ Consumed.with(Serdes.Long(), null, null, AutoOffsetReset.EARLIEST),
+ Materialized.as(users));
+
+ builder
+ .<String,Order>stream(orders)
+ .filter((id, order) -> order.getState() == OrderState.CREATED)
+ .map((id,order) ->
+ {
+ return new KeyValue<>(order.getCustomerId(), id);
+ })
+ .through(users + "_" + orders, Produced.with(Serdes.Long(), Serdes.String()))
+ .leftJoin(
+ usersTable,
+ (order, user) ->
+ {
+ return
+ OrderAndUser
+ .newBuilder()
+ .setOrderId(order)
+ .setUser(user)
+ .build();
+ },
+ Joined.keySerde(Serdes.Long()))
+ .map((id, orderAndUser) ->
+ {
+ List<CharSequence> messages = new LinkedList<>();
+
+ Builder validation =
+ OrderValidation
+ .newBuilder()
+ .setOrderId(orderAndUser.getOrderId())
+ .setCheckType(ORDER_USER_CHECK)
+ .setMessages(messages);
+
+ if (orderAndUser.getUser() == null)
+ return
+ new KeyValue<>(
+ orderAndUser.getOrderId().toString(),
+ validation.setValidationResult(FAIL).build());
+
+ User user = orderAndUser.getUser();
+
+ if (user.getLevel() != CustomerLevel.UNWANTED)
+ return
+ new KeyValue<>(
+ orderAndUser.getOrderId().toString(),
+ validation.setValidationResult(PASS).build());
+
+ messages.add("User is UNWANTED: " + user.getName());
+ return
+ new KeyValue<>(
+ orderAndUser.getOrderId().toString(),
+ validation.setValidationResult(FAIL).build());
+ })
+ .to(validation);
+