Dalam artikel saya sebelumnya, Cara berpikir reaktif dan menganimasikan objek bergerak menggunakan RxJs, saya menjelaskan cara membuat kelas MobileObject yang menyimulasikan pergerakan suatu objek berdasarkan percepatan yang diberikan padanya oleh pengontrol eksternal .

Sekarang saya ingin menunjukkan kepada Anda sistem terdistribusi sederhana yang memungkinkan aplikasi Pengontrol mengontrol pergerakan MobileObject dari jarak jauh. Aplikasi jarak jauh kedua, Monitor, menunjukkan pergerakan suatu benda pada bidang dua dimensi. Di tengah-tengah sistem terdapat MobileObjectServer, yang merupakan tempat di mana MobileObjects berada.

Tujuan artikel ini adalah untuk menjelaskan bagaimana pemikiran Reaktif secara progresif dapat menghasilkan desain yang memetakan kebutuhan dengan sangat alami dan menghasilkan solusi yang tepat. Kami pada akhirnya akan menyelesaikan masalah dengan berlangganan SATU Observable saja.

Kami akan fokus pada bagian server, yang paling menarik dari sudut pandang ini.

Untuk implementasinya, kami akan menggunakan RxJs dan TypeScript. Server berjalan pada Node.js. Semua komponen berkomunikasi menggunakan Web-Sockets.

Basis kode lengkap, terdiri dari Pengontrol Server dan Monitor, dapat ditemukan di sini.

Skema sistem terdistribusi

Skema logis dari sistem terdistribusi direpresentasikan dalam diagram berikut:

Di tengahnya terdapat MobileObjectServer tempat instance MobileObjets dijalankan. Setiap MobileObject dikontrol oleh Controller-nya, yaitu aplikasi Web yang melaluinya kita dapat mengeluarkan perintah (seperti akselerasi, rem) ke MobileObject. Pergerakan semua MobileObjects dapat dilihat pada satu atau beberapa Monitor. Setiap Monitor juga merupakan aplikasi Web.

Diagram berikut menunjukkan contoh alur interaksi antara satu Pengontrol, satu Monitor, dan MobileObjectServer.

Persyaratan Server dalam hal acara

Kami dapat menyatakan persyaratan untuk bagian server dari sistem terdistribusi kami dalam bentuk kejadian:

  • Event1 — ketika Controller terhubung =› membuat MobileObject
  • Event2 — ketika Controller menerima perintah =› meneruskan perintah ke MobileObject yang dikontrol oleh Controller
  • Event3 — ketika Pengontrol terputus =› hapus MobileObjectyang dikontrol oleh Pengontrol
  • Event4 — ketika Monitor terhubung =› mulai mengirimkan data dinamika dari semua MobileObjects yang berjalan ke Monitor yang baru terhubung
  • Event5 — ketika MobileObject ditambahkan =› mulai mengirimkan data dinamikanya ke semua Monitoryang terhubung
  • Event6 — ketika Monitor terputus =› berhenti mengirimkan aliran data dinamika untuk semua MobileObjects ke Monitor tersebut

Berpikir reaktif akan menghasilkan desain yang secara alami memetakan persyaratan yang diungkapkan dengan cara ini.

Elemen-elemen yang menyusun server

Komponen server dari aplikasi terdistribusi terdiri dari dua elemen utama:

  • kelas MobileObject, yang mengimplementasikan logika pergerakan dinamis menggunakan RxJs Observables — ini telah dijelaskan secara rinci di sini
  • kelas MobileObjectServer,yang mengelola protokol soket web, menerima perintah dari Pengontrol dan mengirimkannya ke Monitor semua informasi tentang dinamika MobileObject. Penerapan ini terinspirasi oleh artikel ini dari Luis Aviles.

API MobileObject

Mari kita lihat gambaran singkat tentang kelas MobileObject — semua detailnya dapat ditemukan di sini sedangkan kodenya dapat ditemukan di repositori ini.

MobileObject menawarkan dua kelompok API.

