Saya menggunakan perpustakaan yang mengimplementasikan permintaan REST-nya sendiri. Dibutuhkan 'permalink' dan mengeluarkan permintaan REST untuk mengambil Konten (saya memberikan tiruan dalam kode sumber untuk pengujian). Saya memiliki daftar tautan permanen, dan ingin menjadwalkan permintaan untuk masing-masing tautan tersebut dan menghasilkan aliran hasil Konten. Semuanya harus asinkron, dan setelah semua hasil selesai, saya ingin menampilkan daftarnya.
Saya mencoba mencapai ini menggunakan RxJava. Inilah yang saya miliki sekarang (maaf karena tidak menggunakan lambda, saya hanya ingin membiasakan diri dengan nama kelas RxJava):
public class Main {
public static void main(String[] args) {
int count = 10;
List<String> permalinks = new ArrayList<>(count);
for (int i = 1; i <= count; ++i) {
permalinks.add("permalink_" + i);
}
ContentManager cm = new ContentManager();
Observable
.create(new Observable.OnSubscribe<Content>() {
int gotCount = 0;
@Override
public void call(Subscriber<? super Content> subscriber) {
for (String permalink : permalinks) { // 1. is iterating here the correct way?
if (!subscriber.isUnsubscribed()) { // 2. how often and where should I check isUnsubscribed?
cm.getBasicContentByPermalink(permalink, new RestCallback() {
@Override
public void onSuccess(Content content) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(content);
completeIfFinished(); // 3. if guarded by isUnsubscribed, onComplete might never be called
}
}
@Override
public void onFailure(int code, String message) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(null); // 4. is this OK or is there some other way to mark a failure?
completeIfFinished();
}
}
private void completeIfFinished() {
++gotCount;
if (gotCount == permalinks.size()) { // 5. how to know that the last request is done? am I supposed to implement such custom logic?
subscriber.onCompleted();
}
}
});
}
}
}
})
.toList()
.subscribe(new Action1<List<Content>>() {
@Override
public void call(List<Content> contents) {
System.out.println("list count: " + contents.size());
System.out.println("results: ");
contents.stream().map(content -> content != null ? content.basic : null).forEach(System.out::println);
}
});
System.out.println("finishing main");
}
}
// library mocks
class Content {
String basic;
String extended;
public Content(String basic, String extended) {
this.basic = basic;
this.extended = extended;
}
}
interface RestCallback {
void onSuccess(Content content);
void onFailure(int code, String message);
}
class ContentManager {
private final Random random = new Random();
public void getBasicContentByPermalink(String permalink, RestCallback callback) {
// just to simulate network latency and unordered results
new Thread() {
@Override
public void run() {
try {
Thread.sleep(random.nextInt(1000) + 200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (random.nextInt(100) < 95) {
// 95% of the time we succeed
callback.onSuccess(new Content(permalink + "_basic", null));
} else {
callback.onFailure(-1, permalink + "_basic_failure");
}
}
}.start();
}
}
Ini sedikit berhasil, tetapi saya tidak yakin apakah saya melakukan sesuatu dengan cara yang benar. Silakan lihat baris nomor 1-5:
- Saat membuat Observable dari daftar, apakah saya harus mengulangi daftar itu sendiri, atau adakah cara lain yang lebih baik? Misalnya, ada Observable.from(Iterable), tapi saya rasa saya tidak bisa menggunakannya?
- Saya memeriksa isUnsubscribed sebelum saya mengirim permintaan REST, dan juga di kedua penangan hasil (berhasil/gagal). Apakah ini cara yang seharusnya digunakan?
- Saya menerapkan beberapa logika untuk memanggil onComplete ketika semua permintaan telah kembali, tidak peduli apakah berhasil atau gagal (lihat pertanyaan 5). Tapi, karena mereka dilindungi oleh panggilan isUnsubscribed, mungkin saja mereka onComplete tidak pernah dipanggil. Lalu bagaimana aliran itu mengetahui bahwa aliran itu harus selesai? Apakah selesai sebelum waktunya ketika pelanggan berhenti berlangganan dan saya tidak perlu memikirkannya? Apakah ada peringatan? Misalnya, apa jadinya jika pelanggan berhenti berlangganan di tengah-tengah menampilkan hasil Konten? Dalam pengujian saya, daftar semua hasil tidak pernah dikeluarkan, tetapi saya mengharapkan daftar semua hasil hingga saat itu.
- Dalam metode onFailure, saya memancarkan null dengan onNext(null) untuk menandai kegagalan. Saya melakukannya karena pada akhirnya saya akan memiliki zip 2 aliran, dan akan mengeluarkan turunan kelas khusus hanya ketika kedua nilai yang di-zip bukan nol. Apakah ini cara yang benar untuk melakukannya?
- Seperti yang saya sebutkan, saya memiliki logika khusus untuk memeriksa apakah streaming telah selesai. Yang saya lakukan di sini adalah menghitung hasil REST dan ketika diproses sebanyak permalink yang ada di daftar, maka selesai. Apakah ini perlu? Apakah ini cara yang benar?