Spring Cloud Streams tidak menyetel kunci kafka di pesan?

Ceritanya seperti ini. Saya memiliki broker kafka dan objek tertentu (yang saya jsonify untuk dikirim melalui topik saya) yang memiliki ID yang ingin saya gunakan sebagai kunci.

Saat ini saya menggunakan konfigurasi 'partitionKeyExtractorClass' untuk mengatur kelas yang mengekstrak ID dan mengembalikannya sebagai kunci.

Ini terlihat seperti ini:

def extractKey(Message<?> message) {
    log.info('Extracting key from message')
    String id = new JsonSlurper().parseText(new String(message.payload)).properties.id
    log.info("Got = ${id}")

    return id
}

Masalah saya yang sebenarnya adalah ketika saya menelusuri pesan tentang topik ConsumerRecord yang menyimpan pesan saya mengatakan bahwa kuncinya adalah null...

Apakah ini bug? Apakah saya melakukan sesuatu yang salah? Dokumentasi mengenai hal ini tidak lebih dari ini.


person Murasame    schedule 10.11.2016    source sumber


Jawaban (1)


Lihat, Anda mencampur partition dengan key.

Saat ini KafkaMessageChannelBinder tidak memberikan opsi untuk menentukan key melawan Message.

Hanya fungsi yang ada yang dapat Anda gunakan dengan kuat adalah KafkaHeaders.MESSAGE_KEY:

    Object messageKey = this.messageKeyExpression != null
            ? this.messageKeyExpression.getValue(this.evaluationContext, message)
            : message.getHeaders().get(KafkaHeaders.MESSAGE_KEY);

Jadi, sebelum output pesan Anda harus menghitung kuncinya dan menempatkannya di header itu.

person Artem Bilan    schedule 10.11.2016
comment
Ini bukanlah solusi yang bisa diterima. Saya memiliki masalah yang sama. Saya menggunakan keluaran mode mentah. Saya tidak ingin ada header di pesan saya, tetapi kuncinya perlu disetel. Saya tidak mau harus beralih ke Apache camel. - person George Smith; 13.11.2016
comment
Kekhawatiran Anda tidak jelas. Itu adalah batasan implementasi Kafka Binder saat ini di mana mesageKey hanyalah sebuah kelalaian. Namun, jangan ragu untuk mengisi masalahnya. Anda cukup mengonfigurasi Kafka Binder, jangan memetakan header itu untuk saat ini jika Anda sangat memedulikannya. Dan saya benar-benar tidak mengerti bagaimana Apache Camel dapat menggantikan Spring Cloud Stream... - person Artem Bilan; 13.11.2016
comment
Masalahnya adalah saya mengintegrasikan streaming dengan produsen dan konsumen lain selain Spring Cloud Streams. Kunci pesan penting untuk menjamin ketertiban pada topik kafka. Memiliki header kontrol Spring Cloud Streams berarti semua produsen dan konsumen lainnya harus berurusan dengan header khusus Spring Cloud Streams. Saya ingin pesan kafka sesuai dengan yang saya inginkan, bukan seperti yang diinginkan Spring Cloud Streams. Kunci sederhana dan pesan String adalah yang saya butuhkan. mode mentah memberi saya itu, tetapi tidak ada kunci untuk menjaga ketertiban. - person George Smith; 15.11.2016
comment
Dan Apache Camel memiliki kemampuan yang persis sama; untuk membaca aliran pesan, memprosesnya dengan cara tertentu, dan merutekan/menghasilkan pesan baru dari data yang diproses. - person George Smith; 15.11.2016