В моей предыдущей статье Как реагировать и анимировать движущиеся объекты с помощью RxJ я описал, как создать класс MobileObject, который имитирует движение объекта, подверженного ускорениям, наложенным на него внешним контроллером. .

Теперь я хочу показать вам простую распределенную систему, которая позволяет приложению Контроллер удаленно управлять перемещением MobileObject. Второе удаленное приложение, Монитор , показывает движение объекта на двухмерном плане. В центре системы находится MobileObjectServer, где находятся MobileObject.

Цель этой статьи - объяснить, как реактивное мышление может постепенно создавать дизайн, который очень естественно отображает требования и дает аккуратное решение. Мы решим проблему, подписавшись только на ОДИН Observable.

Мы остановимся на серверной части, которая с этой точки зрения является наиболее интригующей.

Для реализации мы будем использовать RxJs и TypeScript. Сервер работает на Node. Все компоненты обмениваются данными с помощью веб-сокетов.

Полную кодовую базу, состоящую из Контроллера сервера и Монитора, можно найти здесь.

Схема распределенной системы

Логическая схема распределенной системы представлена ​​на следующей схеме:

В центре находится MobileObjectServer, на котором запускаются экземпляры MobileObjets. Каждый MobileObject управляется своим Контроллером, то есть веб-приложением, с помощью которого мы можем отдавать команды (например, ускорять, тормозить) MobileObject. Движение всех MobileObject можно увидеть на одном или нескольких мониторах. Каждый монитор снова является веб-приложением.

На следующей схеме показан пример потока взаимодействия между одним контроллером, одним монитором и MobileObjectServer.

Требования к серверу с точки зрения событий

Мы можем выразить требования к серверной части нашей распределенной системы в терминах событий:

  • Event1 - когда подключается контроллер = ›создание MobileObject
  • Событие2 - когда Контроллер получает команду = ›перенаправить команду на MobileObject, управляемый Контроллером.
  • Событие3 - при отключении Контроллера = ›удаление MobileObject, управляемого Контроллером.
  • Event4 - при подключении монитора = ›начало отправки данных динамики всех запущенных мобильных объектов на вновь подключенный монитор
  • Event5 - при добавлении MobileObject = ›начало отправки данных о его динамике на все подключенные мониторы.
  • Событие6 - когда Монитор отключается = ›прекращает отправку потоков динамических данных для всех MobileObject на этот Монитор.

Реактивное мышление создаст дизайн, который естественным образом отображает требования, выраженные таким образом.

Элементы, составляющие сервер

Серверный компонент распределенного приложения состоит из двух основных элементов:

  • класс MobileObject, реализующий логику динамического движения с использованием RxJs Observables - это подробно описано здесь
  • MobileObjectServer класс , который управляет протоколом веб-сокетов, получая команды от Контроллера и отправляя их на Мониторы вся информация о динамике MobileObject. Эта реализация вдохновлена ​​этой статьей Луиса Авилеса.

MobileObject API

Давайте сделаем краткий обзор класса MobileObject - все подробности можно найти здесь, а код - в этом репозитории.

MobileObject предлагает два семейства API.

Первый - это набор методов, с помощью которых внешний Контроллер может выдавать команды, влияющие на динамику объекта (например, ускорение, торможение).

Второй - это потоки данных только для чтения, которые передаются внешним клиентам, мониторам, соответствующим данным о динамическом поведении объекта (то есть его положении и скорости во времени).

Чтобы переместить экземпляр MobileObject, Контроллер должен включить его (с помощью метода turnOn()), применить желаемое ускорение (с помощью методов accelerateX(acc: number) и accelerateY(acc: number)). , а потом может тормозить (методом brake()).

Когда монитор подключается к MobileObjectServer, MobileObjectServer подписывается на dynamicsObs и наблюдаемые объекты MobileObject, запущенные в сервер. Затем он начинает отправку данных, связанных с их перемещением, на подключенные мониторы.