Yang pertama adalah sekumpulan metode yang melaluinya Pengontrol eksternal dapat mengeluarkan perintah yang mempengaruhi dinamika objek (misalnya, akselerasi, rem).

Yang kedua adalah aliran data hanya-baca yang berkomunikasi dengan klien eksternal, Monitor, data relevan tentang perilaku dinamis objek (yaitu, posisi dan kecepatannya terhadap waktu).

Untuk memindahkan instance MobileObject, Controller harus mengaktifkannya (dengan metode turnOn()), menerapkan akselerasi yang diinginkan (dengan metode accelerateX(acc: number) dan accelerateY(acc: number)) , dan mungkin rem (dengan metode brake()).

Saat Monitor terhubung ke MobileObjectServer, MobileObjectServer berlangganan dynamicsObs dan observasi dari MobileObjects yang berjalan di server. Kemudian mulai mengirimkan data terkait pergerakan mereka ke Monitor yang terhubung.

Untuk keperluan artikel ini, inilah semua yang perlu Anda ketahui tentang MobileObject.

Soket sebagai Yang Dapat Diamati

MobileObjectServer mulai melakukan sesuatu ketika klien, baik Pengontrol atau Monitor, membuka koneksi soket web. Seiring berjalannya waktu, MobileObjectServerdapat menerima banyak permintaan untuk membuka koneksi dari banyak klien.

Ini terlihat seperti soket yang dapat diamati. Ini adalah cara mendapatkannya menggunakan perpustakaan socket.io:

Melalui fungsi sockets, kita membuat Observable dari SocketObs (kita akan melihat implementasi kelas ini nanti). Setiap kali server websocket menerima permintaan connect dan membuat socket baru, Observable yang dikembalikan oleh fungsi ini memancarkan sebuah instance SocketObs yang membungkus socket baru saja dibuat.

Pesan melalui soket sebagai Dapat Diamati

Socket dapat digunakan untuk mengirimkan pesan dari client ke server dan sebaliknya. Dengan perpustakaan socket.io, kita dapat mengirim pesan menggunakan metode emit.

SocketIO.Socket.emit(event: string, …args: any[]): SocketIO.Socket

Parameter event dapat dilihat sebagai pengenal jenis pesan yang ingin kita kirim. Parameter …args dapat digunakan untuk mengirim data khusus untuk satu pesan.

Siapa pun yang tertarik pada jenis pesan tertentu (atau peristiwa, menggunakan terminologi socket.io) dapat mulai mendengarkan pada soket menggunakan metode on.

SocketIO.Emitter.on(event: string, fn: Function): SocketIO.Emitter

Sekali lagi, urutan pesan yang diterima oleh Penerima terlihat seperti Dapat Diamati. Inilah cara kita membuat Observable yang benar-benar memancarkan setiap kali pesan jenis tertentu diterima.

Metode onMessageType adalah metode yang berhasil. Ia mengembalikan Observable, yang muncul setiap kali pesan bertipe messageType diterima.

Dengan cara ini, peristiwa soket, atau pesan sebagaimana kami menyebutnya di sini, telah diubah menjadi Dapat Diamati. Ini akan menjadi dasar desain kami.

Tentukan sifat Klien

Ada dua jenis klien yang dapat terhubung dengan MobileObjectServer. Satunya adalah Pengontrol dan satu lagi adalah Monitor. MobileObjectServer pertama-tama perlu menentukan jenis klien mana yang akan ditangani pada soket tertentu.

Cara yang kita pilih untuk mengimplementasikan logika tersebut adalah dengan membuat Pengontrol dan Monitor mengirimkan jenis pesan berbeda sebagai pesan pertamanya.

  • Pengontrol mengirimkan pesan jenis BIND_CONTROLLER
  • Monitor mengirimkan pesan jenis BIND_MONITOR

Tergantung pada jenis pesan pertama yang diterima pada soket, MobileObjectServer dapat mengidentifikasi apakah pesan tersebut berkomunikasi dengan Pengontrol atau Monitor .

