Поддерживает ли Spring Cloud Stream Kafka встроенные заголовки?

Согласно этой теме:
Интеграция Kafka Spring: заголовки не идет для потребителя kafka - это не поддержка заголовков для Kafka

Но документация говорит:

spring.cloud.stream.kafka.binder.headers
Список настраиваемых заголовков, которые будут переноситься подшивкой.

По умолчанию: пусто.

Я не могу заставить его работать с spring-cloud-stream-binder-kafka: 1.2.0.RELEASE

ОТПРАВКА ЖУРНАЛА:

MESSAGE (e23885fd-ffd9-42dc-ebe3-5a78467fee1f) SENT : 
GenericMessage [payload=..., 
headers={
   content-type=application/json, 
   correlationId=51dd90b1-76e6-4b8d-b667-da25f214f383, 
   id=e23885fd-ffd9-42dc-ebe3-5a78467fee1f, 
   contentType=application/json, 
   timestamp=1497535771673
}]

ПОЛУЧЕНИЕ ЖУРНАЛА:

MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 1: 
GenericMessage [payload=..., 
headers={
    kafka_offset=36, 
    id=448175f5-2b21-9a44-26b9-85f093b33f6b, 
    kafka_receivedPartitionId=0, 
    contentType=application/json;charset=UTF-8, 
    kafka_receivedTopic=new_patient, timestamp=1497535771715
}]

MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 2 :
GenericMessage [payload=..., 
headers={
    kafka_offset=36, 
    id=448175f5-2b21-9a44-26b9-85f093b33f6b, 
    kafka_receivedPartitionId=0, 
    contentType=application/json;charset=UTF-8, 
    kafka_receivedTopic=new_patient, timestamp=1497535771715
}]

Я ожидаю увидеть то же сообщение id и получить correlationId на принимающей стороне.

application.properties:

spring.cloud.stream.kafka.binder.headers=correlationId
spring.cloud.stream.bindings.newTest.destination=new_test
spring.cloud.stream.bindings.newTestCreated.destination=new_test
spring.cloud.stream.default.consumer.headerMode=embeddedHeaders
spring.cloud.stream.default.producer.headerMode=embeddedHeaders

ОТПРАВКА СООБЩЕНИЯ:

@Publisher(channel = "testChannel")
public Object newTest(Object param) {
    ...
    return myObject;
}

person S2201    schedule 15.06.2017    source источник
comment
Можете ли вы привести пример того, что вы пытаетесь сделать?   -  person Marius Bogoevici    schedule 15.06.2017
comment
@MariusBogoevici Пожалуйста, посмотрите сообщение об обновлении ...   -  person S2201    schedule 15.06.2017


Ответы (1)


Да, есть: http://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_consumer_properties

headerMode

Если установлено значение raw, отключает синтаксический анализ заголовка при вводе. Действует только для промежуточного программного обеспечения обмена сообщениями, которое изначально не поддерживает заголовки сообщений и требует встраивания заголовков. Полезно, когда входящие данные поступают из-за пределов приложений Spring Cloud Stream.

По умолчанию: embeddedHeaders

Но это уже история Spring Cloud Stream, а не Spring Kafka как таковая.

person Artem Bilan    schedule 15.06.2017
comment
Нет, но по умолчанию переносится только подмножество заголовков - см. github.com/spring-cloud/spring-cloud-stream/blob/master/. Если вы хотите перенести свои собственные заголовки, вы можете добавить их имена в свойство, которое предоставил Артем. - person Marius Bogoevici; 15.06.2017
comment
Спасибо за ответы! Но вот собственно вопрос - должно работать по документации, а это не так. Пожалуйста, смотрите обновление, я добавил лог и конфигурацию. - person S2201; 15.06.2017
comment
Смотрите мое обновление другой вопрос, который вы прокомментировал - я тестировал этот образец с Dalston.SR1 (1.2.1), и он отлично работает. - person Gary Russell; 15.06.2017
comment
Обновленный проект находится в моем репозитории песочницы. - person Gary Russell; 15.06.2017
comment
Еще раз спасибо, ребята. Проблема была в отправке сообщения (добавлено в вопрос). Зарегистрированное сообщение не было отправленным :-) Теперь я могу передать настраиваемые заголовки. Должен ли идентификатор сообщения быть одинаковым на стороне производителя и на стороне получателя? - person S2201; 15.06.2017
comment
Идентификатор сообщения следует рассматривать как внутреннее свойство (сообщения могут подвергаться внутренним преобразованиям между Spring Integration, Spring Kafka, Spring Cloud Stream). В идеале вы должны использовать собственный настраиваемый заголовок для передачи идентичности сообщения. - person Marius Bogoevici; 15.06.2017
comment
Да, спасибо. У меня есть еще один следующий вопрос, но я разместил его в отдельной теме - person S2201; 15.06.2017