Пользовательский наблюдаемый поток результатов REST из списка ссылок

Я использую библиотеку, которая реализует собственные запросы REST. Требуется «постоянная ссылка» и выдается запрос REST для получения контента (я предоставляю имитацию в исходном коде для тестирования). У меня есть список постоянных ссылок, и я хотел бы запланировать запрос для каждой и сгенерировать поток результатов контента. Все это должно быть асинхронным, и как только все результаты будут получены, я хочу выпустить их список.

Я пытаюсь добиться этого с помощью RxJava. Вот что у меня есть сейчас (извините, что не использую лямбды, я просто хочу привыкнуть к именам классов RxJava):

public class Main {

    public static void main(String[] args) {
        int count = 10;
        List<String> permalinks = new ArrayList<>(count);
        for (int i = 1; i <= count; ++i) {
            permalinks.add("permalink_" + i);
        }

        ContentManager cm = new ContentManager();

        Observable
                .create(new Observable.OnSubscribe<Content>() {

                    int gotCount = 0;

                    @Override
                    public void call(Subscriber<? super Content> subscriber) {
                        for (String permalink : permalinks) { // 1. is iterating here the correct way?
                            if (!subscriber.isUnsubscribed()) { // 2. how often and where should I check isUnsubscribed?
                                cm.getBasicContentByPermalink(permalink, new RestCallback() {

                                    @Override
                                    public void onSuccess(Content content) {
                                        if (!subscriber.isUnsubscribed()) {
                                            subscriber.onNext(content);
                                            completeIfFinished(); // 3. if guarded by isUnsubscribed, onComplete might never be called
                                        }
                                    }

                                    @Override
                                    public void onFailure(int code, String message) {
                                        if (!subscriber.isUnsubscribed()) {
                                            subscriber.onNext(null); // 4. is this OK or is there some other way to mark a failure?
                                            completeIfFinished();
                                        }
                                    }

                                    private void completeIfFinished() {
                                        ++gotCount;
                                        if (gotCount == permalinks.size()) { // 5. how to know that the last request is done? am I supposed to implement such custom logic?
                                            subscriber.onCompleted();
                                        }
                                    }
                                });
                            }
                        }
                    }
                })
                .toList()
                .subscribe(new Action1<List<Content>>() {

                    @Override
                    public void call(List<Content> contents) {
                        System.out.println("list count: " + contents.size());
                        System.out.println("results: ");
                        contents.stream().map(content -> content != null ? content.basic : null).forEach(System.out::println);
                    }
                });

        System.out.println("finishing main");
    }
}

// library mocks

class Content {

    String basic;

    String extended;

    public Content(String basic, String extended) {
        this.basic = basic;
        this.extended = extended;
    }
}

interface RestCallback {

    void onSuccess(Content content);

    void onFailure(int code, String message);
}

class ContentManager {

    private final Random random = new Random();

