Quarkus Kafka - ผู้ใช้ข้อความแบบกลุ่ม/จำนวนมาก

ฉันต้องการประมวลผลเป็นชุด ในกรณีการใช้งานของฉัน ข้อความ send kafka Producer จะถูกส่งไปทีละข้อความ ฉันต้องการอ่านเป็นรายการในแอปพลิเคชันสำหรับผู้บริโภค ฉันสามารถทำได้ที่ห้องสมุด Spring Kafka ผู้ฟังแบบแบตช์ Spring Kafka

มีวิธีใดบ้างในการทำเช่นนี้กับไลบรารี quarkus-smallrye-reactive-messaging-kafka

ฉันลองตัวอย่างด้านล่างแล้ว แต่พบข้อผิดพลาด

ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-3) SRMSG00200: The method org.MyConsumer#aggregate has thrown an exception: java.lang.ClassCastException: class org.TestConsumer cannot be cast to class io.smallrye.mutiny.Multi (org.TestConsumer is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @6f2c0754; io.smallrye.mutiny.Multi is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @4c1638b)

ใบสมัครคุณสมบัติ:

kafka.bootstrap.servers=hosts
mp.messaging.connector.smallrye-kafka.group.id=KafkaQuick
mp.messaging.connector.smallrye-kafka.auto.offset.reset=earliest
mp.messaging.incoming.test-consumer.connector=smallrye-kafka
mp.messaging.incoming.test-consumer.value.deserializer=org.TestConsumerDeserializer

ทดสอบConsumerDeserializer:

public class TestConsumerDeserializer extends JsonbDeserializer<TestConsumer>{
    public TestConsumerDeserializer(){
         // pass the class to the parent.
         super(TestConsumer.class);
    }
}  

ผู้บริโภคของฉัน:

@ApplicationScoped
public class MyConsumer {
    
    @Incoming("test-consumer")
    //@Outgoing("aggregated-channel")
    public void aggregate(Multi<Message<TestConsumer>> in) {
        System.out.println(in);
    }
}

person Süleyman Can    schedule 19.01.2021    source แหล่งที่มา
comment
คุณสามารถรวบรวมข้อมูลด้วยตนเองไปยังคิว จากนั้นประมวลผล/ล้างข้อมูลเมื่อถึงขนาดที่กำหนด   -  person OneCricketeer    schedule 20.01.2021


คำตอบ (1)


ฉันไม่เข้าใจเหตุผลว่าทำไม ClassNotFoundException ในคำถาม แต่ฉันพบวิธีแก้ปัญหาสำหรับการอ่านข้อความจำนวนมาก/บัคโดยใช้ quarkus-smallrye-reactive-messaging-kafka

โซลูชันที่ 1:

@Incoming("test-consumer-topic")
@Outgoing("aggregated-channel")
public Multi<List<TestConsumer>> aggregate(Multi<TestConsumer> in) {
     return in.groupItems().intoLists().every(Duration.ofSeconds(5));
}

@Incoming("aggregated-channel")
public void test(List<TestConsumer> test) {
   System.out.println("size: "+ test.size());
}

โซลูชันที่ 2:

@Incoming("test-consumer-topic")
@Outgoing("events-persisted")
public Multi<Message<TestConsumer>> processPayloadStream(Multi<Message<TestConsumer>> messages) {
    return messages
                    .groupItems().intoLists().of(4)
                    .emitOn(Infrastructure.getDefaultWorkerPool())
                    .flatMap(messages1 -> {
                        persist(messages1);
                        return Multi.createFrom().items(messages1.stream());
                    }).emitOn(Infrastructure.getDefaultExecutor());
}

public void persist(List<Message<TestConsumer>> messages){
    System.out.println("messages size:"+ messages.size());
}

@Incoming("events-persisted")
public CompletionStage<Void> messageAcknowledging(Message<TestConsumer> message){
    return message.ack();
}

หมายเหตุ: การใช้การกำหนดค่า application.properties ในคำถาม

อ้างอิง:

สนับสนุนการสมัครรับข้อมูลด้วย Multi‹Message‹››...

รับข้อความที่สำรวจความคิดเห็นจำนวนมากจาก kafka

person Süleyman Can    schedule 20.01.2021