Cara mengatur tipe konten avro pada produser kafka dengan spring boot dan spring cloud stream

Saya mencoba membuat contoh kerja dengan produser pesan avro, kafka dan konsumen avro menggunakan spring boot, spring cloud stream Schema registry, kafka binder. Kesalahan berikut terjadi saat menggunakan pesan:

ERROR 7059 --- [afka-listener-1] o.s.c.s.b.k.KafkaMessageChannelBinder : Tidak dapat melakukan deserialize [io.igx.android.Sensor] menggunakan contentType [application/x-java-object;type=io.igx.android.Sensor] java .lang.NullPointerException

Saya memiliki spring.cloud.stream.bindings.output.contentType=application/+avro di sisi konsumen dan spring.cloud.stream.bindings.output.contentType=application/+avro di sisi produser.

Mengapa konsumen tidak melihat dan menggunakan tipe konten avro sebagai gantinya application/x-java-object.


person Todor Susnjevic    schedule 09.03.2017    source sumber


Jawaban (2)


Saya rasa Anda memerlukan tipe konten application/*+avro di sisi produser. (perhatikan awalan *). Setelah diatur untuk produsen, Dapatkah Anda mengonfirmasi apakah Anda masih perlu menentukan input ContentType untuk konsumen. Saya rasa konsumen dapat menyimpulkannya berdasarkan jenis konten yang ditetapkan oleh produsen.

person Ilayaperumal Gopinathan    schedule 09.03.2017
comment
Saya sekarang memiliki spring.cloud.stream.bindings.output.contentType=application/*+avro di sisi produsen, dan spring.cloud.stream.bindings.input.contentType=application/*+avro di sisi konsumen. Kesalahan masih sama: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException - person Todor Susnjevic; 09.03.2017
comment
...dan jika tidak ada array dalam skema avro saya dapat menggunakan pesan secara normal. - person Todor Susnjevic; 09.03.2017
comment
Apa yang terjadi jika Anda menyetel jenis konten sebagai spring.cloud.stream.bindings.output.contentType=avro/bytes ? - person Ilayaperumal Gopinathan; 09.03.2017

Nama saluran salah. Jadi masalahnya adalah saya menggunakan spring.cloud.stream.bindings.input.contentType=application/*‌​+avro

alih-alih ini

spring.cloud.stream.bindings.{NAMA SALURAN SAYA}.contentType=application/*‌​+avro.

Ketika saya mengubah {MY CHANNEL NAME} menjadi nama yang saya gunakan, keajaiban terjadi.

Ia berfungsi meskipun saya tidak menentukan tipe konten di sisi konsumen.

Selain itu, saya perlu menentukan spring.cloud.stream.schemaRegistryClient.endpoint jika ini bukan default, seperti dalam kasus saya, baik di sisi produsen maupun konsumen.

person Todor Susnjevic    schedule 09.03.2017