Segera setelah soket dibuat, MobileObjectServer harus mulai mendengarkan kedua jenis pesan, BIND_CONTROLLER dan BIND_MONITOR. Yang pertama terjadi akan menang. Ini adalah race antara dua Observable yang memetakan dua jenis pesan berbeda.

Logika seperti itu harus diulang setiap kali soket baru dibuat, yaitu setiap kali Observable dikembalikan oleh fungsi sockets yang dipancarkan. Oleh karena itu, kita perlu menggabungkan semua event yang memenangkan perlombaan. Kita perlu menggunakan operator mergeMap, yang menggabungkan semua peristiwa yang dimunculkan oleh Observable yang terlibat, dan meratakan hasilnya menjadi Observable baru (mergeMap sebelumnya dikenal sebagai flatMap).

Kode untuk mendapatkan hasil ini adalah sebagai berikut:

Kini setelah kita mengetahui cara membedakan Pengontrol dan Monitor, kita dapat berfokus pada apa yang harus dilakukan dalam kedua kasus tersebut.

Peristiwa yang relevan untuk Monitor

Monitor menunjukkan pergerakan semua MobileObjects yang berjalan di MobileObjectServer. Jadi MobileObjectServerharus mengirimkan informasi yang benar ke monitor pada waktu yang tepat. Mari kita lihat dulu waktu-waktu tersebut, yaitu kejadian relevan apa yang harus diperhatikan oleh MobileObjectServer agar dapat memenuhi tugasnya.

Menambah dan menghapus MobileObjects

Peristiwa relevan pertama adalah:

  • MobileObject telah ditambahkan =› MobileObject ditampilkan di Monitor
  • MobileObject telah dihapus =› MobileObject dihapus dari Monitor

MobileObjects ditambahkan atau dihapus seiring berjalannya waktu, sehingga peristiwa tersebut dapat dimodelkan dengan dua Observable:

  • sebuah Observable yang muncul ketika MobileObject ditambahkan
  • sebuah Observable yang muncul ketika MobileObject dihapus

Setelah Monitor terhubung, MobileObjectServer mulai tertarik pada kedua Observable tersebut, sehingga harus merge keduanya:

Mirip dengan apa yang telah kita lihat sebelumnya, kita perlu mengulangi logika tersebut setiap kali Monitor ditambahkan. Oleh karena itu kita perlu mergeMap semua Observable yang merupakan hasil merge dari 'observable mobile object ditambahkan' dengan Observable 'mobile object dihapus'.

Ini adalah kode untuk mendapatkan Observable yang muncul kapan saja MobileObject harus ditambahkan atau dihapus dari setiap Monitor:

Kami telah memperkenalkan beberapa hal dengan kode ini yang patut dikomentari di sini.

Kami telah membuat kelas MobileObjectServer, yang akan menjadi tempat kami mengkodekan semua logika server kami mulai sekarang.

Metode handleMonitorsObs, yang akan kita perkaya nanti, hanya mengembalikan merge dari dua Observable, mobileObjectAdded dan mobileObjectRemoved, yang merupakan Subjek. Ini adalah “batin” merge yang ditunjukkan pada gambar di atas.

Subyek adalah Observable, dan oleh karena itu dapat digabungkan seperti yang kita lakukan di sini. Namun Subjek juga merupakan Pengamat, sehingga kita dapat memancarkan peristiwa melalui mereka. Seperti yang akan kita lihat nanti di kodenya, akan ada saatnya kita akan menggunakan Subjek ini untuk memancarkan peristiwa yang disarankan oleh namanya.

Poin terakhir terkait dengan kode yang telah kita tambahkan di metode startSocketServer:

race(
   socket.onMessageType(MessageType.BIND_MONITOR)
   .pipe(
      map(() => (sObs: SocketObs) => this.handleMonitorObs(sObs))
   ),
   socket.onMessageType(MessageType.BIND_CONTROLLER)
   // something will be added here soon to make this logic work
)
.pipe(
   mergeMap(handler => handler(socket))
)

