Aliran REST yang Dapat Diamati Khusus dihasilkan dari daftar tautan

Saya menggunakan perpustakaan yang mengimplementasikan permintaan REST-nya sendiri. Dibutuhkan 'permalink' dan mengeluarkan permintaan REST untuk mengambil Konten (saya memberikan tiruan dalam kode sumber untuk pengujian). Saya memiliki daftar tautan permanen, dan ingin menjadwalkan permintaan untuk masing-masing tautan tersebut dan menghasilkan aliran hasil Konten. Semuanya harus asinkron, dan setelah semua hasil selesai, saya ingin menampilkan daftarnya.

Saya mencoba mencapai ini menggunakan RxJava. Inilah yang saya miliki sekarang (maaf karena tidak menggunakan lambda, saya hanya ingin membiasakan diri dengan nama kelas RxJava):

public class Main {

    public static void main(String[] args) {
        int count = 10;
        List<String> permalinks = new ArrayList<>(count);
        for (int i = 1; i <= count; ++i) {
            permalinks.add("permalink_" + i);
        }

        ContentManager cm = new ContentManager();

        Observable
                .create(new Observable.OnSubscribe<Content>() {

                    int gotCount = 0;

                    @Override
                    public void call(Subscriber<? super Content> subscriber) {
                        for (String permalink : permalinks) { // 1. is iterating here the correct way?
                            if (!subscriber.isUnsubscribed()) { // 2. how often and where should I check isUnsubscribed?
                                cm.getBasicContentByPermalink(permalink, new RestCallback() {

                                    @Override
                                    public void onSuccess(Content content) {
                                        if (!subscriber.isUnsubscribed()) {
                                            subscriber.onNext(content);
                                            completeIfFinished(); // 3. if guarded by isUnsubscribed, onComplete might never be called
                                        }
                                    }

                                    @Override
                                    public void onFailure(int code, String message) {
                                        if (!subscriber.isUnsubscribed()) {
                                            subscriber.onNext(null); // 4. is this OK or is there some other way to mark a failure?
                                            completeIfFinished();
                                        }
                                    }

                                    private void completeIfFinished() {
                                        ++gotCount;
                                        if (gotCount == permalinks.size()) { // 5. how to know that the last request is done? am I supposed to implement such custom logic?
                                            subscriber.onCompleted();
                                        }
                                    }
                                });
                            }
                        }
                    }
                })
                .toList()
                .subscribe(new Action1<List<Content>>() {

                    @Override
                    public void call(List<Content> contents) {
                        System.out.println("list count: " + contents.size());
                        System.out.println("results: ");
                        contents.stream().map(content -> content != null ? content.basic : null).forEach(System.out::println);
                    }
                });

        System.out.println("finishing main");
    }
}

// library mocks

class Content {

    String basic;

    String extended;

    public Content(String basic, String extended) {
        this.basic = basic;
        this.extended = extended;
    }
}

interface RestCallback {

    void onSuccess(Content content);

    void onFailure(int code, String message);
}

class ContentManager {

    private final Random random = new Random();

