Подписка и отказ от подписки на каждый отправленный элемент

Я использую библиотеку reactive-location.

Мой вариант использования состоит в том, что у меня есть поток объектов, излучаемых наблюдаемым объектом. Эти элементы могут отправляться каждые несколько часов. Как только элемент испускается, я хочу получить местоположение и с помощью zipWith (насколько я понимаю) генерировать объект, содержащий местоположение.

Проблема в том, что поскольку объекты будут испускаться только раз в несколько часов, я не могу поддерживать наблюдаемое место в горячем состоянии, так как это разряжает батарею.

Поэтому мне нужно следующее: как только объект передается в поток, подписаться на наблюдаемое местоположение после получения местоположения, отказаться от подписки на наблюдаемое местоположение. Это нужно делать постоянно.

Насколько я понимаю, этот трансформер занимается отпиской

public <T> Observable.Transformer<T, T> takeNextAndUnsubscribe() {
    return new Observable.Transformer<T, T>() {

        @Override
        public Observable<T> call(Observable<T> tObservable) {
            final BehaviorSubject subject = BehaviorSubject.create();
            Observable source = tObservable.doOnNext(new Action1<T>() {
                @Override
                public void call(T t) {
                    subject.onNext(t);
                }
            });
            return Observable
                    .merge(source.takeUntil(subject), subject)
                    .take(1);
        }

    };
}

Но как мне подписаться снова, когда новый объект будет отправлен по потоку?


person jiduvah    schedule 22.10.2015    source источник


Ответы (1)


Похоже, что вам нужно объединить исходные элементы с текущим местоположением при их отправке. Здесь не нужно ничего особенного. Просто используйте flatMap() на каждом из исходных элементов, чтобы объединить его с местоположением.

source.flatMap(item ->
        locationProvider
                .getLastKnownLocation()
                .map(location -> new ItemWithLocation<>(item, location))
);

class ItemWithLocation<T> {
    private final T item;
    private final Location location;

    public ItemWithLocation(T item, Location location) {
        this.item = item;
        this.location = location;
    }

    public T getItem() {
        return item;
    }

    public Location getLocation() {
        return location;
    }
}

РЕДАКТИРОВАТЬ: обновлено вторым примером. Следующие будут подписываться на обновления местоположения, пока не будет достигнута определенная точность, а затем объединят их с вашим исходным элементом. Ключевым моментом здесь является использование first(). Его использование позволит отказаться от подписки на провайдера местоположения всякий раз, когда вы получите местоположение, удовлетворяющее вашим потребностям.

LocationRequest request = 
        LocationRequest
            .create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);

source.flatMap(item ->
        locationProvider
                .getUpdatedLocation(request)
                .first(location -> location.getAccuracy() < 5.0f)
                .map(location -> new ItemWithLocation<>(item, location))
);
person kjones    schedule 23.10.2015
comment
Проблема в том, что мне нужно новое место, а не последнее из известных - person jiduvah; 23.10.2015
comment
Какой API вы используете из реактивного местоположения? Это было непонятно, поэтому в моем примере я использовал getLastKnownLocation(). Я обновил ответ вторым примером, который показывает, как обрабатывать отказ от подписки из непрерывного местоположения Observable. - person kjones; 23.10.2015