Как установить тип контента avro на продюсере kafka с помощью Spring Boot и Spring Cloud Stream

Я пытаюсь создать рабочий пример с производителем сообщений avro, kafka и потребителем avro, используя весеннюю загрузку, реестр схемы весеннего облачного потока, связыватель kafka. При использовании сообщений возникает следующая ошибка:

ОШИБКА 7059 --- [afka-listener-1] oscsbkKafkaMessageChannelBinder: невозможно десериализовать [io.igx.android.Sensor] с помощью contentType [application / x-java-object; type = io.igx.android.Sensor] java .lang.NullPointerException

У меня есть spring.cloud.stream.bindings.output.contentType = application / + avro на стороне потребителя и spring.cloud.stream.bindings.output.contentType = application / + avro на стороне производителя.

Почему потребитель не видит и не использует тип содержимого avro вместо application / x-java-object.


person Todor Susnjevic    schedule 09.03.2017    source источник


Ответы (2)


Я думаю, вам нужен тип контента application/*+avro на стороне производителя. (обратите внимание на префикс *). Как только это будет установлено для производителя, можете ли вы подтвердить, нужно ли вам по-прежнему указывать входной contentType для потребителя. Я думаю, что потребитель может сделать вывод, основываясь на типе контента, установленном производителем.

person Ilayaperumal Gopinathan    schedule 09.03.2017
comment
Теперь у меня есть spring.cloud.stream.bindings.output.contentType = application / * + avro на стороне производителя и spring.cloud.stream.bindings.input.contentType = application / * + avro на стороне потребителя. Все еще та же ошибка: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException - person Todor Susnjevic; 09.03.2017
comment
... и если в схеме avro нет массивов, я могу нормально использовать сообщение. - person Todor Susnjevic; 09.03.2017
comment
Что произойдет, если вы установите тип содержимого как spring.cloud.stream.bindings.output.contentType = avro / bytes? - person Ilayaperumal Gopinathan; 09.03.2017

Неправильное название канала. Итак, проблема заключалась в том, что я использовал spring.cloud.stream.bindings.input.contentType = application / * ‌ + avro

вместо этого

spring.cloud.stream.bindings. {МОЕ НАЗВАНИЕ КАНАЛА} .contentType = application / * ‌ + avro.

Когда я изменил {МОЕ ИМЯ КАНАЛА} на имя, которое использовал, произошло волшебство.

Это работает, даже если я не указал тип контента на стороне потребителя.

Кроме того, мне нужно указать spring.cloud.stream.schemaRegistryClient.endpoint, если он не по умолчанию, как в моем случае, как на стороне производителя, так и на стороне потребителя.

person Todor Susnjevic    schedule 09.03.2017