Как подтвердить текущее смещение в Spring Kafka для ручной фиксации

Я впервые использую Spring Kafka, и я не могу использовать метод Acknowledgement.acknowledge () для ручной фиксации в моем пользовательском коде. пожалуйста, дайте мне знать, если что-то отсутствует в моей конфигурации потребителя или коде слушателя. или есть другой способ обработки смещения подтверждения на основе условия. Здесь я ищу решение, например, если смещение не зафиксировано / не подтверждено вручную, оно должно выбрать то же сообщение / смещение потребителем.

Конфигурация

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@EnableKafka
@Configuration
public class ConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groupId}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(
                props));
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
}

Слушатель

private static int value = 1;

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {

    if (value%2==0){
        acknowledgment.acknowledge();
    }
    value++;
}

person user2550140    schedule 22.11.2017    source источник
comment
Что произойдет, если мы затем переключим настройку с props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) на props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)? Нужно ли менять какой-либо другой код, или пружина проигнорирует acknowledgment.acknowledge()?   -  person alex    schedule 21.05.2020


Ответы (4)


Установите для свойства enable-auto-commit значение false:

propsMap.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ложь);

Установите режим подтверждения на MANUAL_IMMEDIATE:

factory.getContainerProperties (). setAckMode (AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

Затем в коде потребителя / слушателя вы можете зафиксировать смещение вручную, например:

@KafkaListener(topics = "testKafka")
public void receive(ConsumerRecord<?, ?> consumerRecord,  
        Acknowledgment acknowledgment) {

    System.out.println("Received message: ");
    System.out.println(consumerRecord.value().toString());

    acknowledgment.acknowledge();
}

Обновление: я создал для этого небольшой POC. Проверьте это здесь, может вам помочь.

person contactsunny    schedule 22.02.2018
comment
Для Spring 2.3.3 конфигурация - factory.getContainerProperties (). SetAckMode (ContainerProperties.AckMode.MANUAL_IMMEDIATE); - person Christian Altamirano Ayala; 02.12.2019
comment
Я пытаюсь зафиксировать одну запись и, к сожалению, фиксирую предыдущие или следующие записи, только одна запись не работает, у вас есть идея, какой подход выбрать? - person Tiago Medici; 09.01.2020

Вам необходимо сделать следующее

1) Установите для свойства enable-auto-commit значение false

consumerConfigProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

2) Установите режим ACK на MANUL_IMMEDIATE.

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

3) Для обработанных записей необходимо вызвать подтверждение.acknowledge ();

4) при неудачных записях вызовите подтверждение.nack (10); Примечание: метод nack принимает длинный параметр, который является временем сна, и он должен быть меньше max.poll.interval.ms.

Ниже приведен пример кода

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {

    if (value%2==0){
        acknowledgment.acknowledge();
    } else {
        acknowledgment.nack(10); //sleep time should be less than max.poll.interval.ms
    }
    value++;
}
person Amit Nargund    schedule 13.02.2020

Вы можете сделать следующее:
1. сохранить текущее смещение записи в файл или БД.
2. Реализовать свой класс слушателя kafka с помощью ConsumerAware.
3. Вызвать registerSeekCallback, как показано ниже:

(registerSeekCallback(ConsumerSeekCallback callback) 
      {
      callback.seek(topic, partition, offset)
}

Поэтому, когда потребитель отключается или назначается новый потребитель, он начинает читать смещение, хранящееся в вашей БД.

person Zubair A.    schedule 18.09.2018
comment
registerSeekCallback (обратный вызов ConsumerSeekCallback) {callback.seek (тема, раздел, смещение)} - person Zubair A.; 18.09.2018

В Apache Kafka это не работает.

Для текущего работающего потребителя мы можем никогда не беспокоиться о фиксации смещений. Они нужны нам только для новых потребителей в той же группе потребителей. Текущий отслеживает его смещение в памяти. Я думаю, где-то на Брокере.

Если вам нужно повторно получить то же сообщение у того же потребителя, возможно, в следующем раунде опроса, вам следует подумать об использовании seek() функциональности: https://docs.spring.io/spring-kafka/docs/2.0.1.RELEASE/reference/html/_reference.html#seek

person Artem Bilan    schedule 22.11.2017