    public void getBasicContentByPermalink(String permalink, RestCallback callback) {
        // just to simulate network latency and unordered results
        new Thread() {

            @Override
            public void run() {
                try {
                    Thread.sleep(random.nextInt(1000) + 200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (random.nextInt(100) < 95) {
                    // 95% of the time we succeed
                    callback.onSuccess(new Content(permalink + "_basic", null));
                } else {
                    callback.onFailure(-1, permalink + "_basic_failure");
                }
            }
        }.start();
    }
}

Это вроде работает, но я не уверен, что все делаю правильно. Обратите внимание на строки 1-5:

  1. При создании Observable из списка должен ли я сам перебирать список или есть другой, лучший способ? Например, есть Observable.from (Iterable), но я не думаю, что смогу его использовать?
  2. Я проверяю isUnsubscribed перед отправкой запроса REST, а также в обоих обработчиках результатов (успех / неудача). Это как предполагается использовать?
  3. Я реализую некоторую логику для вызова onComplete, когда все запросы вернутся, независимо от того, были ли они успешными или неудачными (см. Вопрос 5). Но, поскольку они защищены вызовом isUnsubscribed, может случиться так, что они никогда не вызывают onComplete. Как тогда поток узнает, что он должен завершиться? Завершается ли она преждевременно, когда подписчик отписывается, и мне не нужно об этом думать? Есть ли какие-нибудь предостережения? Например, что произойдет, если подписчик отменит подписку в середине выдачи результатов Content? В моих тестах список всех результатов никогда не выводится, но я ожидал бы получить список всех результатов до этого момента.
  4. В методе onFailure я отправляю null с onNext (null), чтобы отметить сбой. Я делаю это, потому что в итоге у меня будет zip из 2 потоков, и я создам экземпляр настраиваемого класса только тогда, когда оба zip-значения не равны нулю. Это правильный способ сделать это?
  5. Как я уже упоминал, у меня есть некоторая настраиваемая логика для проверки завершения потока. Что я здесь делаю, так это подсчитываю результаты REST, и когда было обработано столько же, сколько постоянных ссылок в списке, это делается. Это необходимо? Это правильный путь?

person wujek    schedule 09.06.2015    source источник


Ответы (1)


При создании Observable из списка должен ли я сам перебирать список или есть другой, лучший способ? Например, есть Observable.from (Iterable), но я не думаю, что смогу его использовать?

Вместо того, чтобы повторять метод create, вы можете создать наблюдаемый объект для каждой ссылки.

public Observable<Content> getContent(permalink) {

    return Observable.create(subscriber -> {
                   ContentManager cm = new ContentManager();
                   if (!subscriber.isUnsubscribed()) { 
                            cm.getBasicContentByPermalink(permalink, new RestCallback() {

                                @Override
                                public void onSuccess(Content content) {
                                        subscriber.onNext(content);
                                        subscriber.onCompleted();
                                }

                                @Override
                                public void onFailure(int code, String message) {
                                       subscriber.onError(OnErrorThrowable.addValueAsLastCause(new RuntimeException(message, permalink));
                                }

                }
    });
}

затем используйте его с оператором flatMap

 Observable.from(permalinks)
           .flatMap(link -> getContent(link))
           .subscribe();

Я проверяю isUnsubscribed перед отправкой запроса REST, а также в обоих обработчиках результатов (успех / неудача). Это как предполагается использовать?

проверьте isUnsubscribed перед отправкой запроса. Не уверен в этом обратном вызове успеха / неудачи (я думаю, что это бесполезно, но кто-то может сказать, что я ошибаюсь)

Я реализую некоторую логику для вызова onComplete, когда все запросы вернутся, независимо от того, были ли они успешными или неудачными (см. Вопрос 5). Но, поскольку они защищены вызовом isUnsubscribed, может случиться так, что они никогда не вызывают onComplete.

Если вы откажетесь от подписки на поток, вы перестанете наблюдать за потоком, поэтому вы можете не получить уведомление о завершении потока.

Например, что произойдет, если подписчик отменит подписку в середине выдачи результатов Content?

вы сможете использовать то, что было отправлено до отказа от подписки.

В методе onFailure я отправляю null с onNext (null), чтобы отметить сбой. Я делаю это, потому что в итоге у меня будет zip из 2 потоков, и я создам экземпляр настраиваемого класса только тогда, когда оба zip-значения не равны нулю. Это правильный способ сделать это?

Неа. вместо нуля выдать ошибку (используя onError на подписчике). Используя это, вам не нужно будет проверять null в вашей лямбда-выражении zip. Просто застегните!

Как я уже упоминал, у меня есть некоторая настраиваемая логика для проверки завершения потока. Что я здесь делаю, так это подсчитываю результаты REST, и когда было обработано столько же, сколько постоянных ссылок в списке, это делается. Это необходимо? Это правильный путь?

Если вы хотите получить количество постоянных ссылок, когда поток завершится, вы можете использовать оператор count.

Observable.from(permalinks)
          .flatMap(link -> getContent(link))
          .count()
          .subscribe(System.out::println);

Если вы хотите выдать номер ссылки, в конце успешной ссылки вы можете попробовать оператор scan

 Observable.from(permalinks)
          .flatMap(link -> getContent(link))
          .map(content -> 1) // map content to 1 in order to count it.
          .scan((seed, acu) -> acu + 1) 
          .subscribe(System.out::println); // will print 2, 3...
person dwursteisen    schedule 09.06.2015
comment
Я сам пробовал использовать / flatMap, но сделал ошибку, ваш код помог мне сделать это правильно, я предпочитаю этот способ, мне не нужна моя собственная логика для обнаружения завершения, спасибо! В строке subscriber.onError (OnErrorThrowable.addValueAsLastCause (new RuntimeException (не удалось), постоянная ссылка)); Я получаю: Исключение в потоке Thread-7 rx.exceptions.OnErrorNotImplementedException: не удалось и никогда не вижу никаких выбросов из потока. Я не думаю, что это путь. Я просто не буду вызывать onNext, а затем вызывать onCompleted, который преобразует постоянную ссылку в 0 Contents. Мне даже фильтр на нули не понадобится. - person wujek; 10.06.2015
comment
«вы сможете использовать то, что было отправлено до отказа от подписки». Когда я реализую свой RestCallback # onFailure для вызова subscriber.unsubscribe (), я никогда не вижу никаких выбросов, независимо от того, сколько из них было выполнено - я вижу, что некоторые из них преуспели, просто поместив Sysout в метод onSuccess. Это вызвано вызовом toList. Когда я не жду весь список, а обрабатываю каждое излучение отдельно, я получаю успешные до точки отказа. Но мне нужно, чтобы это был список. - person wujek; 10.06.2015
comment
вы ДОЛЖНЫ использовать onError вместо onCompleted для уведомления об ошибке. Если вы хотите обнаружить ошибку, вы должны реализовать обратный вызов onError: 'Observable.error (new RuntimeException (oups) .subscribe (System.out :: println, (e) - ›System.err.println (ERROR - ›+ E.getMessage ()); '. Это позволит вам управлять ошибками в своем потоке (используя метод onErrorResumeNext, ...) - person dwursteisen; 10.06.2015
comment
toList выдаст список только тогда, когда поток будет завершен. Поскольку он не завершен, он не выдает список. Если вам нужен частичный список отправленных элементов, вам придется преобразовать каждый контент в список из одного элемента, а затем объединить его с помощью сканирования. он будет выдавать список, который будет расти каждый раз при успешном удалении вызова. - person dwursteisen; 10.06.2015
comment
В случае возникновения ошибки я просто хочу войти в журнал и не выдавать никакого значения и продолжить те, которые я успешно получил. Я не хочу, чтобы генерировалось исключение и отменялся весь поток только из-за сбоя одного запроса REST. В настоящее время, похоже, это так. Мне нужно будет проверить новый фрагмент, чтобы увидеть, что он делает. - person wujek; 10.06.2015
comment
Тогда просто разберитесь с ошибками: Observable.from (постоянные ссылки) .flatMap (ссылка - ›getContent (ссылка) .doOnError (e -› System.out.println (Get Exception: + e) ​​.onErrorResumeNext (Observable.empty ()). В этом примере мелкая ошибка сетевого вызова основного потока, поэтому поток будет продолжаться и игнорировать ошибку. - person dwursteisen; 10.06.2015
comment
Разве это (onErrorResumeNext (empty ())) просто не останавливает наблюдаемое после возникновения первой ошибки? Я просто хочу зарегистрировать и в противном случае игнорировать ошибку - почему бы просто не зарегистрировать ее в методе RestCallback onFailure () и ничего не выдать? Это неправильный подход? Это определенно достаточно просто ... - person wujek; 10.06.2015
comment
Поток getContent () будет остановлен. Не основной поток. (Обратите внимание, что onerrorresumenext находится в потоке getContent ()!). Это способ сделать. В противном случае: как узнать, возникла ли у вас ошибка или поток завершен? - person dwursteisen; 10.06.2015
comment
Ах, моя ошибка заключалась в том, что я вызывал onError () и т. Д. В основном потоке из Observable.from (постоянные ссылки). Все работает так, как вы говорите, после вызова правильных потоков. Теперь я понял, спасибо. - person wujek; 10.06.2015
comment
По поводу вопросов isUnsubscribed () - если я не заключу в него subscriber.onNext / onCompeted и т. Д. После того, как ответ вернется, я получу java.lang.IllegalStateException: этот экземпляр был отменен, и очередь больше не используется. когда я, например, использую take (3). Поэтому я думаю, что действительно необходимо проверять статус перед вызовом методов подписчика, и проверка его перед отправкой запроса - это просто хорошо. - person wujek; 11.06.2015