วิธีส่งคำขอ/ตอบกลับแบบอะซิงโครนัสผ่านการทำงานร่วมกันของ Channel Adapters โดยไม่มีการเชื่อมโยงข้อมูล

สรุป

ฉันใช้การสนับสนุน TCP & UDP ของ Spring Integration กับพร็อกซี TCP สตรีมการรับส่งข้อมูลผ่านแอปพลิเคชันของฉันไปยังเซิร์ฟเวอร์อัปสตรีม จากนั้นพร็อกซีการตอบสนองของเซิร์ฟเวอร์นั้นกลับผ่านแอปพลิเคชันของฉันไปยังไคลเอนต์ แม้ว่านี่จะเป็นการสื่อสารแบบสองทาง แต่ฉันต้องการทรูพุตแบบอะซิงโครนัสปริมาณมาก ดังนั้นฉันจึงใช้เกตเวย์ไม่ได้ ฉันกำลังพยายามใช้ Collaborating Outbound และ Inbound Channel Adapters ตามที่อธิบายไว้ในส่วน 34.8.2 แทน

การตั้งค่าส่วนประกอบการรวม

คำขอ

A TcpReceivingChannelAdapter รับคำขอผ่าน TcpNetServerConnectionFactory บนพอร์ต 6060 โดยจะวางคำขอเหล่านี้ในคำขอ QueueChannel คำขอจะถูกรับโดย TcpSendingMessageHandler ซึ่งส่งคำขอผ่านการเชื่อมต่อไคลเอนต์ที่สร้างโดย TcpNetClientConnectionFactory การเชื่อมต่อนี้ส่งคำขอออกจากแอปพลิเคชันของฉันและไปยังเซิร์ฟเวอร์อัปสตรีม

การตอบสนอง

