วิธีรับทราบค่าชดเชยปัจจุบันใน spring kafka สำหรับการคอมมิตแบบแมนนวล

ฉันใช้ Spring Kafka ครั้งแรกและฉันไม่สามารถใช้เมธอด Acknowledgement.acknowledge() สำหรับการคอมมิตด้วยตนเองในโค้ดผู้บริโภคของฉันได้ โปรดแจ้งให้เราทราบหากมีสิ่งใดขาดหายไปในการกำหนดค่าผู้บริโภคหรือรหัสผู้ฟังของฉัน หรือมีวิธีอื่นในการจัดการการตอบรับการชดเชยตามเงื่อนไข ที่นี่ฉันกำลังดูวิธีแก้ปัญหา เช่น หากออฟเซ็ตไม่ได้ถูกคอมมิต/ รับทราบด้วยตนเอง ก็ควรเลือกข้อความ/ออฟเซ็ตเดียวกันโดยผู้บริโภค

การกำหนดค่า

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@EnableKafka
@Configuration
public class ConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groupId}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(
                props));
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
}

ผู้ฟัง

private static int value = 1;

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {

    if (value%2==0){
        acknowledgment.acknowledge();
    }
    value++;
}

person user2550140    schedule 22.11.2017    source แหล่งที่มา
comment
จะเกิดอะไรขึ้นถ้าเราเปลี่ยนการตั้งค่าจาก props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) เป็น props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) ? รหัสอื่นใดจำเป็นต้องเปลี่ยนแปลงหรือไม่หรือสปริงจะเพิกเฉยต่อ acknowledgment.acknowledge()   -  person alex    schedule 21.05.2020


คำตอบ (4)


ตั้งค่าคุณสมบัติเปิดใช้งานอัตโนมัติกระทำเป็นเท็จ:

propsMap.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, เท็จ);

ตั้งค่าโหมด ack เป็น MANUAL_IMMEDIATE:

โรงงาน.getContainerProperties().setAckMode (AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

จากนั้น ในโค้ด Consumer/Listener คุณสามารถคอมมิตออฟเซ็ตด้วยตนเองได้ เช่นนี้

@KafkaListener(topics = "testKafka")
public void receive(ConsumerRecord<?, ?> consumerRecord,  
        Acknowledgment acknowledgment) {

    System.out.println("Received message: ");
    System.out.println(consumerRecord.value().toString());

    acknowledgment.acknowledge();
}

อัปเดต: ฉันสร้าง POC ขนาดเล็กสำหรับสิ่งนี้ ลองดูที่นี่ อาจช่วยคุณได้

person contactsunny    schedule 22.02.2018
comment
สำหรับ Spring 2.3.3 การกำหนดค่าคือ Factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); - person Christian Altamirano Ayala; 02.12.2019
comment
ฉันกำลังพยายามที่จะส่งบันทึกเดียว แต่น่าเสียดายที่ฉันกำลังส่งบันทึกก่อนหน้าหรือบันทึกถัดไป มีเพียงบันทึกเดียวที่ไม่สามารถทำได้ คุณมีความคิดไหมว่าจะต้องทำอย่างไร - person Tiago Medici; 09.01.2020

คุณต้องทำสิ่งต่อไปนี้

1) ตั้งค่าคุณสมบัติเปิดใช้งานอัตโนมัติกระทำเป็นเท็จ

consumerConfigProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

2) ตั้งค่าโหมด ACK เป็น MANUL_IMMEDIATE

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

3) สำหรับบันทึกที่ประมวลผลแล้ว คุณต้องเรียก acknowledgment.acknowledge();

4) สำหรับบันทึกที่ล้มเหลวให้เรียกการรับทราบ.nack(10); หมายเหตุ: วิธีการ nack ใช้พารามิเตอร์ที่ยาวซึ่งเป็นเวลาพักเครื่องและควรน้อยกว่า max.poll.interval.ms

ด้านล่างนี้เป็นโค้ดตัวอย่าง

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {

    if (value%2==0){
        acknowledgment.acknowledge();
    } else {
        acknowledgment.nack(10); //sleep time should be less than max.poll.interval.ms
    }
    value++;
}
person Amit Nargund    schedule 13.02.2020

คุณสามารถทำสิ่งต่อไปนี้:
1. เก็บออฟเซ็ตบันทึกปัจจุบันไปยังไฟล์หรือ DB
2. นำคลาสตัวฟัง kafka ของคุณไปใช้ด้วย ConsumerAware
3. โทร registerSeekCallback ตามที่ระบุด้านล่าง:

(registerSeekCallback(ConsumerSeekCallback callback) 
      {
      callback.seek(topic, partition, offset)
}

ดังนั้นเมื่อ Consumer หยุดทำงานหรือกำหนด Consumer ใหม่ มันจะเริ่มอ่านจากออฟเซ็ตที่จัดเก็บไว้ในฐานข้อมูลของคุณ

person Zubair A.    schedule 18.09.2018
comment
registerSeekCallback (การโทรกลับ ConsumerSeekCallback) { callback.seek (หัวข้อ, พาร์ติชัน, ออฟเซ็ต) } - person Zubair A.; 18.09.2018

นั่นใช้ไม่ได้ผลใน Apache Kafka

สำหรับผู้บริโภคที่ใช้งานอยู่ในปัจจุบัน เราอาจไม่ต้องกังวลกับการดำเนินการชดเชย เราต้องการสิ่งเหล่านี้สำหรับผู้บริโภคใหม่ในกลุ่มผู้บริโภคเดียวกันเท่านั้น ปัจจุบันติดตามการชดเชยในหน่วยความจำ ฉันเดาว่าอยู่ที่ไหนสักแห่งใน Broker

หากคุณต้องการดึงข้อความเดิมซ้ำในกลุ่มผู้บริโภครายเดียวกันในรอบการสำรวจความคิดเห็นครั้งถัดไป คุณควรพิจารณาใช้ฟังก์ชัน seek(): https://docs.spring.io/spring-kafka/docs/2.0.1.RELEASE/reference/html/_reference.html#seek

person Artem Bilan    schedule 22.11.2017