Ini pada dasarnya adalah cara untuk mengatakan: setiap kali pesan BIND_MONITOR diterima, kembalikan fungsinya

(socketObs: SocketObs) => this.handleMonitorObs(socketObs)

yang akan dieksekusi dalam operator mergeMap yang disalurkan ke hasil fungsi race. Operator mergeMap ini adalah mergeMap eksternal yang ditunjukkan pada gambar di atas.

Cara lain untuk membaca kode adalah sebagai berikut: setiap peristiwa yang terkait dengan pesan bertipe BIND_MONITOR akan diubah oleh logika

mergeMap(() => this.handleMonitorObs(socket))

dimana socket adalah turunan dari tipe SocketsObs yang dipancarkan oleh fungsi race.

Kami akan segera menambahkan sesuatu yang serupa untuk kasus BIND_CONTROLLER agar seluruh logika ini berfungsi.

Menangani dinamika MobileObject yang Dapat Diamati

Mari kita pertimbangkan satu Monitor yang terhubung ke MobileObjectServer. Setelah koneksi, beberapa MobileObjects ditambahkan ke MobileObjectServer.

Sekarang untuk setiap MobileObject, kita harus mulai mempertimbangkan dinamika Observable yang mereka tawarkan sebagai bagian dari API mereka. Observable ini mengeluarkan, pada interval waktu yang teratur, data tentang dinamika (posisi dan kecepatan) MobileObject. Jika mobileObject menyimpan referensi ke MobileObject, kita dapat memperoleh dinamikanya yang Dapat Diamati melalui mobileObject.dynamicsObs (lihat API MobileObject).

Pertama kita harus mentransformasi setiap kejadian yang mewakili fakta bahwa MobileObject telah ditambahkan ke dalam rangkaian kejadian yang dipancarkan oleh dynamicsObs-nya. Kemudian kita mergeMap semua rangkaian ini menjadi satu Observable baru yang memancarkan semua peristiwa dinamis untuk semua MobileObjects yang ditambahkan.

Lalu kita menerapkan semua jazz ini ke semua Monitor yang terhubung ke MobileObjectServer. Jadi kita mendapatkan Observable baru yang memancarkan data dinamika untuk semua Monitor dan semua MobileObjects (ditambah semua peristiwa yang terkait dengan fakta bahwa MobileObject telah dihapus).

Setiap interval waktu, kami memiliki kelompok yang terdiri dari empat peristiwa terkait dengan emisi data tentang dinamika MobileObjects kami. Mengapa? Hal ini masuk akal jika kita berpikir bahwa kita memiliki dua Monitor dan dua MobileObjects. Setiap MobileObject harus mengirimkan data dinamikanya ke setiap Monitor setiap interval waktu. Oleh karena itu benar untuk melihat empat peristiwa setiap interval waktu.

Setelah ini jelas, kodenya sangat sederhana:

Kami baru saja memperkenalkan satu perubahan sederhana. Kami mengubah metode handleMonitorObs untuk menambahkan operator mergeMap. Ini mengubah mobileObjectAdded Observable sehingga Observable baru memancarkan data dinamika yang kita cari.

Sisanya masih belum tersentuh.

Ringkasan sejauh ini

Apa yang telah kita lakukan sejauh ini? Kami baru saja mentransformasikan Observable untuk memperoleh Observable baru yang memancarkan semua kejadian yang diinginkan oleh MobileObjectServer ketika harus berurusan dengan Monitor. Tidak ada lagi.

Anda dapat melihat bagaimana transformasi ini tercermin dalam kode pada gambar berikut:

Satu-satunya hal yang perlu kita lakukan sekarang adalah menambahkan efek samping yang diinginkan ke kejadian yang relevan. Hal ini pada akhirnya akan memungkinkan kita mencapai apa yang kita inginkan, yaitu menyampaikan informasi yang tepat kepada Monitor pada waktu yang tepat.

Namun sebelum beralih ke efek samping, mari kita bahas apa yang MobileObjectServer perlu lakukan saat berinteraksi dengan Pengontrol, klien lain dalam sistem terdistribusi kami.

