Ringkasan
Saya menggunakan Dukungan TCP & UDP Spring Integration untuk melakukan proxy TCP mengalirkan lalu lintas melalui aplikasi saya ke server upstream, dan kemudian memproksi respons server tersebut kembali melalui aplikasi saya ke klien. Meskipun ini adalah komunikasi dua arah, saya memerlukan throughput asinkron bervolume tinggi, jadi saya tidak bisa menggunakan Gateway. Sebagai gantinya, saya mencoba menggunakan Kolaborasi Adaptor Saluran Keluar dan Masuk seperti yang dijelaskan di bagian 34.8.2.
Pengaturan Komponen Integrasi
Permintaan
A TcpReceivingChannelAdapter
menerima permintaan melalui TcpNetServerConnectionFactory
pada port 6060. Ini menempatkan permintaan ini pada permintaan QueueChannel
. Permintaan diambil oleh TcpSendingMessageHandler
, yang mengirimkan permintaan melalui koneksi klien yang dihasilkan oleh TcpNetClientConnectionFactory
. Koneksi ini mengirimkan permintaan keluar dari aplikasi saya dan ke server upstream.
Respon
A TcpReceivingChannelAdapter
menerima respons dari server upstream melalui koneksi TcpNetClientConnectionFactory
. Ini menempatkan respons ini pada respons QueueChannel
. Respons diambil oleh TcpSendingMessageHandler
, yang mencoba mengirim respons kembali ke klien melalui koneksi dari TcpNetServerConnectionFactory
asli. Koneksi terakhir inilah yang gagal.
@Bean
public PollableChannel requestChannel() {
return new QueueChannel(1000);
}
@Bean
public PollableChannel replyChannel() {
return new QueueChannel(1000);
}
@Bean
public TcpNetServerConnectionFactory serverFactory() {
TcpNetServerConnectionFactory serverFactory = new TcpNetServerConnectionFactory(6060);
serverFactory.setSerializer(new ByteArrayLengthHeaderSerializer(2));
serverFactory.setDeserializer(new ByteArrayLengthHeaderSerializer(2));
serverFactory.setSingleUse(false);
return serverFactory;
}
@Bean
public TcpNetClientConnectionFactory clientFactory() {
TcpNetClientConnectionFactory clientFactory = new TcpNetClientConnectionFactory("127.0.0.1", 6080);
clientFactory.setSerializer(new ByteArrayLengthHeaderSerializer(2));
clientFactory.setDeserializer(new ByteArrayLengthHeaderSerializer(2));
clientFactory.setSingleUse(false);
return clientFactory;
}
@Bean
public TcpReceivingChannelAdapter inboundRequestAdapter() {
TcpReceivingChannelAdapter inboundRequestAdapter = new TcpReceivingChannelAdapter();
inboundRequestAdapter.setConnectionFactory(serverFactory());
inboundRequestAdapter.setOutputChannel(requestChannel());
return inboundRequestAdapter;
}
@Bean
@ServiceActivator(inputChannel = "requestChannel", poller = @Poller(fixedDelay = "50", receiveTimeout = "5000"))
public TcpSendingMessageHandler outboundRequestAdapter() {
TcpSendingMessageHandler outboundRequestAdapter = new TcpSendingMessageHandler();
outboundRequestAdapter.setConnectionFactory(clientFactory());
return outboundRequestAdapter;
}
@Bean
public TcpReceivingChannelAdapter inboundReplyAdapter() {
TcpReceivingChannelAdapter inboundReplyAdapter = new TcpReceivingChannelAdapter();
inboundReplyAdapter.setConnectionFactory(clientFactory());
inboundReplyAdapter.setOutputChannel(replyChannel());
return inboundReplyAdapter;
}
@Bean
@ServiceActivator(inputChannel = "replyChannel", poller = @Poller(fixedDelay = "50", receiveTimeout = "5000"))
public TcpSendingMessageHandler outboundReplyAdapter() {
TcpSendingMessageHandler outboundReplyAdapter = new TcpSendingMessageHandler();
outboundReplyAdapter.setConnectionFactory(serverFactory());
return outboundReplyAdapter;
}
Hasil Sebenarnya
Kesalahan:
Unable to find outbound socket for GenericMessage
Pelacakan tumpukan penuh:
2019-02-01 14:10:55.315 ERROR 32553 --- [ask-scheduler-2] o.s.i.ip.tcp.TcpSendingMessageHandler : Unable to find outbound socket for GenericMessage [payload=byte[297], headers={ip_tcp_remotePort=6080, ip_connectionId=localhost:6080:51339:a3f66802-b194-4564-99c7-f194e55ddb11, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=bc36ec21-e2ae-405e-afa9-c0ec2f2eff8d, ip_hostname=localhost, timestamp=1549051855315}]
2019-02-01 14:10:55.319 ERROR 32553 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: Unable to find outbound socket, failedMessage=GenericMessage [payload=byte[297], headers={ip_tcp_remotePort=6080, ip_connectionId=localhost:6080:51339:a3f66802-b194-4564-99c7-f194e55ddb11, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=bc36ec21-e2ae-405e-afa9-c0ec2f2eff8d, ip_hostname=localhost, timestamp=1549051855315}]
at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:49)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Ini masuk akal. Saya mengetahui bahwa TcpReceivingChannelAdapter
menyetel bidang header pesan ip_connectionId
saat meneruskan pesan. Karena saya tidak memiliki logika korelasi apa pun saat ini, header ID dari Adaptor masuk pertama hilang saat payload diproksikan ke hulu, dan Adaptor masuk kedua menghasilkan header ID baru.
Akibatnya, ketika balasan kembali ke Adaptor keluar akhir, header ID tidak cocok dengan apa pun yang diketahui oleh Adaptor masuk terkait. Jadi, ia tidak tahu koneksi mana yang digunakan untuk mengirim respons.
Pertanyaan saya adalah ini: apakah ada cara untuk menyetel koneksi "default", atau menambah muatan dengan data yang berkorelasi tanpa mengirimkannya ke hulu?
Masalahnya adalah aplikasi saya harus berupa proxy transparan sehubungan dengan server upstream. Jika saya menambah payload dengan data yang berkorelasi, server upstream akan menolaknya.