Для целей этой статьи это все, что вам нужно знать о MobileObject.

Сокеты как наблюдаемые

MobileObjectServer начинает что-то делать, когда клиент, либо Контроллер, либо Монитор, открывает соединение с веб-сокетом. С течением времени MobileObjectServer может получать множество запросов на открытие соединения от многих клиентов.

Это похоже на Observable сокетов. Вот как его получить с помощью библиотеки socket.io:

С помощью функции sockets мы создаем Observable SocketObs (реализацию этого класса мы увидим позже). Каждый раз, когда сервер websocket получает запрос connect и создает новый сокет, Observable, возвращаемый этой функцией, генерирует экземпляр SocketObs, который обертывает socket Только что создал.

Сообщения через сокеты как наблюдаемые

Сокеты могут использоваться для отправки сообщений от клиента на сервер и наоборот. С помощью библиотеки socket.io мы можем отправлять сообщения с помощью метода emit.

SocketIO.Socket.emit(event: string, …args: any[]): SocketIO.Socket

Параметр event можно рассматривать как идентификатор типа сообщения, которое мы хотим отправить. Параметры …args могут использоваться для отправки данных, относящихся к одному сообщению.

Тот, кого интересует определенный тип сообщения (или события, если использовать socket.io терминологию), может начать прослушивание сокета с помощью метода on.

SocketIO.Emitter.on(event: string, fn: Function): SocketIO.Emitter

Опять же, последовательности сообщений, полученных Receiver, выглядят как Observables. Вот как мы можем создавать Observables, которые фактически излучают каждый раз, когда получено сообщение определенного типа.

onMessageType - это тот метод, который помогает. Он возвращает Observable, который выдает каждый раз при получении сообщения типа messageType.

Таким образом, события сокетов или сообщения, как мы их здесь называем, были преобразованы в Observables. Они станут основой нашего дизайна.

Определить характер Клиента

Есть два типа клиентов, которые могут подключаться к MobileObjectServer. Один из них - Контроллер, а другой - Монитор. MobileObjectServer сначала должен определить, с каким типом клиента он будет работать в конкретном сокете.

Мы выбрали способ реализации такой логики: Контроллер и Монитор отправляют сообщения разных типов в качестве первого сообщения.

  • Контроллер отправляет сообщение типа BIND_CONTROLLER.
  • Монитор отправляет сообщение типа BIND_MONITOR.

В зависимости от типа первого сообщения, полученного в сокет, MobileObjectServer может определить, взаимодействует ли он с контроллером или монитором. .

Как только сокет создан, MobileObjectServer должен начать прослушивание обоих типов сообщений, BIND_CONTROLLER и BIND_MONITOR. Победит тот, кто первым придет. Это 20_ между двумя наблюдаемыми объектами, которые отображают два разных типа сообщений.

Такая логика должна повторяться каждый раз, когда создается новый сокет, то есть каждый раз, когда Observable, возвращаемый функцией sockets, излучает. Следовательно, нам нужно объединить все события, которые побеждают в гонке. Нам нужно использовать оператор mergeMap, который объединяет все события, вызванные задействованными Observable, и сглаживает результаты в новый Observable (mergeMap ранее назывался flatMap).

Код для получения этого результата следующий:

Теперь, когда мы знаем, как различать Контроллеры и Мониторы, мы можем сосредоточиться на том, что делать в этих двух случаях.

События, актуальные для монитора

Монитор показывает движение всех MobileObject, запущенных на MobileObjectServer. Таким образом, MobileObjectServer должен отправлять на мониторы нужную информацию в нужное время. Давайте сначала посмотрим, что это за время, то есть какие события должны быть известны MobileObjectServer, чтобы выполнять свою работу.

Добавление и удаление MobileObjects

Первые соответствующие события:

  • добавлен MobileObject = ›MobileObject отображается на мониторе
  • MobileObject удален = ›MobileObject удален из Monitor