Peristiwa yang relevan untuk Pengendali

Saat Pengontrol terhubung ke MobileObjectServer ada lebih sedikit hal yang perlu diperhatikan oleh server. Setidaknya ada lebih sedikit peristiwa relevan yang terjadi.

Hal-hal yang perlu diperhatikan oleh MobileObjectServer adalah:

  • Pengontrol telah terhubung, yang dalam logika sederhana kita berarti kita harus membuat MobileObject baru
  • Pengontrol telah mengirimkan perintah untuk MobileObject miliknya
  • Pengontrol telah terputus. Dalam implementasi kita, ini berarti kita harus menghapus MobileObject yang dikontrol oleh Controller (kita memiliki hubungan 1 banding 1 antara MobileObject dan Pengontrol)

Kita sudah mengetahui peristiwa pertama: peristiwa yang dipancarkan oleh Observable yang dikembalikan oleh socket.onMessageType(BIND_CONTROLLER).

Perintah dikirim oleh Pengontrol ke MobileObjectServer dalam bentuk pesan. Jadi kita dapat membuat perintah Observable yang diterima melalui socket tertentu (yang diterima dari Controller tertentu) karena setiap Controller memiliki socketnya sendiri. Kita melakukan ini hanya dengan menggunakan onMessageType metode SocketObs

socket.onMessageType(CONTROLLER_COMMAND)

SocketObs juga menawarkan metode, onDisconnect, yang mengembalikan Observable yang muncul ketika socket terputus. Inilah yang kita butuhkan untuk menghadapi event ketiga.

Karena kita berurusan dengan lebih dari satu Pengontrol yang berpotensi terhubung ke MobileObjectServer, Anda tidak akan terkejut jika mengetahui bahwa kita perlu mergeMap hasil dari merge. Ini adalah jenis transformasi yang sama yang telah kami lakukan beberapa kali.

Kode tersebut juga seharusnya tidak mengejutkan.

Kita baru saja menambahkan metode handleControllerObs yang berhubungan dengan perintah yang diterima dan pemutusan hubungan Pengontrol. Kami menerapkan transformasi mergeMap seperti yang telah kami lakukan dengan handleMonitorObs.

Ringkasan transformasi yang diterapkan pada Pengontrol

Diagram berikut mengilustrasikan semua transformasi yang telah kita terapkan mulai dari Observable yang muncul ketika Pengontrol terhubung.

Yang Terakhir Dapat Diamati

Jika kita menggabungkan transformasi yang telah kita lakukan untuk Monitor dan Pengendali, yang kita peroleh adalah Observable akhir berikut.

Hanya dengan berlangganan Observable terakhir ini, seluruh rangkaian peristiwa akan terungkap.

Efek samping

Pohon peristiwa indah yang kami buat dengan berlangganan Final Observable tidak menghasilkan apa-apa. Namun ia berfungsi dengan baik dalam memetakan Peristiwa yang kami identifikasi saat menjelaskan persyaratan Server di awal artikel ini.

Pada dasarnya ini memberi tahu kita dengan jelas kapan kita harus melakukan sesuatu.

Sesuatu inilah yang kami sebut sebagai efek samping.

Saat Pengontrol terhubung dan terputus, kita masing-masing membuat atau menghapus MobileObject. Sebagai efek samping dari tindakan ini adalah kami memunculkan peristiwa “MobileObject ditambahkan” dan “MobileObject dihapus” menggunakan Subjek mobileObjectAdded dan mobileObjectRemoved, kami memperkenalkan beberapa paragraf yang lalu.

Cara menerapkan efek samping

Di RxJs ada berbagai cara untuk menerapkan efek samping.

Pengamat adalah salah satunya. Kita dapat menambahkan Pengamat saat kita subscribe menggunakan operator tap (sebelumnya dikenal sebagai do).

Cara lain adalah dengan memasukkannya ke dalam fungsi apa pun yang kita teruskan ke operator RxJ mana pun.

