Apakah Spring Cloud Stream Kafka mendukung header yang disematkan?

Menurut topik ini:
Integrasi Kafka Spring: Header tidak hadir untuk konsumen kafka - ini bukan dukungan header untuk Kafka

Namun dokumentasi mengatakan:

spring.cloud.stream.kafka.binder.headers
Daftar header khusus yang akan dipindahkan oleh binder.

Bawaan: kosong.

Saya tidak bisa menjalankannya dengan spring-cloud-stream-binder-kafka: 1.2.0.RELEASE

PENGIRIMAN LOG:

MESSAGE (e23885fd-ffd9-42dc-ebe3-5a78467fee1f) SENT : 
GenericMessage [payload=..., 
headers={
   content-type=application/json, 
   correlationId=51dd90b1-76e6-4b8d-b667-da25f214f383, 
   id=e23885fd-ffd9-42dc-ebe3-5a78467fee1f, 
   contentType=application/json, 
   timestamp=1497535771673
}]

MENERIMA LOG:

MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 1: 
GenericMessage [payload=..., 
headers={
    kafka_offset=36, 
    id=448175f5-2b21-9a44-26b9-85f093b33f6b, 
    kafka_receivedPartitionId=0, 
    contentType=application/json;charset=UTF-8, 
    kafka_receivedTopic=new_patient, timestamp=1497535771715
}]

MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 2 :
GenericMessage [payload=..., 
headers={
    kafka_offset=36, 
    id=448175f5-2b21-9a44-26b9-85f093b33f6b, 
    kafka_receivedPartitionId=0, 
    contentType=application/json;charset=UTF-8, 
    kafka_receivedTopic=new_patient, timestamp=1497535771715
}]

Saya berharap melihat id pesan yang sama dan mendapatkan correlationId di pihak penerima.

aplikasi.properti:

spring.cloud.stream.kafka.binder.headers=correlationId
spring.cloud.stream.bindings.newTest.destination=new_test
spring.cloud.stream.bindings.newTestCreated.destination=new_test
spring.cloud.stream.default.consumer.headerMode=embeddedHeaders
spring.cloud.stream.default.producer.headerMode=embeddedHeaders

MENGIRIM PESAN:

@Publisher(channel = "testChannel")
public Object newTest(Object param) {
    ...
    return myObject;
}

person S2201    schedule 15.06.2017    source sumber
comment
Bisakah Anda memberikan contoh apa yang Anda coba lakukan?   -  person Marius Bogoevici    schedule 15.06.2017
comment
@MariusBogoevici Silakan lihat pembaruan postingan...   -  person S2201    schedule 15.06.2017


Jawaban (1)


Ya, benar: http://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_consumer_properties

mode header

Jika disetel ke mentah, nonaktifkan penguraian header pada input. Hanya efektif untuk middleware perpesanan yang tidak mendukung header pesan secara asli dan memerlukan penyematan header. Berguna ketika data masuk berasal dari luar aplikasi Spring Cloud Stream.

Bawaan: tertanamHeader

Tapi itu sudah menjadi cerita Spring Cloud Stream, bukan Spring Kafka itu sendiri.

person Artem Bilan    schedule 15.06.2017
comment
Tidak, namun secara default hanya sebagian header yang dipindahkan - lihat github.com/spring-cloud/spring-cloud-stream/blob/master/. Jika Anda ingin memindahkan header Anda sendiri, Anda dapat menambahkan namanya ke properti yang disediakan Artem. - person Marius Bogoevici; 15.06.2017
comment
Terima kasih atas balasan Anda! Tapi ini sebenarnya pertanyaannya - ini seharusnya berfungsi sesuai dengan dokumentasi, tetapi ternyata tidak. Silakan lihat pembaruannya, saya telah menambahkan log dan konfigurasinya. - person S2201; 15.06.2017
comment
Lihat pembaruan saya untuk pertanyaan lain yang Anda berkomentar - Saya menguji sampel itu dengan Dalston.SR1 (1.2.1) dan berfungsi dengan baik. - person Gary Russell; 15.06.2017
comment
Proyek yang diperbarui ada di repo sandbox saya. - person Gary Russell; 15.06.2017
comment
Sekali lagi terima kasih kawan. Masalahnya ada pada pengiriman pesan (ditambahkan ke pertanyaan). Pesan yang dicatat bukan pesan yang dikirim :-) Sekarang saya bisa meneruskan header khusus. Haruskah ID pesan sama di sisi produser dan penerima? - person S2201; 15.06.2017
comment
Id pesan harus diperlakukan sebagai properti internal (pesan mungkin mengalami transformasi internal antara Integrasi Spring, Spring Kafka, Spring Cloud Stream). Idealnya Anda harus menggunakan header khusus Anda sendiri untuk menyampaikan identitas pesan. - person Marius Bogoevici; 15.06.2017
comment
Ya terima kasih. Saya punya satu pertanyaan lagi berikutnya, tapi saya mempostingnya di utas terpisah - person S2201; 15.06.2017