Cara mengirim permintaan/tanggapan asinkron melalui Adaptor Saluran yang berkolaborasi tanpa mengkorelasikan data

Ringkasan

Saya menggunakan Dukungan TCP & UDP Spring Integration untuk melakukan proxy TCP mengalirkan lalu lintas melalui aplikasi saya ke server upstream, dan kemudian memproksi respons server tersebut kembali melalui aplikasi saya ke klien. Meskipun ini adalah komunikasi dua arah, saya memerlukan throughput asinkron bervolume tinggi, jadi saya tidak bisa menggunakan Gateway. Sebagai gantinya, saya mencoba menggunakan Kolaborasi Adaptor Saluran Keluar dan Masuk seperti yang dijelaskan di bagian 34.8.2.

Pengaturan Komponen Integrasi

Permintaan

A TcpReceivingChannelAdapter menerima permintaan melalui TcpNetServerConnectionFactory pada port 6060. Ini menempatkan permintaan ini pada permintaan QueueChannel. Permintaan diambil oleh TcpSendingMessageHandler, yang mengirimkan permintaan melalui koneksi klien yang dihasilkan oleh TcpNetClientConnectionFactory. Koneksi ini mengirimkan permintaan keluar dari aplikasi saya dan ke server upstream.

Respon

A TcpReceivingChannelAdapter menerima respons dari server upstream melalui koneksi TcpNetClientConnectionFactory. Ini menempatkan respons ini pada respons QueueChannel. Respons diambil oleh TcpSendingMessageHandler, yang mencoba mengirim respons kembali ke klien melalui koneksi dari TcpNetServerConnectionFactory asli. Koneksi terakhir inilah yang gagal.

    @Bean
    public PollableChannel requestChannel() {
        return new QueueChannel(1000);
    }

    @Bean
    public PollableChannel replyChannel() {
        return new QueueChannel(1000);
    }

    @Bean
    public TcpNetServerConnectionFactory serverFactory() {
        TcpNetServerConnectionFactory serverFactory = new TcpNetServerConnectionFactory(6060);
        serverFactory.setSerializer(new ByteArrayLengthHeaderSerializer(2));
        serverFactory.setDeserializer(new ByteArrayLengthHeaderSerializer(2));
        serverFactory.setSingleUse(false);
        return serverFactory;
    }

    @Bean
    public TcpNetClientConnectionFactory clientFactory() {
        TcpNetClientConnectionFactory clientFactory = new TcpNetClientConnectionFactory("127.0.0.1", 6080);
        clientFactory.setSerializer(new ByteArrayLengthHeaderSerializer(2));
        clientFactory.setDeserializer(new ByteArrayLengthHeaderSerializer(2));
        clientFactory.setSingleUse(false);
        return clientFactory;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundRequestAdapter() {
        TcpReceivingChannelAdapter inboundRequestAdapter = new TcpReceivingChannelAdapter();
        inboundRequestAdapter.setConnectionFactory(serverFactory());
        inboundRequestAdapter.setOutputChannel(requestChannel());
        return inboundRequestAdapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "requestChannel", poller = @Poller(fixedDelay = "50", receiveTimeout = "5000"))
    public TcpSendingMessageHandler outboundRequestAdapter() {
        TcpSendingMessageHandler outboundRequestAdapter = new TcpSendingMessageHandler();
        outboundRequestAdapter.setConnectionFactory(clientFactory());
        return outboundRequestAdapter;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundReplyAdapter() {
        TcpReceivingChannelAdapter inboundReplyAdapter = new TcpReceivingChannelAdapter();
        inboundReplyAdapter.setConnectionFactory(clientFactory());
        inboundReplyAdapter.setOutputChannel(replyChannel());
        return inboundReplyAdapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel", poller = @Poller(fixedDelay = "50", receiveTimeout = "5000"))
    public TcpSendingMessageHandler outboundReplyAdapter() {
        TcpSendingMessageHandler outboundReplyAdapter = new TcpSendingMessageHandler();
        outboundReplyAdapter.setConnectionFactory(serverFactory());
        return outboundReplyAdapter;
    }

Hasil Sebenarnya

Kesalahan:

Unable to find outbound socket for GenericMessage

Pelacakan tumpukan penuh:

2019-02-01 14:10:55.315 ERROR 32553 --- [ask-scheduler-2] o.s.i.ip.tcp.TcpSendingMessageHandler    : Unable to find outbound socket for GenericMessage [payload=byte[297], headers={ip_tcp_remotePort=6080, ip_connectionId=localhost:6080:51339:a3f66802-b194-4564-99c7-f194e55ddb11, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=bc36ec21-e2ae-405e-afa9-c0ec2f2eff8d, ip_hostname=localhost, timestamp=1549051855315}]
2019-02-01 14:10:55.319 ERROR 32553 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: Unable to find outbound socket, failedMessage=GenericMessage [payload=byte[297], headers={ip_tcp_remotePort=6080, ip_connectionId=localhost:6080:51339:a3f66802-b194-4564-99c7-f194e55ddb11, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=bc36ec21-e2ae-405e-afa9-c0ec2f2eff8d, ip_hostname=localhost, timestamp=1549051855315}]
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:49)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Ini masuk akal. Saya mengetahui bahwa TcpReceivingChannelAdapter menyetel bidang header pesan ip_connectionId saat meneruskan pesan. Karena saya tidak memiliki logika korelasi apa pun saat ini, header ID dari Adaptor masuk pertama hilang saat payload diproksikan ke hulu, dan Adaptor masuk kedua menghasilkan header ID baru.

Akibatnya, ketika balasan kembali ke Adaptor keluar akhir, header ID tidak cocok dengan apa pun yang diketahui oleh Adaptor masuk terkait. Jadi, ia tidak tahu koneksi mana yang digunakan untuk mengirim respons.

Pertanyaan saya adalah ini: apakah ada cara untuk menyetel koneksi "default", atau menambah muatan dengan data yang berkorelasi tanpa mengirimkannya ke hulu?

Masalahnya adalah aplikasi saya harus berupa proxy transparan sehubungan dengan server upstream. Jika saya menambah payload dengan data yang berkorelasi, server upstream akan menolaknya.


person Chris Jansson    schedule 01.02.2019    source sumber


Jawaban (1)


Sulit untuk mengkorelasikan permintaan/balasan tanpa data yang berisi informasi korelasi.

TcpOutboundGateway dapat melakukannya karena soket itu sendiri digunakan untuk korelasi; hanya satu permintaan yang dapat diajukan pada setiap soket dalam satu waktu. CachingClientConnectionFactory memungkinkan konkurensi di gateway dengan memelihara kumpulan soket.

Salah satu tekniknya mungkin adalah pabrik koneksi klien khusus yang memelihara peta satu-ke-satu antara koneksi pabrik server Anda dan koneksi keluar. Kemudian, ketika balasan diterima, cari koneksi pabrik server yang sesuai untuk mengirim balasan. Itu hanya memerlukan beberapa peta - id koneksi server ke koneksi klien, dan id koneksi klien ke id koneksi server.

Jika Anda menemukan solusi, pertimbangkan untuk mengembalikannya ke kerangka kerja.

person Gary Russell    schedule 01.02.2019
comment
Terimakasih atas tanggapan Anda. Saya mungkin akan memeriksanya, jika kita tidak dapat menemukan solusi lain. Bisakah saya mencapai throughput tinggi menggunakan Gateway dan TaskExecutors? - person Chris Jansson; 05.02.2019
comment
Gateway dengan CachingClientConnectionFactory seharusnya memberikan kinerja yang wajar, tetapi untuk kinerja terbaik, solusi yang sepenuhnya asinkron seperti yang saya jelaskan akan lebih baik. - person Gary Russell; 05.02.2019
comment
Gary, upaya pertama saya dalam hal ini adalah mendasarkan pabrik klien khusus saya pada CachingClientConnectionFactory, karena saya melihat bahwa metode tersebut mengesampingkan metode onMessage() di kelas dalamnya. Kemudian, strategi saya adalah mengubah logika pemilihan koneksi yang akan digunakan. CachingClientConnectionFactory cukup serahkan pesan ke koneksi cache berikutnya yang tersedia, dan sebagai gantinya saya ingin menyerahkannya ke koneksi yang dipetakan dengan tepat untuk ID koneksi pesan. - person Chris Jansson; 13.03.2019
comment
Apakah itu pendekatan yang bagus? Atau bisakah ini dilakukan lebih sederhana dengan hanya menerapkan TcpListener di pabrik khusus saya? Saya tidak yakin apakah saya memerlukan yang lainnya dari CachingClientConnectionFactory, termasuk kelas dalamnya. - person Chris Jansson; 13.03.2019
comment
Saya pikir Anda hanya perlu sepasang ConcurrentHashMaps - inboundCID:outboundCID dan outboundCID:inboundCID. - person Gary Russell; 13.03.2019
comment
Oke. Dalam hal ini, apa rekomendasi Anda tentang cara mencegat pesan saat melewati pabrik koneksi saya? Cukup terapkan TcpListener dan ganti onMessage()? - person Chris Jansson; 13.03.2019
comment
Anda hanya memerlukan pengaya header di setiap arah untuk mengganti id koneksi dengan yang terkait - masuk - cari yang keluar, jika tidak ditemukan, atau ditutup, buka yang baru dan tambahkan ke peta; pada arah baliknya, jika tidak ditemukan maka kondisi error. - person Gary Russell; 13.03.2019
comment
Anda juga dapat mengelola peta dengan ApplicationListener<TcpConnectionEvent> (atau @EventListener), bereaksi terhadap peristiwa buka dan tutup. - person Gary Russell; 13.03.2019
comment
Itu masuk akal. Apakah Anda mempunyai pendapat tentang cara mengatur berbagai komponen tersebut? Saya sedang memikirkan koneksi khusus. pabrik akan memelihara peta dan mengganti getConnection() untuk mengembalikan koneksi yang berkorelasi. Apakah akan berhasil juga mendeklarasikan ApplicationListener atau pengaya header di kelas khusus itu juga? Hanya berpikir satu kelas akan membuat fitur/PR sesederhana mungkin untuk digunakan. - person Chris Jansson; 13.03.2019
comment
Anda tidak perlu mengesampingkan apa pun. Saya akan menyarankan satu kelas/kacang yang berdiri sendiri yang mendengarkan peristiwa dan mendapatkan referensi ke pabrik keluar. sehingga bisa mendapatkan koneksi ketika dibutuhkan yang baru. Tambahkan dua metode (satu untuk setiap arah). Pengaya header kemudian dapat menggunakan ekspresi SpEL untuk memanggil satu atau metode lain guna mendapatkan nilai baru untuk id koneksi (dengan override disetel ke true). - person Gary Russell; 14.03.2019
comment
Jadi begitu. Terima kasih atas bantuan Anda sejauh ini. Kami telah berhasil menyiapkan pengaya header respons dengan ekspresi SpEL untuk mengembalikan pesan ke klien. Sekarang kami menambahkan pengaya header permintaan, di mana kami perlu membuat koneksi baru untuk pabrik koneksi klien berdasarkan koneksi masuk baru ke pabrik server kami. Kita tahu cara menambahkan pengaya header, tapi bagaimana kita menginstruksikan TcpNetClientConnectionFactory untuk membuat koneksi baru? Metode buildNewConnection sepertinya yang kita perlukan, tetapi dilindungi. - person Chris Jansson; 28.03.2019
comment
Admin disini tidak menyukai komentar panjang pada pertanyaan/jawaban dan tidak mudah untuk memasukkan saran saya ke dalam komentar. Saya sarankan Anda menutup pertanyaan ini, dan memulai pertanyaan baru. Ini bisa singkat; cukup tambahkan tautan ke yang ini. - person Gary Russell; 28.03.2019
comment
Benar, saya mencoba membuka percakapan di obrolan tetapi tidak memiliki reputasi yang cukup. Bagaimanapun, inilah pertanyaan baru: stackoverflow.com /pertanyaan/55404438/ - person Chris Jansson; 28.03.2019