MobileObjects добавляются или удаляются с течением времени, поэтому такие события можно моделировать с помощью двух Observable:

  • Observable, который излучает при добавлении MobileObject
  • Observable, который излучает при удалении MobileObject

После подключения монитора MobileObjectServer начинает интересоваться обоими этими наблюдаемыми объектами, поэтому он должен merge их:

Подобно тому, что мы видели раньше, нам нужно повторять такую ​​логику каждый раз, когда добавляется Монитор. Следовательно, нам нужно mergeMap все наблюдаемые объекты, которые являются результатом merge «мобильный объект добавлен» Observable с «мобильным объектом удален» Observable.

Это код для получения Observable, который генерирует каждый раз, когда MobileObject должен быть добавлен или удален из каждого Monitor:

Мы ввели в этот код несколько вещей, которые стоит здесь прокомментировать.

Мы создали класс MobileObjectServer, который с этого момента будет местом, где мы будем кодировать всю логику нашего сервера.

Метод handleMonitorsObs, который мы собираемся дополнить позже, возвращает просто merge двух Observable, mobileObjectAdded и mobileObjectRemoved, которые являются Subject. Это «внутренний» merge, показанный на картинке выше.

Субъекты являются Observable и поэтому могут быть объединены, как мы это делаем здесь. Но субъекты также являются наблюдателями, поэтому мы можем излучать события через них. Как мы увидим позже в коде, будет время, когда мы будем использовать эти Субъекты для генерации событий, предлагаемых их названиями.

Последний пункт связан с кодом, который мы добавили в метод startSocketServer:

race(
   socket.onMessageType(MessageType.BIND_MONITOR)
   .pipe(
      map(() => (sObs: SocketObs) => this.handleMonitorObs(sObs))
   ),
   socket.onMessageType(MessageType.BIND_CONTROLLER)
   // something will be added here soon to make this logic work
)
.pipe(
   mergeMap(handler => handler(socket))
)

Это в основном способ сказать: каждый раз, когда получено сообщение BIND_MONITOR, вернуть функцию

(socketObs: SocketObs) => this.handleMonitorObs(socketObs)

который будет выполнен в операторе mergeMap, переданном в результат функции race. Этот mergeMap оператор является внешним mergeMap, показанным на рисунке выше.

Другой способ прочитать код: любое событие, соответствующее сообщению типа BIND_MONITOR, преобразуется логикой

mergeMap(() => this.handleMonitorObs(socket))

где socket - это экземпляр типа SocketsObs, выданный функцией race.

Вскоре мы добавим нечто подобное для случая BIND_CONTROLLER, чтобы вся эта логика работала.

Обработка наблюдаемых динамических характеристик MobileObject

Рассмотрим один Монитор, который подключается к MobileObjectServer. После подключения к MobileObjectServer добавляется пара MobileObject.

Теперь для каждого MobileObject, мы должны начать рассматривать динамику Observables, которую они предлагают как часть своих API. Эти наблюдаемые через регулярные промежутки времени передают данные о динамике (положении и скорости) MobileObject. Если mobileObject хранит ссылку на MobileObject, мы можем получить его динамику Observable через mobileObject.dynamicsObs (см. API MobileObject).

Сначала мы должны преобразовать каждое событие, представляющее тот факт, что MobileObject был добавлен в серию событий, генерируемых его dynamicsObs. Затем мы mergeMap все эти серии в новый единственный Observable, который генерирует все динамические события для всех добавленных MobileObject.

Затем мы применяем весь этот джаз ко всем мониторам, которые подключаются к MobileObjectServer. Таким образом, мы получаем новый Observable, который генерирует динамические данные для всех мониторов и все MobileObject (плюс все события, связанные с тем, что MobileObject был удален).

