У нас есть топология потоков, которая будет работать на нескольких машинах. Мы сохраняем результаты агрегирования с временным окном в хранилище состояний. Поскольку хранилища состояний хранят локальные данные, я думаю, что агрегирование должно выполняться по другой теме для общего агрегирования. Но мне кажется, что мне чего-то не хватает, потому что ни один из примеров не выполняет общую агрегацию на другом KStream или процессоре.
Нужно ли нам использовать логику groupBy для хранения общей агрегации, или использовать GlobalKtable, или просто где-нибудь реализовать наш собственный код слияния?
Какая для этого правильная архитектура?
В приведенном ниже коде я попытался сгруппировать все сообщения, поступающие на процессор, с постоянным ключом, чтобы сохранить общую агрегацию только на одной машине, но я думаю, что это потеряло бы параллелизм, который обеспечивает Kafka.
dashboardItemProcessor = streamsBuilder.stream("Topic25", Consumed.with(Serdes.String(), eventSerde))
.filter((key, event) -> event != null && event.getClientCreationDate() != null);
dashboardItemProcessor.map((key, event) -> KeyValue.pair(key, event.getClientCreationDate().toInstant().toEpochMilli()))
.groupBy((key, event) -> "count", Serialized.with(Serdes.String(), Serdes.Long()))
.windowedBy(timeWindow)
.count(Materialized.as(dashboardItemUtil.getStoreName(itemId, timeWindow)));
event.getName()
илиevent.getClientUuid()
или аналогичным образом, но, как я уже сказал, я не знаю ваших данных - person OneCricketeer   schedule 12.09.2018