Topologi Aliran Kafka pada banyak contoh

Kami memiliki topologi aliran yang akan bekerja pada banyak mesin. Kami menyimpan hasil agregasi jangka waktu ke penyimpanan negara. Karena penyimpanan negara bagian menyimpan data lokal, menurut saya agregasi harus dilakukan pada topik lain untuk agregasi keseluruhan. Namun sepertinya saya melewatkan sesuatu karena tidak ada satu pun contoh yang melakukan agregasi keseluruhan pada KStream atau Prosesor lain.

Apakah kita perlu menggunakan logika groupBy untuk menyimpan keseluruhan agregasi, atau menggunakan GlobalKtable atau hanya mengimplementasikan kode merger kita sendiri di suatu tempat?

Apa arsitektur yang benar untuk ini?

Dalam kode di bawah ini, saya telah mencoba mengelompokkan semua pesan yang masuk ke prosesor dengan kunci konstan untuk menyimpan keseluruhan agregasi hanya pada satu mesin, tetapi menurut saya paralelisme yang disediakan Kafka akan hilang.

dashboardItemProcessor = streamsBuilder.stream("Topic25", Consumed.with(Serdes.String(), eventSerde))
        .filter((key, event) -> event != null && event.getClientCreationDate() != null);

 dashboardItemProcessor.map((key, event) -> KeyValue.pair(key, event.getClientCreationDate().toInstant().toEpochMilli()))
       .groupBy((key, event) -> "count", Serialized.with(Serdes.String(), Serdes.Long()))
       .windowedBy(timeWindow)
       .count(Materialized.as(dashboardItemUtil.getStoreName(itemId, timeWindow)));

person talhaocakci    schedule 11.09.2018    source sumber
comment
Segera setelah Anda perlu mengelompokkan kunci, Anda secara efektif menyalurkan semua data ke satu prosesor, apa pun kerangka kerja yang digunakan (misalnya, jika Anda melakukan hal yang sama di Spark). Tidak begitu jelas mengapa Anda perlu mengelompokkan semuanya dan memasukkan kembali kuncinya   -  person OneCricketeer    schedule 12.09.2018
comment
Jadi menurut Anda, kita tidak perlu menggabungkan secara manual hasil yang berasal dari berbagai contoh topologi yang sama?   -  person talhaocakci    schedule 12.09.2018
comment
Tidak jelas apa yang dimaksud dengan data apa yang Anda peroleh atau harapkan sebagai keluaran. Namun, jika Anda ingin sesuatu diparalelkan, idealnya itu bukan contoh jumlah kata yang hanya menghitung kata. Misalnya, Anda mungkin ingin menghitung dengan event.getName() atau event.getClientUuid(), atau serupa, tapi seperti saya katakan, saya tidak tahu data Anda   -  person OneCricketeer    schedule 12.09.2018


Jawaban (1)


Dalam kode di bawah ini, saya telah mencoba mengelompokkan semua pesan yang masuk ke prosesor dengan kunci konstan untuk menyimpan keseluruhan agregasi hanya pada satu mesin, tetapi menurut saya paralelisme yang disediakan Kafka akan hilang.

Tampaknya ini merupakan pendekatan yang tepat. Dan ya, Anda kehilangan paralelisme, namun begitulah cara kerja agregasi global. Pada akhirnya, satu mesin harus menghitungnya...

Apa yang dapat Anda tingkatkan adalah dengan melakukan pendekatan dua langkah: yaitu, agregat pertama dengan kunci "acak" secara paralel, dan gunakan langkah kedua dengan hanya satu kunci untuk "menggabungkan" sebagian agregat menjadi satu. Dengan cara ini, beberapa bagian komputasi diparalelkan dan hanya langkah terakhir (yang diharapkan dapat mengurangi beban data) yang bersifat non-paralel. Dengan menggunakan Kafka Streams, Anda perlu menerapkan pendekatan ini "secara manual".

person Matthias J. Sax    schedule 12.09.2018
comment
Jadi, kita harus menerapkan prosesor peredam sebagai modul terpisah dan tidak menskalakannya? - person talhaocakci; 13.09.2018
comment
Anda akan memiliki dua reduksi. Anda masih dapat menerapkan keduanya dalam satu aplikasi -- jika Anda menyetel kunci tunggal setelah pengurangan pertama, semua data akan tetap masuk ke satu instance dari peredam kedua. - person Matthias J. Sax; 13.09.2018