Для каждого временного интервала у нас есть группы из четырех событий, связанных с передачей данных о динамике наших MobileObjects. Почему? Это имеет смысл, если мы думаем, что у нас есть два Монитора и два MobileObject. Каждый MobileObject должен отправлять свои динамические данные на каждый монитор за каждый временной интервал. Поэтому правильно видеть четыре события за каждый временной интервал.

Как только это станет ясно, код станет очень простым:

Мы только что внесли одно простое изменение. Мы изменили метод handleMonitorObs, добавив оператор mergeMap. Это преобразует mobileObjectAdded Observable так, что новый Observable испускает динамические данные, которые мы ищем.

Остальное осталось нетронутым.

Резюме на данный момент

Что мы уже сделали? Мы только что преобразовали Observables, чтобы получить новые Observables, которые генерируют все события, которые интересует MobileObjectServer, когда ему приходится иметь дело с Monitor. Ничего больше.

Вы можете увидеть, как эти преобразования отражаются в коде на следующем изображении:

Единственное, что нам нужно сделать сейчас, - это добавить желаемые побочные эффекты к соответствующим событиям. В конечном итоге это позволит нам достичь того, чего мы хотим, то есть передавать Монитору нужную информацию в нужное время.

Но прежде чем перейти к побочным эффектам, давайте рассмотрим, что MobileObjectServer должен делать при взаимодействии с Контроллером, другим клиентом в нашей распределенной системе.

События, относящиеся к Контроллеру

Когда Контроллер подключается к MobileObjectServer, сервер должен заботиться о меньшем количестве вещей. По крайней мере, происходит меньше вложенных релевантных событий.

MobileObjectServer должен заботиться о следующих вещах:

  • Подключен Контроллер, что в нашей простой логике означает, что мы должны создать новый MobileObject.
  • Контроллер отправил команды для своего MobileObject.
  • Контроллер отключен. В нашей реализации это означает, что мы каким-то образом должны удалить MobileObject, управляемый Контроллером (у нас есть отношение 1 к 1 между MobileObject и его Контроллер)

Мы уже знаем первое событие: оно генерируется Observable, возвращаемым socket.onMessageType(BIND_CONTROLLER).

Команды отправляются Контроллером на MobileObjectServer в виде сообщений. Таким образом, мы можем создать Observable команд, полученных через определенный сокет (, полученный от определенного контроллера), поскольку каждый контроллер имеет свой собственный сокет. Мы делаем это, просто используя onMessageType метод SocketObs

socket.onMessageType(CONTROLLER_COMMAND)

SocketObs также предлагает метод onDisconnect, который возвращает Observable, генерируемый при отключении сокета. Это то, что нам нужно, чтобы разобраться с третьим событием.

Поскольку мы имеем дело с более чем одним контроллером, потенциально подключающимся к MobileObjectServer, вас не должно удивлять то, что нам нужно mergeMap получить результат merge. Это тот же тип трансформации, который мы уже проделывали несколько раз.

Код тоже не должен вызывать удивления.

Мы просто добавили handleControllerObs метод, который работает с полученными командами и отключением контроллера. Мы применяем к нему преобразование mergeMap, как мы уже сделали с handleMonitorObs.

Сводка преобразований, примененных к контроллерам

На следующей диаграмме показаны все преобразования, которые мы применили, начиная с Observable, генерируемого при подключении Контроллера.

Последняя наблюдаемая

Если мы объединим преобразования, которые мы сделали для Мониторов и Контроллеров, мы получим следующую конечную наблюдаемую.

Просто подписавшись на этот последний Observable, можно развернуть все дерево событий.

Побочные эффекты

Красивое дерево событий, которое мы создали, подписавшись на Final Observable, ничего не делает. Но он хорошо справляется с отображением событий, которые мы определили при описании требований к серверу в начале этой статьи.

По сути, он ясно говорит нам, когда мы должны что-то сделать.

Это нечто и есть то, что мы называем побочным эффектом.

