Топология потока Kafka на нескольких экземплярах

У нас есть топология потоков, которая будет работать на нескольких машинах. Мы сохраняем результаты агрегирования с временным окном в хранилище состояний. Поскольку хранилища состояний хранят локальные данные, я думаю, что агрегирование должно выполняться по другой теме для общего агрегирования. Но мне кажется, что мне чего-то не хватает, потому что ни один из примеров не выполняет общую агрегацию на другом KStream или процессоре.

Нужно ли нам использовать логику groupBy для хранения общей агрегации, или использовать GlobalKtable, или просто где-нибудь реализовать наш собственный код слияния?

Какая для этого правильная архитектура?

В приведенном ниже коде я попытался сгруппировать все сообщения, поступающие на процессор, с постоянным ключом, чтобы сохранить общую агрегацию только на одной машине, но я думаю, что это потеряло бы параллелизм, который обеспечивает Kafka.

dashboardItemProcessor = streamsBuilder.stream("Topic25", Consumed.with(Serdes.String(), eventSerde))
        .filter((key, event) -> event != null && event.getClientCreationDate() != null);

 dashboardItemProcessor.map((key, event) -> KeyValue.pair(key, event.getClientCreationDate().toInstant().toEpochMilli()))
       .groupBy((key, event) -> "count", Serialized.with(Serdes.String(), Serdes.Long()))
       .windowedBy(timeWindow)
       .count(Materialized.as(dashboardItemUtil.getStoreName(itemId, timeWindow)));

person talhaocakci    schedule 11.09.2018    source источник
comment
Как только вам нужно сгруппировать ключи, вы эффективно перенаправляете все данные в один процессор, независимо от используемой структуры (например, если вы сделали то же самое в Spark). Не совсем понятно, зачем нужно все сгруппировать и перепрограммировать   -  person OneCricketeer    schedule 12.09.2018
comment
Значит, вы говорите, что нам не нужно вручную объединять результаты, полученные из разных экземпляров одной и той же топологии?   -  person talhaocakci    schedule 12.09.2018
comment
Непонятно, какие данные вы получаете или ожидаете в качестве вывода. Однако, если вы хотите, чтобы что-то было распараллелено, в идеале это был бы не пример wordcount, где слово - only count. Например, вы можете захотеть посчитать по event.getName() или event.getClientUuid() или аналогичным образом, но, как я уже сказал, я не знаю ваших данных   -  person OneCricketeer    schedule 12.09.2018


Ответы (1)


В приведенном ниже коде я попытался сгруппировать все сообщения, поступающие на процессор, с постоянным ключом, чтобы сохранить общую агрегацию только на одной машине, но я думаю, что это потеряло бы параллелизм, который обеспечивает Kafka.

Кажется, это правильный подход. И да, параллелизм отсутствует, но именно так работает глобальная агрегация. В конце концов, одна машина должна это вычислить ...

Что вы могли бы улучшить, так это использовать двухэтапный подход: то есть сначала агрегировать «случайными» ключами параллельно и использовать второй шаг только с одним ключом для «слияния» частичных агрегатов в один. Таким образом, некоторые части вычислений распараллеливаются, и только последний шаг (при желательно уменьшенной загрузке данных) будет непараллельным. Используя Kafka Streams, вам нужно реализовать этот подход «вручную».

person Matthias J. Sax    schedule 12.09.2018
comment
Итак, мы должны развернуть процессор редуктора как отдельный модуль и не масштабировать его? - person talhaocakci; 13.09.2018
comment
У вас будет два редуктора. Вы по-прежнему можете развернуть оба в одном приложении - если вы установите одноэлементный ключ после первого сокращения, все данные в любом случае пойдут в один экземпляр второго редуктора. - person Matthias J. Sax; 13.09.2018