Spring Cloud Streams не устанавливает ключ kafka в сообщении?

История такая. У меня есть брокер kafka и конкретный объект (который я jsonify для отправки через мои темы), у которого есть идентификатор, который я хочу использовать в качестве ключа.

В настоящее время я использую конфигурацию partitionKeyExtractorClass, чтобы установить класс, который извлекает идентификатор и возвращает его в качестве ключа.

Это выглядит так:

def extractKey(Message<?> message) {
    log.info('Extracting key from message')
    String id = new JsonSlurper().parseText(new String(message.payload)).properties.id
    log.info("Got = ${id}")

    return id
}

Моя настоящая проблема заключается в том, что когда я просматриваю сообщение по теме, ConsumerRecord, который содержит мое сообщение, говорит, что ключ равен нулю ...

Это ошибка? Я делаю что-то неправильно? Документация по этому поводу не идет дальше этого.


person Murasame    schedule 10.11.2016    source источник


Ответы (1)


Послушайте, вы смешиваете partition с key.

В настоящее время KafkaMessageChannelBinder не предоставляет возможности определить key против Message.

Только существующие функциональные возможности, которые вы можете эффективно использовать, - это KafkaHeaders.MESSAGE_KEY:

    Object messageKey = this.messageKeyExpression != null
            ? this.messageKeyExpression.getValue(this.evaluationContext, message)
            : message.getHeaders().get(KafkaHeaders.MESSAGE_KEY);

Итак, перед сообщением output вы должны вычислить ключ и поместить его в этот заголовок.

person Artem Bilan    schedule 10.11.2016
comment
Это неприемлемое решение. У меня такая же проблема. Я использую вывод в необработанном режиме. Мне не нужны заголовки в своих сообщениях, но ключ нужно указать. Я не хочу переходить на apache camel. - person George Smith; 13.11.2016
comment
Ваше беспокойство неясно. Это ограничение текущей реализации Kafka Binder, где mesageKey просто упущено. Не стесняйтесь заполнить вопрос. Вы можете просто настроить Kafka Binder, чтобы он пока не отображал этот заголовок, если вам это очень важно. И я совершенно не понимаю, как Apache Camel может заменить Spring Cloud Stream ... - person Artem Bilan; 13.11.2016
comment
Проблема в том, что я интегрирую потоки с другими производителями и потребителями, помимо Spring Cloud Streams. Ключ сообщения важен для обеспечения порядка в теме кафки. Наличие управляющих заголовков Spring Cloud Streams означает, что все другие производители и потребители должны иметь дело с конкретными заголовками Spring Cloud Streams. Мне нужно, чтобы сообщение kafka было именно таким, каким я хочу его видеть, а не тем, чего хочет Spring Cloud Streams. Мне нужен простой ключ и строковое сообщение. сырой режим дает мне это, но нет ключа для поддержания порядка. - person George Smith; 15.11.2016
comment
И Apache Camel имеет точно такие же возможности; для чтения потока сообщений, их обработки и маршрутизации / создания новых сообщений из обработанных данных. - person George Smith; 15.11.2016