Когда Контроллер подключается и отключается, мы соответственно создаем или удаляем MobileObject. побочным эффектом этих действий является то, что мы вызываем события «MobileObject added» и «MobileObject deleted» с использованием тем mobileObjectAdded и mobileObjectRemoved, мы вводим некоторые параграфов назад.

Как реализовать побочные эффекты

В RxJ есть разные способы реализации побочных эффектов.

Наблюдатели один. Мы можем добавлять наблюдателей, subscribe используя оператор tap (ранее известный как do).

Другой способ - вставить их в любую функцию, которую мы передаем любому оператору RxJs.

В основном мы собираемся использовать tap, поскольку он позволяет нам размещать побочные эффекты по всему дереву событий. Но мы также собираемся разместить побочные эффекты непосредственно внутри функций, которые мы передаем операторам RxJs.

Единственное место, где мы не помещаем побочные эффекты, - это subscribe. Причина в том, что, учитывая то, как мы его построили, Final Observer генерирует множество различных типов событий. Следовательно, subscribe, который работает одинаково для всех событий, не является подходящим местом для определения поведения, которое зависит от определенных типов событий.

Надеюсь, на этом этапе код говорит сам за себя.

И последнее, но не менее важное: завершение Observables.

Есть одна вещь, которую нам еще нужно сделать, чтобы завершить наш дизайн: остановить потоки событий или завершить Observables, когда отключается Контроллер или Монитор.

Когда контроллер отключается

Когда Контроллер отключается, мы удаляем управляемый им MobileObject. В рамках удаления важно убедиться, что MobileObjectServer перестает отправлять динамические данные, связанные с этим MobileObject, на подключенные мониторы. Это означает, что мы должны заполнить следующую Observable:

mobObjInfo.mobObj.dynamicsObs
.pipe(
  tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)),
)

Мы можем легко добиться этого, просто используя оператор takeUntil вместе с уже знакомым нам mobileObjectRemoved Observable:

mobObjInfo.mobObj.dynamicsObs
.pipe(
  tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)),
  takeUntil(this.mobileObjectRemoved.pipe(
    filter(id => id === mobObjInfo.mobObjId)
  ))
)

takeUntil гарантирует, что Observable завершится, когда Observable, переданный в качестве параметра takeUntil, испускает.

mobileObjectRemoved излучает каждый раз, когда удаляется MobileObject. Однако мы хотим прекратить отправку информации о динамике при удалении определенного MobileObject, идентифицированного по его идентификатору. Итак, мы добавляем логику filter.

Когда монитор отключается

В этом случае мы также можем использовать takeUntil.

Мы знаем, когда монитор отключается, потому что связанный с ним socket типа SocketObs излучает через socket.onDisconnect() Observable. Итак, что нам нужно сделать, это прекратить посылать информацию о динамике при излучении socket.onDisconnect().

Итак, последняя логика, управляющая завершением Observable, такова:

mobObjInfo.mobObj.dynamicsObs
.pipe(
  tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)),
  takeUntil(this.stopSendDynamics(socket, mobObjInfo.mobObjId))
)

куда

private stopSendDynamics(socket: SocketObs, mobObjId: string){
  return merge(
            this.mobileObjectRemoved.pipe(
                                       filter(id => id === mobObjId)
                                     ),
            socket.onDisconnect()
  );
}

А вот так выглядит ядро ​​кода, реализующего нашу логику:

Заключение

Это был довольно долгий путь. Мы видели некоторые рассуждения, основанные на реактивном мышлении, и некоторые реализации этих рассуждений.

Мы начали преобразовывать события WebSockets в Observables. Затем, применяя инкрементные преобразования, мы создали один Observable, который после подписки разворачивает все события, которые нам интересны.

На этом этапе добавить побочные эффекты, которые позволяют нам достичь нашей цели, было несложно.

Этот мысленный процесс проектирования, который сам по себе является инкрементальным, и есть то значение, которое я придаю термину «реактивное мышление».

Полную кодовую базу, включающую Контроллер сервера и Монитор, можно найти здесь.