Kami terutama akan menggunakan tap, karena ini memungkinkan kami menempatkan efek samping di seluruh pohon kejadian. Namun kami juga akan menempatkan efek samping langsung di dalam fungsi yang kami teruskan ke operator RxJs.

Satu-satunya tempat kami tidak memberikan efek samping adalah subscribe. Alasannya adalah, mengingat cara kami membangunnya, Pengamat Akhir mengeluarkan berbagai jenis peristiwa. Oleh karena itu subscribe, yang berfungsi sama untuk semua kejadian, bukanlah tempat yang tepat untuk menempatkan perilaku yang bergantung pada jenis kejadian tertentu.

Mudah-mudahan pada titik ini kode tersebut dapat berbicara sendiri.

Yang terakhir namun tidak kalah pentingnya: penyelesaian Observables

Ada satu hal yang masih perlu kita lakukan untuk menyelesaikan desain kita: menghentikan aliran kejadian, atau menyelesaikan Observable, ketika Pengontrol atau Monitor terputus.

Ketika Pengontrol terputus

Saat Pengontrol terputus, kami menghapus MobileObject yang dikontrolnya. Sebagai bagian dari penghapusan, penting untuk memastikan bahwa MobileObjectServer berhenti mengirimkan data dinamika yang terkait dengan MobileObject ini ke Monitor yang terhubung. Artinya kita harus menyelesaikan Observable berikut:

mobObjInfo.mobObj.dynamicsObs
.pipe(
  tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)),
)

Kita dapat dengan mudah mencapainya hanya dengan menggunakan operator takeUntil bersama dengan mobileObjectRemoved Observable yang sudah kita ketahui:

mobObjInfo.mobObj.dynamicsObs
.pipe(
  tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)),
  takeUntil(this.mobileObjectRemoved.pipe(
    filter(id => id === mobObjInfo.mobObjId)
  ))
)

takeUntil memastikan bahwa Observable selesai ketika Observable diteruskan sebagai parameter ke takeUntil yang dipancarkan.

mobileObjectRemoved terpancar setiap kali MobileObject dihapus. Namun yang kami inginkan adalah berhenti mengirimkan informasi dinamika ketika MobileObject tertentu, yang diidentifikasi berdasarkan idnya, dihapus. Jadi kita menambahkan logika filter.

Saat Monitor terputus

Dalam hal ini, kita juga dapat menggunakan takeUntil.

Kita tahu kapan Monitor terputus karena socket, bertipe SocketObs, yang terkait dengannya dipancarkan melalui socket.onDisconnect() Observable. Jadi yang perlu kita lakukan adalah berhenti mengirimkan info dinamika ketika socket.onDisconnect() memancarkan.

Jadi logika terakhir yang mengatur penyelesaian Observable adalah

mobObjInfo.mobObj.dynamicsObs
.pipe(
  tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)),
  takeUntil(this.stopSendDynamics(socket, mobObjInfo.mobObjId))
)

Di mana

private stopSendDynamics(socket: SocketObs, mobObjId: string){
  return merge(
            this.mobileObjectRemoved.pipe(
                                       filter(id => id === mobObjId)
                                     ),
            socket.onDisconnect()
  );
}

Dan seperti inilah inti kode yang mengimplementasikan logika kita:

Kesimpulan

Ini merupakan perjalanan yang cukup panjang. Kita telah melihat beberapa alasan yang didorong oleh Berpikir Reaktif dan beberapa penerapan dari alasan ini.

Kami mulai mengubah acara WebSockets menjadi Observables. Kemudian, dengan menerapkan transformasi bertahap, kami akhirnya membuat satu Observable yang, setelah berlangganan, akan mengungkap semua peristiwa yang kami minati.

Pada titik ini, menambahkan efek samping yang memungkinkan kita mencapai tujuan sangatlah mudah.

Proses mental perancangan ini, yang bersifat inkremental, adalah makna yang saya berikan pada “Berpikir Reaktif”.

Basis kode lengkap, yang terdiri dari Pengontrol Server dan Monitor, dapat ditemukan di sini.