    public void getBasicContentByPermalink(String permalink, RestCallback callback) {
        // just to simulate network latency and unordered results
        new Thread() {

            @Override
            public void run() {
                try {
                    Thread.sleep(random.nextInt(1000) + 200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (random.nextInt(100) < 95) {
                    // 95% of the time we succeed
                    callback.onSuccess(new Content(permalink + "_basic", null));
                } else {
                    callback.onFailure(-1, permalink + "_basic_failure");
                }
            }
        }.start();
    }
}

Ini sedikit berhasil, tetapi saya tidak yakin apakah saya melakukan sesuatu dengan cara yang benar. Silakan lihat baris nomor 1-5:

  1. Saat membuat Observable dari daftar, apakah saya harus mengulangi daftar itu sendiri, atau adakah cara lain yang lebih baik? Misalnya, ada Observable.from(Iterable), tapi saya rasa saya tidak bisa menggunakannya?
  2. Saya memeriksa isUnsubscribed sebelum saya mengirim permintaan REST, dan juga di kedua penangan hasil (berhasil/gagal). Apakah ini cara yang seharusnya digunakan?
  3. Saya menerapkan beberapa logika untuk memanggil onComplete ketika semua permintaan telah kembali, tidak peduli apakah berhasil atau gagal (lihat pertanyaan 5). Tapi, karena mereka dilindungi oleh panggilan isUnsubscribed, mungkin saja mereka onComplete tidak pernah dipanggil. Lalu bagaimana aliran itu mengetahui bahwa aliran itu harus selesai? Apakah selesai sebelum waktunya ketika pelanggan berhenti berlangganan dan saya tidak perlu memikirkannya? Apakah ada peringatan? Misalnya, apa jadinya jika pelanggan berhenti berlangganan di tengah-tengah menampilkan hasil Konten? Dalam pengujian saya, daftar semua hasil tidak pernah dikeluarkan, tetapi saya mengharapkan daftar semua hasil hingga saat itu.
  4. Dalam metode onFailure, saya memancarkan null dengan onNext(null) untuk menandai kegagalan. Saya melakukannya karena pada akhirnya saya akan memiliki zip 2 aliran, dan akan mengeluarkan turunan kelas khusus hanya ketika kedua nilai yang di-zip bukan nol. Apakah ini cara yang benar untuk melakukannya?
  5. Seperti yang saya sebutkan, saya memiliki logika khusus untuk memeriksa apakah streaming telah selesai. Yang saya lakukan di sini adalah menghitung hasil REST dan ketika diproses sebanyak permalink yang ada di daftar, maka selesai. Apakah ini perlu? Apakah ini cara yang benar?

person wujek    schedule 09.06.2015    source sumber


Jawaban (1)


Saat membuat Observable dari daftar, apakah saya harus mengulangi daftar itu sendiri, atau adakah cara lain yang lebih baik? Misalnya, ada Observable.from(Iterable), tapi saya rasa saya tidak bisa menggunakannya?

Daripada mengulangi metode create, Anda dapat membuat observasi per tautan.

public Observable<Content> getContent(permalink) {

    return Observable.create(subscriber -> {
                   ContentManager cm = new ContentManager();
                   if (!subscriber.isUnsubscribed()) { 
                            cm.getBasicContentByPermalink(permalink, new RestCallback() {

                                @Override
                                public void onSuccess(Content content) {
                                        subscriber.onNext(content);
                                        subscriber.onCompleted();
                                }

                                @Override
                                public void onFailure(int code, String message) {
                                       subscriber.onError(OnErrorThrowable.addValueAsLastCause(new RuntimeException(message, permalink));
                                }

                }
    });
}

lalu gunakan dengan operator flatMap

 Observable.from(permalinks)
           .flatMap(link -> getContent(link))
           .subscribe();

Saya memeriksa isUnsubscribed sebelum saya mengirim permintaan REST, dan juga di kedua penangan hasil (berhasil/gagal). Apakah ini cara yang seharusnya digunakan?

periksa isUnsubscribed sebelum Anda mengirim permintaan Anda adalah cara yang harus dilakukan. Tidak yakin untuk panggilan balik sukses/gagal ini (menurut saya tidak ada gunanya tetapi ada yang bisa mengatakan saya salah)

Saya menerapkan beberapa logika untuk memanggil onComplete ketika semua permintaan telah kembali, tidak peduli apakah berhasil atau gagal (lihat pertanyaan 5). Tapi, karena mereka dilindungi oleh panggilan isUnsubscribed, mungkin saja mereka onComplete tidak pernah dipanggil.

Jika Anda Berhenti berlangganan streaming, Anda akan berhenti untuk mengamati streaming tersebut, sehingga Anda mungkin tidak diberi tahu tentang selesainya streaming tersebut.

Misalnya, apa jadinya jika pelanggan berhenti berlangganan di tengah-tengah menampilkan hasil Konten?

Anda akan dapat mengonsumsi apa yang dikeluarkan sebelum berhenti berlangganan.

Dalam metode onFailure, saya memancarkan null dengan onNext(null) untuk menandai kegagalan. Saya melakukannya karena pada akhirnya saya akan memiliki zip 2 aliran, dan akan mengeluarkan turunan kelas khusus hanya ketika kedua nilai yang di-zip bukan nol. Apakah ini cara yang benar untuk melakukannya?

Tidak. alih-alih null, buat kesalahan (menggunakan onError pada pelanggan). Dengan menggunakan ini, Anda tidak perlu mencentang null di zip lambda Anda. Cukup zip!

Seperti yang saya sebutkan, saya memiliki logika khusus untuk memeriksa apakah streaming telah selesai. Yang saya lakukan di sini adalah menghitung hasil REST dan ketika diproses sebanyak permalink yang ada di daftar, maka selesai. Apakah ini perlu? Apakah ini cara yang benar?

Jika Anda ingin mendapatkan nomor permalink, setelah streaming selesai, Anda dapat menggunakan operator count.

Observable.from(permalinks)
          .flatMap(link -> getContent(link))
          .count()
          .subscribe(System.out::println);

Jika Anda ingin mengeluarkan nomor tautan, di akhir tautan yang berhasil, Anda dapat mencoba operator scan

 Observable.from(permalinks)
          .flatMap(link -> getContent(link))
          .map(content -> 1) // map content to 1 in order to count it.
          .scan((seed, acu) -> acu + 1) 
          .subscribe(System.out::println); // will print 2, 3...
person dwursteisen    schedule 09.06.2015
comment
Saya mencoba dari/flatMap sendiri tetapi saya membuat kesalahan, kode Anda membantu saya melakukannya dengan benar, saya lebih suka cara ini, tidak memerlukan logika khusus saya untuk mendeteksi penyelesaian, terima kasih! Di baris subscriber.onError(OnErrorThrowable.addValueAsLastCause(new RuntimeException(failed), permalink)); Saya mendapatkan: Pengecualian di utas Thread-7 rx.Exceptions.OnErrorNotImplementedException: gagal, dan tidak pernah melihat emisi apa pun dari aliran. Saya tidak berpikir ini adalah cara yang harus dilakukan. Saya tidak akan memanggil onNext dan kemudian memanggil onCompleted, yang akan mengubah tautan permanen menjadi 0 Konten. Saya bahkan tidak memerlukan filter untuk nol. - person wujek; 10.06.2015
comment
'Anda akan dapat mengonsumsi apa yang dipancarkan sebelum berhenti berlangganan.' Ketika saya menerapkan RestCallback#onFailure untuk memanggil subscriber.unsubscribe(), saya tidak pernah melihat emisi apa pun, tidak peduli berapa banyak yang berhasil - saya dapat melihat bahwa beberapa berhasil hanya dengan memasukkan Sysout ke dalam metode onSuccess. Hal ini disebabkan oleh panggilan 'toList'. Ketika saya tidak menunggu seluruh daftar melainkan memproses setiap emisi sendiri-sendiri, saya mendapatkan emisi yang berhasil hingga titik kegagalan. Tapi saya membutuhkannya untuk menjadi daftar. - person wujek; 10.06.2015
comment
Anda HARUS menggunakan onError alih-alih onCompleted untuk memberitahukan kesalahan. Jika Anda ingin mendeteksi kesalahan, Anda perlu mengimplementasikan callback onError : 'Observable.error(new RuntimeException(oups).subscribe(System.out::println, (e) -› System.err.println(ERROR - › + e.getMessage());'. Ini akan memungkinkan Anda mengelola kesalahan dalam aliran Anda (menggunakan metode onErrorResumeNext, ...) - person dwursteisen; 10.06.2015
comment
toList akan mengeluarkan daftar hanya ketika streaming selesai. Karena tidak lengkap, ia tidak mengeluarkan daftar. Jika Anda menginginkan sebagian daftar item yang dikeluarkan, Anda harus mengubah setiap Konten menjadi daftar satu item, lalu menggabungkannya menggunakan pemindaian. itu akan mengeluarkan daftar yang bertambah setiap kali panggilan jarak jauh berhasil. - person dwursteisen; 10.06.2015
comment
Jika terjadi kesalahan, saya hanya ingin mencatat dan tidak mengeluarkan nilai apa pun, dan melanjutkan dengan nilai yang berhasil saya dapatkan. Saya tidak ingin pengecualian dilemparkan dan membatalkan seluruh aliran hanya karena satu permintaan REST gagal. Saat ini, hal ini tampaknya terjadi. Saya harus memeriksa cuplikan baru untuk melihat fungsinya. - person wujek; 10.06.2015
comment
Tangani saja kesalahannya : Observable.from(permalinks) .flatMap(link -› getContent(link).doOnError(e -› System.out.println(Get Exception : + e).onErrorResumeNext(Observable.empty()). Contoh ini memperkecil kesalahan panggilan jaringan ke aliran utama, sehingga aliran akan terus berlanjut dan mengabaikan kesalahan tersebut. - person dwursteisen; 10.06.2015
comment
Bukankah ini (onErrorResumeNext(empty())) hanya menghentikan pengamatan setelah kesalahan pertama terjadi? Saya hanya ingin mencatat dan mengabaikan kesalahan tersebut - mengapa tidak mencatatnya saja di metode RestCallback onFailure() dan tidak mengeluarkan apa pun? Apakah pendekatan ini salah? Ini pasti cukup sederhana... - person wujek; 10.06.2015
comment
Aliran getContent() akan dihentikan. Bukan arus utama. (Harap dicatat bahwa onerrorresumenext ada di aliran getContent()!). Itu cara yang harus dilakukan. Jika tidak: bagaimana cara mengetahui apakah Anda mengalami kesalahan atau streaming telah selesai? - person dwursteisen; 10.06.2015
comment
Ah, kesalahan saya adalah saya memanggil onError() dll. di aliran utama, dari Observable.from(permalinks). Semuanya berfungsi seperti yang Anda katakan setelah dipanggil pada aliran yang benar. Sekarang saya mengerti, terima kasih. - person wujek; 10.06.2015
comment
Tentang pertanyaan isUnsubscribed() - jika saya tidak membungkus subscriber.onNext/onCompeted dll. di dalamnya setelah balasannya kembali, saya mendapatkan java.lang.IllegalStateException: This instance has had unsubscribed and the queue is no more usable. ketika saya menggunakan take(3), misalnya. Jadi menurut saya sangat penting untuk memeriksa status sebelum memanggil metode pelanggan, dan memeriksanya sebelum mengirim permintaan adalah hal yang menyenangkan untuk dilakukan. - person wujek; 11.06.2015