Я использую библиотеку, которая реализует собственные запросы REST. Требуется «постоянная ссылка» и выдается запрос REST для получения контента (я предоставляю имитацию в исходном коде для тестирования). У меня есть список постоянных ссылок, и я хотел бы запланировать запрос для каждой и сгенерировать поток результатов контента. Все это должно быть асинхронным, и как только все результаты будут получены, я хочу выпустить их список.
Я пытаюсь добиться этого с помощью RxJava. Вот что у меня есть сейчас (извините, что не использую лямбды, я просто хочу привыкнуть к именам классов RxJava):
public class Main {
public static void main(String[] args) {
int count = 10;
List<String> permalinks = new ArrayList<>(count);
for (int i = 1; i <= count; ++i) {
permalinks.add("permalink_" + i);
}
ContentManager cm = new ContentManager();
Observable
.create(new Observable.OnSubscribe<Content>() {
int gotCount = 0;
@Override
public void call(Subscriber<? super Content> subscriber) {
for (String permalink : permalinks) { // 1. is iterating here the correct way?
if (!subscriber.isUnsubscribed()) { // 2. how often and where should I check isUnsubscribed?
cm.getBasicContentByPermalink(permalink, new RestCallback() {
@Override
public void onSuccess(Content content) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(content);
completeIfFinished(); // 3. if guarded by isUnsubscribed, onComplete might never be called
}
}
@Override
public void onFailure(int code, String message) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(null); // 4. is this OK or is there some other way to mark a failure?
completeIfFinished();
}
}
private void completeIfFinished() {
++gotCount;
if (gotCount == permalinks.size()) { // 5. how to know that the last request is done? am I supposed to implement such custom logic?
subscriber.onCompleted();
}
}
});
}
}
}
})
.toList()
.subscribe(new Action1<List<Content>>() {
@Override
public void call(List<Content> contents) {
System.out.println("list count: " + contents.size());
System.out.println("results: ");
contents.stream().map(content -> content != null ? content.basic : null).forEach(System.out::println);
}
});
System.out.println("finishing main");
}
}
// library mocks
class Content {
String basic;
String extended;
public Content(String basic, String extended) {
this.basic = basic;
this.extended = extended;
}
}
interface RestCallback {
void onSuccess(Content content);
void onFailure(int code, String message);
}
class ContentManager {
private final Random random = new Random();
public void getBasicContentByPermalink(String permalink, RestCallback callback) {
// just to simulate network latency and unordered results
new Thread() {
@Override
public void run() {
try {
Thread.sleep(random.nextInt(1000) + 200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (random.nextInt(100) < 95) {
// 95% of the time we succeed
callback.onSuccess(new Content(permalink + "_basic", null));
} else {
callback.onFailure(-1, permalink + "_basic_failure");
}
}
}.start();
}
}
Это вроде работает, но я не уверен, что все делаю правильно. Обратите внимание на строки 1-5:
- При создании Observable из списка должен ли я сам перебирать список или есть другой, лучший способ? Например, есть Observable.from (Iterable), но я не думаю, что смогу его использовать?
- Я проверяю isUnsubscribed перед отправкой запроса REST, а также в обоих обработчиках результатов (успех / неудача). Это как предполагается использовать?
- Я реализую некоторую логику для вызова onComplete, когда все запросы вернутся, независимо от того, были ли они успешными или неудачными (см. Вопрос 5). Но, поскольку они защищены вызовом isUnsubscribed, может случиться так, что они никогда не вызывают onComplete. Как тогда поток узнает, что он должен завершиться? Завершается ли она преждевременно, когда подписчик отписывается, и мне не нужно об этом думать? Есть ли какие-нибудь предостережения? Например, что произойдет, если подписчик отменит подписку в середине выдачи результатов Content? В моих тестах список всех результатов никогда не выводится, но я ожидал бы получить список всех результатов до этого момента.
- В методе onFailure я отправляю null с onNext (null), чтобы отметить сбой. Я делаю это, потому что в итоге у меня будет zip из 2 потоков, и я создам экземпляр настраиваемого класса только тогда, когда оба zip-значения не равны нулю. Это правильный способ сделать это?
- Как я уже упоминал, у меня есть некоторая настраиваемая логика для проверки завершения потока. Что я здесь делаю, так это подсчитываю результаты REST, и когда было обработано столько же, сколько постоянных ссылок в списке, это делается. Это необходимо? Это правильный путь?