A TcpReceivingChannelAdapter ได้รับการตอบกลับจากเซิร์ฟเวอร์อัปสตรีมผ่านการเชื่อมต่อ TcpNetClientConnectionFactory โดยจะวางการตอบกลับเหล่านี้ในการตอบกลับ QueueChannel การตอบกลับจะถูกรับโดย TcpSendingMessageHandler ซึ่งพยายามส่งการตอบกลับกลับไปยังไคลเอนต์ผ่านการเชื่อมต่อจาก TcpNetServerConnectionFactory ดั้งเดิม การเชื่อมต่อขั้นสุดท้ายคือสิ่งที่ล้มเหลว

    @Bean
    public PollableChannel requestChannel() {
        return new QueueChannel(1000);
    }

    @Bean
    public PollableChannel replyChannel() {
        return new QueueChannel(1000);
    }

    @Bean
    public TcpNetServerConnectionFactory serverFactory() {
        TcpNetServerConnectionFactory serverFactory = new TcpNetServerConnectionFactory(6060);
        serverFactory.setSerializer(new ByteArrayLengthHeaderSerializer(2));
        serverFactory.setDeserializer(new ByteArrayLengthHeaderSerializer(2));
        serverFactory.setSingleUse(false);
        return serverFactory;
    }

    @Bean
    public TcpNetClientConnectionFactory clientFactory() {
        TcpNetClientConnectionFactory clientFactory = new TcpNetClientConnectionFactory("127.0.0.1", 6080);
        clientFactory.setSerializer(new ByteArrayLengthHeaderSerializer(2));
        clientFactory.setDeserializer(new ByteArrayLengthHeaderSerializer(2));
        clientFactory.setSingleUse(false);
        return clientFactory;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundRequestAdapter() {
        TcpReceivingChannelAdapter inboundRequestAdapter = new TcpReceivingChannelAdapter();
        inboundRequestAdapter.setConnectionFactory(serverFactory());
        inboundRequestAdapter.setOutputChannel(requestChannel());
        return inboundRequestAdapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "requestChannel", poller = @Poller(fixedDelay = "50", receiveTimeout = "5000"))
    public TcpSendingMessageHandler outboundRequestAdapter() {
        TcpSendingMessageHandler outboundRequestAdapter = new TcpSendingMessageHandler();
        outboundRequestAdapter.setConnectionFactory(clientFactory());
        return outboundRequestAdapter;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundReplyAdapter() {
        TcpReceivingChannelAdapter inboundReplyAdapter = new TcpReceivingChannelAdapter();
        inboundReplyAdapter.setConnectionFactory(clientFactory());
        inboundReplyAdapter.setOutputChannel(replyChannel());
        return inboundReplyAdapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel", poller = @Poller(fixedDelay = "50", receiveTimeout = "5000"))
    public TcpSendingMessageHandler outboundReplyAdapter() {
        TcpSendingMessageHandler outboundReplyAdapter = new TcpSendingMessageHandler();
        outboundReplyAdapter.setConnectionFactory(serverFactory());
        return outboundReplyAdapter;
    }

ผลลัพธ์ที่แท้จริง

ข้อผิดพลาด:

Unable to find outbound socket for GenericMessage

การติดตามสแต็กเต็ม:

2019-02-01 14:10:55.315 ERROR 32553 --- [ask-scheduler-2] o.s.i.ip.tcp.TcpSendingMessageHandler    : Unable to find outbound socket for GenericMessage [payload=byte[297], headers={ip_tcp_remotePort=6080, ip_connectionId=localhost:6080:51339:a3f66802-b194-4564-99c7-f194e55ddb11, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=bc36ec21-e2ae-405e-afa9-c0ec2f2eff8d, ip_hostname=localhost, timestamp=1549051855315}]
2019-02-01 14:10:55.319 ERROR 32553 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: Unable to find outbound socket, failedMessage=GenericMessage [payload=byte[297], headers={ip_tcp_remotePort=6080, ip_connectionId=localhost:6080:51339:a3f66802-b194-4564-99c7-f194e55ddb11, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=bc36ec21-e2ae-405e-afa9-c0ec2f2eff8d, ip_hostname=localhost, timestamp=1549051855315}]
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:49)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

นี่สมเหตุสมผลแล้ว ฉันทราบว่า TcpReceivingChannelAdapter ตั้งค่าฟิลด์ส่วนหัวของข้อความ ip_connectionId เมื่อส่งต่อข้อความ เนื่องจากฉันไม่มีความสัมพันธ์กันในขณะนี้ ส่วนหัว ID จากอะแดปเตอร์ขาเข้าตัวแรกจะหายไปเมื่อเพย์โหลดถูกพร็อกซีอัปสตรีม และอะแดปเตอร์ขาเข้าตัวที่สองจะสร้างส่วนหัว ID ใหม่

ด้วยเหตุนี้ เมื่อการตอบกลับกลับไปยังอะแดปเตอร์ขาออกสุดท้าย ส่วนหัวของ ID จะไม่ตรงกับสิ่งใดๆ ที่อะแดปเตอร์ขาเข้าที่เกี่ยวข้องทราบ ดังนั้นจึงไม่รู้ว่าจะใช้การเชื่อมต่อใดในการส่งคำตอบ

คำถามของฉันคือ: มีวิธีใดในการตั้งค่าการเชื่อมต่อ "เริ่มต้น" หรือเพิ่มเพย์โหลดด้วยข้อมูลที่สัมพันธ์กันโดยไม่ต้องส่งอัปสตรีมนั้น

ปัญหาคือแอปพลิเคชันของฉันต้องเป็นพร็อกซีแบบโปร่งใสในส่วนที่เกี่ยวกับเซิร์ฟเวอร์อัปสตรีม หากฉันเพิ่มเพย์โหลดด้วยข้อมูลที่สัมพันธ์กัน เซิร์ฟเวอร์อัปสตรีมจะปฏิเสธมัน


person Chris Jansson    schedule 01.02.2019    source แหล่งที่มา


คำตอบ (1)


เป็นการยากที่จะเชื่อมโยงคำขอ/ตอบกลับโดยไม่มีข้อมูลที่มีข้อมูลความสัมพันธ์

TcpOutboundGateway สามารถทำได้เนื่องจากซ็อกเก็ตนั้นถูกใช้สำหรับความสัมพันธ์ คำขอเดียวเท่านั้นที่สามารถคงค้างในแต่ละซ็อกเก็ตในแต่ละครั้ง CachingClientConnectionFactory อนุญาตการทำงานพร้อมกันในเกตเวย์โดยการรักษาพูลของซ็อกเก็ต

เทคนิคหนึ่งอาจเป็นแฟกทอรีการเชื่อมต่อไคลเอนต์แบบกำหนดเองที่รักษาแผนที่แบบหนึ่งต่อหนึ่งระหว่างการเชื่อมต่อแฟคทอรีเซิร์ฟเวอร์ของคุณและการเชื่อมต่อขาออก จากนั้น เมื่อได้รับการตอบกลับ ให้ค้นหาการเชื่อมต่อโรงงานเซิร์ฟเวอร์ที่เกี่ยวข้องซึ่งจะส่งการตอบกลับ เพียงแค่ต้องการแผนที่สองสามอัน - รหัสการเชื่อมต่อเซิร์ฟเวอร์ไปยังการเชื่อมต่อไคลเอนต์ และรหัสการเชื่อมต่อไคลเอนต์ไปยังรหัสการเชื่อมต่อเซิร์ฟเวอร์

หากคุณคิดวิธีแก้ปัญหาได้ ให้พิจารณาสนับสนุนกลับคืนสู่กรอบการทำงาน

person Gary Russell    schedule 01.02.2019
comment
ขอบคุณสำหรับการตอบกลับของคุณ ฉันอาจตรวจสอบสิ่งนั้นหากเราไม่พบวิธีแก้ปัญหาอื่น ฉันสามารถบรรลุปริมาณงานสูงโดยใช้เกตเวย์และ TaskExecutors ได้หรือไม่ - person Chris Jansson; 05.02.2019
comment
เกตเวย์ที่มี CachingClientConnectionFactory ควรให้ประสิทธิภาพที่สมเหตุสมผล แต่เพื่อประสิทธิภาพที่ดีที่สุด โซลูชันอะซิงก์ที่สมบูรณ์อย่างที่ฉันอธิบายไว้จะดีกว่า - person Gary Russell; 05.02.2019
comment
Gary ความพยายามครั้งแรกของฉันในเรื่องนี้คือการวางฐานโรงงานไคลเอนต์แบบกำหนดเองของฉันออกจาก CachingClientConnectionFactory เนื่องจากฉันเห็นว่ามันแทนที่วิธี onMessage() ในคลาสภายใน จากนั้นกลยุทธ์ของฉันคือเปลี่ยนตรรกะในการเลือกการเชื่อมต่อที่จะใช้ CachingClientConnectionFactory เพียงแค่ส่งข้อความไปยังการเชื่อมต่อแคชถัดไปที่พร้อมใช้งาน โดยที่ฉันต้องการส่งต่อไปยังการเชื่อมต่อที่แมปอย่างเหมาะสมสำหรับ ID การเชื่อมต่อของข้อความ - person Chris Jansson; 13.03.2019
comment
นั่นดูเหมือนเป็นแนวทางที่ดีใช่ไหม? หรือสามารถทำได้ง่ายกว่านี้โดยเพียงแค่ติดตั้ง TcpListener ในโรงงานที่กำหนดเองของฉัน ฉันไม่แน่ใจว่าฉันต้องการทุกอย่างจาก CachingClientConnectionFactory หรือไม่ รวมถึงคลาสภายในด้วย - person Chris Jansson; 13.03.2019
comment
ฉันคิดว่าคุณแค่ต้องการคู่ง่ายๆ ของ ConcurrentHashMaps - inboundCID:outboundCID และ outboundCID:inboundCID - person Gary Russell; 13.03.2019
comment
ตกลง. ในกรณีนั้น คุณจะแนะนำวิธีดักจับข้อความเมื่อส่งผ่านแฟกทอรีการเชื่อมต่อของฉันอย่างไร เพียงใช้ TcpListener และแทนที่ onMessage()? - person Chris Jansson; 13.03.2019
comment
คุณเพียงแค่ต้องมีตัวเสริมส่วนหัวในแต่ละทิศทางเพื่อแทนที่รหัสการเชื่อมต่อด้วยรหัสที่เกี่ยวข้อง - ขาเข้า - ค้นหาขาออก หากไม่พบหรือปิด ให้เปิดรหัสใหม่และเพิ่มลงในแผนที่ ในทิศทางกลับ หากไม่พบ ถือเป็นภาวะผิดพลาด - person Gary Russell; 13.03.2019
comment
คุณยังสามารถจัดการแผนที่ด้วย ApplicationListener<TcpConnectionEvent> (หรือ @EventListener) ซึ่งตอบสนองต่อการเปิดและปิดกิจกรรม - person Gary Russell; 13.03.2019
comment
นั่นสมเหตุสมผลแล้ว คุณมีความคิดเห็นเกี่ยวกับวิธีการจัดระเบียบส่วนประกอบต่างๆ เหล่านั้นหรือไม่ ฉันกำลังคิดถึงคอนคอนแบบกำหนดเอง โรงงานจะรักษาทั้งแผนที่และแทนที่ getConnection() เพื่อคืนการเชื่อมต่อที่สัมพันธ์กัน มันจะทำงานได้หรือไม่ถ้าจะประกาศ ApplicationListener หรือ header-enrichers ในคลาสที่กำหนดเองนั้นด้วย แค่คิดว่าคลาสเดียวจะทำให้ฟีเจอร์/PR ใช้งานได้ง่าย - person Chris Jansson; 13.03.2019
comment
คุณไม่จำเป็นต้องแทนที่สิ่งใด ฉันอยากจะแนะนำคลาส/bean แบบสแตนด์อโลนเพียงคลาสเดียวที่รับฟังเหตุการณ์และรับการอ้างอิงถึงโรงงานขาออก เพื่อให้สามารถเชื่อมต่อได้เมื่อจำเป็นต้องเชื่อมต่อใหม่ เพิ่มสองวิธี (หนึ่งวิธีสำหรับแต่ละทิศทาง) ตัวเสริมส่วนหัวสามารถใช้นิพจน์ SpEL เพื่อเรียกวิธีใดวิธีหนึ่งเพื่อรับค่าใหม่สำหรับรหัสการเชื่อมต่อ (โดยตั้งค่าการแทนที่เป็นจริง) - person Gary Russell; 14.03.2019
comment
ฉันเห็น. ขอบคุณสำหรับความช่วยเหลือของคุณจนถึงตอนนี้ เราได้ตั้งค่าตัวเสริมส่วนหัวการตอบกลับด้วยนิพจน์ SpEL เพื่อส่งข้อความกลับไปยังไคลเอ็นต์เรียบร้อยแล้ว ตอนนี้เรากำลังเพิ่มตัวเพิ่มประสิทธิภาพส่วนหัวของคำขอ ซึ่งเราต้องสร้างการเชื่อมต่อใหม่สำหรับแฟกทอรีการเชื่อมต่อไคลเอ็นต์โดยอิงตามการเชื่อมต่อขาเข้าใหม่ไปยังแฟกทอรีเซิร์ฟเวอร์ของเรา เรารู้วิธีเพิ่มตัวเสริมส่วนหัว แต่เราจะสั่งให้ TcpNetClientConnectionFactory สร้างการเชื่อมต่อใหม่ได้อย่างไร เมธอด buildNewConnection ดูเหมือนสิ่งที่เราต้องการ แต่ได้รับการป้องกัน - person Chris Jansson; 28.03.2019
comment
ผู้ดูแลระบบที่นี่ไม่ชอบการแสดงความคิดเห็นยาวๆ เกี่ยวกับคำถาม/คำตอบ และมันไม่ง่ายเลยที่จะใส่ข้อเสนอแนะของฉันลงในความคิดเห็น ฉันขอแนะนำให้คุณปิดคำถามนี้และเริ่มคำถามใหม่ มันอาจจะสั้น; เพียงเพิ่มลิงค์ไปยังอันนี้ - person Gary Russell; 28.03.2019
comment
ใช่ ฉันพยายามเปิดการสนทนาในแชทแต่ไม่มีชื่อเสียงเพียงพอ อย่างไรก็ตาม นี่คือคำถามใหม่: stackoverflow.com /คำถาม/55404438/ - person Chris Jansson; 28.03.2019