Quarkus Kafka - Konsumen pesan batch/massal

Saya ingin melakukan proses batch. Dalam kasus penggunaan saya, pesan produser kafka dikirim satu per satu. Saya ingin membacanya sebagai daftar di aplikasi konsumen. Saya bisa melakukannya di perpustakaan Spring Kafka. Pendengar batch Kafka musim semi

Apakah ada cara untuk melakukan ini dengan perpustakaan quarkus-smallrye-reactive-messaging-kafka?

Saya mencoba contoh di bawah ini tetapi mendapat kesalahan.

ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-3) SRMSG00200: The method org.MyConsumer#aggregate has thrown an exception: java.lang.ClassCastException: class org.TestConsumer cannot be cast to class io.smallrye.mutiny.Multi (org.TestConsumer is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @6f2c0754; io.smallrye.mutiny.Multi is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @4c1638b)

aplikasi.properti:

kafka.bootstrap.servers=hosts
mp.messaging.connector.smallrye-kafka.group.id=KafkaQuick
mp.messaging.connector.smallrye-kafka.auto.offset.reset=earliest
mp.messaging.incoming.test-consumer.connector=smallrye-kafka
mp.messaging.incoming.test-consumer.value.deserializer=org.TestConsumerDeserializer

Uji KonsumenDeserializer:

public class TestConsumerDeserializer extends JsonbDeserializer<TestConsumer>{
    public TestConsumerDeserializer(){
         // pass the class to the parent.
         super(TestConsumer.class);
    }
}  

Konsumen Saya:

@ApplicationScoped
public class MyConsumer {
    
    @Incoming("test-consumer")
    //@Outgoing("aggregated-channel")
    public void aggregate(Multi<Message<TestConsumer>> in) {
        System.out.println(in);
    }
}

person Süleyman Can    schedule 19.01.2021    source sumber
comment
Anda dapat mengumpulkan datanya sendiri ke dalam Antrean, lalu memproses/mengosongkannya ketika sudah mencapai ukuran yang ditentukan   -  person OneCricketeer    schedule 20.01.2021


Jawaban (1)


Saya tidak mengerti alasan mengapa ClassNotFoundException ada dalam pertanyaan. Tapi saya menemukan solusi untuk membaca pesan massal/bach menggunakan quarkus-smallrye-reactive-messaging-kafka.

Solusi 1:

@Incoming("test-consumer-topic")
@Outgoing("aggregated-channel")
public Multi<List<TestConsumer>> aggregate(Multi<TestConsumer> in) {
     return in.groupItems().intoLists().every(Duration.ofSeconds(5));
}

@Incoming("aggregated-channel")
public void test(List<TestConsumer> test) {
   System.out.println("size: "+ test.size());
}

Solusi 2:

@Incoming("test-consumer-topic")
@Outgoing("events-persisted")
public Multi<Message<TestConsumer>> processPayloadStream(Multi<Message<TestConsumer>> messages) {
    return messages
                    .groupItems().intoLists().of(4)
                    .emitOn(Infrastructure.getDefaultWorkerPool())
                    .flatMap(messages1 -> {
                        persist(messages1);
                        return Multi.createFrom().items(messages1.stream());
                    }).emitOn(Infrastructure.getDefaultExecutor());
}

public void persist(List<Message<TestConsumer>> messages){
    System.out.println("messages size:"+ messages.size());
}

@Incoming("events-persisted")
public CompletionStage<Void> messageAcknowledging(Message<TestConsumer> message){
    return message.ack();
}

catatan: Menggunakan konfigurasi application.properties dalam pertanyaan.

referensi:

Dukungan berlangganan dengan Multi‹Pesan‹››...

Dapatkan pesan survei massal dari kafka

person Süleyman Can    schedule 20.01.2021