สรุป
ฉันใช้การสนับสนุน TCP & UDP ของ Spring Integration กับพร็อกซี TCP สตรีมการรับส่งข้อมูลผ่านแอปพลิเคชันของฉันไปยังเซิร์ฟเวอร์อัปสตรีม จากนั้นพร็อกซีการตอบสนองของเซิร์ฟเวอร์นั้นกลับผ่านแอปพลิเคชันของฉันไปยังไคลเอนต์ แม้ว่านี่จะเป็นการสื่อสารแบบสองทาง แต่ฉันต้องการทรูพุตแบบอะซิงโครนัสปริมาณมาก ดังนั้นฉันจึงใช้เกตเวย์ไม่ได้ ฉันกำลังพยายามใช้ Collaborating Outbound และ Inbound Channel Adapters ตามที่อธิบายไว้ในส่วน 34.8.2 แทน
การตั้งค่าส่วนประกอบการรวม
คำขอ
A TcpReceivingChannelAdapter
รับคำขอผ่าน TcpNetServerConnectionFactory
บนพอร์ต 6060 โดยจะวางคำขอเหล่านี้ในคำขอ QueueChannel
คำขอจะถูกรับโดย TcpSendingMessageHandler
ซึ่งส่งคำขอผ่านการเชื่อมต่อไคลเอนต์ที่สร้างโดย TcpNetClientConnectionFactory
การเชื่อมต่อนี้ส่งคำขอออกจากแอปพลิเคชันของฉันและไปยังเซิร์ฟเวอร์อัปสตรีม
การตอบสนอง
A TcpReceivingChannelAdapter
ได้รับการตอบกลับจากเซิร์ฟเวอร์อัปสตรีมผ่านการเชื่อมต่อ TcpNetClientConnectionFactory
โดยจะวางการตอบกลับเหล่านี้ในการตอบกลับ QueueChannel
การตอบกลับจะถูกรับโดย TcpSendingMessageHandler
ซึ่งพยายามส่งการตอบกลับกลับไปยังไคลเอนต์ผ่านการเชื่อมต่อจาก TcpNetServerConnectionFactory
ดั้งเดิม การเชื่อมต่อขั้นสุดท้ายคือสิ่งที่ล้มเหลว
@Bean
public PollableChannel requestChannel() {
return new QueueChannel(1000);
}
@Bean
public PollableChannel replyChannel() {
return new QueueChannel(1000);
}
@Bean
public TcpNetServerConnectionFactory serverFactory() {
TcpNetServerConnectionFactory serverFactory = new TcpNetServerConnectionFactory(6060);
serverFactory.setSerializer(new ByteArrayLengthHeaderSerializer(2));
serverFactory.setDeserializer(new ByteArrayLengthHeaderSerializer(2));
serverFactory.setSingleUse(false);
return serverFactory;
}
@Bean
public TcpNetClientConnectionFactory clientFactory() {
TcpNetClientConnectionFactory clientFactory = new TcpNetClientConnectionFactory("127.0.0.1", 6080);
clientFactory.setSerializer(new ByteArrayLengthHeaderSerializer(2));
clientFactory.setDeserializer(new ByteArrayLengthHeaderSerializer(2));
clientFactory.setSingleUse(false);
return clientFactory;
}
@Bean
public TcpReceivingChannelAdapter inboundRequestAdapter() {
TcpReceivingChannelAdapter inboundRequestAdapter = new TcpReceivingChannelAdapter();
inboundRequestAdapter.setConnectionFactory(serverFactory());
inboundRequestAdapter.setOutputChannel(requestChannel());
return inboundRequestAdapter;
}
@Bean
@ServiceActivator(inputChannel = "requestChannel", poller = @Poller(fixedDelay = "50", receiveTimeout = "5000"))
public TcpSendingMessageHandler outboundRequestAdapter() {
TcpSendingMessageHandler outboundRequestAdapter = new TcpSendingMessageHandler();
outboundRequestAdapter.setConnectionFactory(clientFactory());
return outboundRequestAdapter;
}
@Bean
public TcpReceivingChannelAdapter inboundReplyAdapter() {
TcpReceivingChannelAdapter inboundReplyAdapter = new TcpReceivingChannelAdapter();
inboundReplyAdapter.setConnectionFactory(clientFactory());
inboundReplyAdapter.setOutputChannel(replyChannel());
return inboundReplyAdapter;
}
@Bean
@ServiceActivator(inputChannel = "replyChannel", poller = @Poller(fixedDelay = "50", receiveTimeout = "5000"))
public TcpSendingMessageHandler outboundReplyAdapter() {
TcpSendingMessageHandler outboundReplyAdapter = new TcpSendingMessageHandler();
outboundReplyAdapter.setConnectionFactory(serverFactory());
return outboundReplyAdapter;
}
ผลลัพธ์ที่แท้จริง
ข้อผิดพลาด:
Unable to find outbound socket for GenericMessage
การติดตามสแต็กเต็ม:
2019-02-01 14:10:55.315 ERROR 32553 --- [ask-scheduler-2] o.s.i.ip.tcp.TcpSendingMessageHandler : Unable to find outbound socket for GenericMessage [payload=byte[297], headers={ip_tcp_remotePort=6080, ip_connectionId=localhost:6080:51339:a3f66802-b194-4564-99c7-f194e55ddb11, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=bc36ec21-e2ae-405e-afa9-c0ec2f2eff8d, ip_hostname=localhost, timestamp=1549051855315}]
2019-02-01 14:10:55.319 ERROR 32553 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: Unable to find outbound socket, failedMessage=GenericMessage [payload=byte[297], headers={ip_tcp_remotePort=6080, ip_connectionId=localhost:6080:51339:a3f66802-b194-4564-99c7-f194e55ddb11, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=bc36ec21-e2ae-405e-afa9-c0ec2f2eff8d, ip_hostname=localhost, timestamp=1549051855315}]
at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:49)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
นี่สมเหตุสมผลแล้ว ฉันทราบว่า TcpReceivingChannelAdapter
ตั้งค่าฟิลด์ส่วนหัวของข้อความ ip_connectionId
เมื่อส่งต่อข้อความ เนื่องจากฉันไม่มีความสัมพันธ์กันในขณะนี้ ส่วนหัว ID จากอะแดปเตอร์ขาเข้าตัวแรกจะหายไปเมื่อเพย์โหลดถูกพร็อกซีอัปสตรีม และอะแดปเตอร์ขาเข้าตัวที่สองจะสร้างส่วนหัว ID ใหม่
ด้วยเหตุนี้ เมื่อการตอบกลับกลับไปยังอะแดปเตอร์ขาออกสุดท้าย ส่วนหัวของ ID จะไม่ตรงกับสิ่งใดๆ ที่อะแดปเตอร์ขาเข้าที่เกี่ยวข้องทราบ ดังนั้นจึงไม่รู้ว่าจะใช้การเชื่อมต่อใดในการส่งคำตอบ
คำถามของฉันคือ: มีวิธีใดในการตั้งค่าการเชื่อมต่อ "เริ่มต้น" หรือเพิ่มเพย์โหลดด้วยข้อมูลที่สัมพันธ์กันโดยไม่ต้องส่งอัปสตรีมนั้น
ปัญหาคือแอปพลิเคชันของฉันต้องเป็นพร็อกซีแบบโปร่งใสในส่วนที่เกี่ยวกับเซิร์ฟเวอร์อัปสตรีม หากฉันเพิ่มเพย์โหลดด้วยข้อมูลที่สัมพันธ์กัน เซิร์ฟเวอร์อัปสตรีมจะปฏิเสธมัน