Bagaimana cara mengetahui offset saat ini di spring kafka untuk penerapan manual

Saya menggunakan Spring Kafka pertama kali dan saya tidak dapat menggunakan metode Acknowledgement.acknowledge() untuk penerapan manual dalam kode konsumen saya. tolong beri tahu saya jika ada yang hilang dalam konfigurasi konsumen atau kode pendengar saya. atau apakah ada cara lain untuk menangani offset pengakuan berdasarkan kondisi. Di sini saya mencari solusi seperti jika offset tidak dilakukan/diakui secara manual, ia harus memilih pesan/offset yang sama oleh konsumen.

Konfigurasi

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@EnableKafka
@Configuration
public class ConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groupId}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(
                props));
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
}

Pendengar

private static int value = 1;

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {

    if (value%2==0){
        acknowledgment.acknowledge();
    }
    value++;
}

person user2550140    schedule 22.11.2017    source sumber
comment
Apa yang terjadi jika kita mengubah pengaturan dari props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) ke props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) ? Apakah ada kode lain yang perlu diubah atau akankah pegas mengabaikan acknowledgment.acknowledge()?   -  person alex    schedule 21.05.2020


Jawaban (4)


Setel properti aktifkan-komit otomatis ke salah:

propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Setel mode ack ke MANUAL_IMMEDIATE:

pabrik.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

Kemudian, dalam kode konsumen/pendengar, Anda dapat melakukan offset secara manual, seperti ini:

@KafkaListener(topics = "testKafka")
public void receive(ConsumerRecord<?, ?> consumerRecord,  
        Acknowledgment acknowledgment) {

    System.out.println("Received message: ");
    System.out.println(consumerRecord.value().toString());

    acknowledgment.acknowledge();
}

Pembaruan: Saya membuat POC kecil untuk ini. Lihat di sini, mungkin bisa membantu Anda.

person contactsunny    schedule 22.02.2018
comment
Untuk Spring 2.3.3 konfigurasinya adalah factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); - person Christian Altamirano Ayala; 02.12.2019
comment
Saya mencoba melakukan satu catatan dan sayangnya melakukan catatan sebelumnya atau catatan berikutnya, hanya satu catatan saja yang tidak berhasil, apakah Anda tahu pendekatan apa yang harus diambil? - person Tiago Medici; 09.01.2020

Anda perlu melakukan hal berikut

1) Setel properti aktifkan-komit otomatis ke salah

consumerConfigProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

2) Atur Mode ACK ke MANUL_IMMEDIATE

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

3) Untuk catatan yang diproses, Anda perlu memanggil acknowledgment.acknowledge();

4) untuk catatan yang gagal, panggil acknowledgment.nack(10); Catatan: metode nack membutuhkan parameter panjang yaitu waktu tidur dan harus kurang dari max.poll.interval.ms

Di bawah ini adalah contoh kode

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {

    if (value%2==0){
        acknowledgment.acknowledge();
    } else {
        acknowledgment.nack(10); //sleep time should be less than max.poll.interval.ms
    }
    value++;
}
person Amit Nargund    schedule 13.02.2020

Anda dapat melakukan hal berikut:
1. menyimpan offset rekaman saat ini ke file atau DB.
2. Implementasikan kelas pendengar kafka Anda dengan ConsumerAware.
3. Panggil registerSeekCallback seperti yang diberikan di bawah ini:

(registerSeekCallback(ConsumerSeekCallback callback) 
      {
      callback.seek(topic, partition, offset)
}

Jadi ketika konsumen turun atau konsumen baru ditugaskan, ia mulai membaca dari offset yang disimpan di DB Anda.

person Zubair A.    schedule 18.09.2018
comment
registerSeekCallback(callback ConsumerSeekCallback){ callback.seek(topik, partisi, offset) } - person Zubair A.; 18.09.2018

Hal ini tidak berlaku di Apache Kafka.

Untuk konsumen yang sedang berjalan, kami mungkin tidak perlu khawatir untuk melakukan offset. Kami membutuhkannya hanya untuk konsumen baru dalam kelompok konsumen yang sama. Yang sekarang melacak offsetnya di memori. Saya kira di suatu tempat di Broker.

Jika Anda perlu mengambil kembali pesan yang sama pada konsumen yang sama mungkin pada putaran jajak pendapat berikutnya, Anda harus mempertimbangkan untuk menggunakan fungsi seek(): https://docs.spring.io/spring-kafka/docs/2.0.1.RELEASE/reference/html/_reference.html#seek

person Artem Bilan    schedule 22.11.2017