สตรีมผลลัพธ์ REST ที่สังเกตได้แบบกำหนดเองจากรายการลิงก์

ฉันกำลังใช้ไลบรารีที่ใช้คำขอ REST ของตัวเอง ใช้ 'ลิงก์ถาวร' และออกคำขอ REST เพื่อดึงเนื้อหา (ฉันจัดเตรียมการเยาะเย้ยในซอร์สโค้ดสำหรับการทดสอบ) ฉันมีรายการลิงก์ถาวร และต้องการกำหนดเวลาคำขอสำหรับลิงก์แต่ละรายการและสร้างสตรีมผลลัพธ์เนื้อหา ทั้งหมดควรเป็นแบบอะซิงโครนัส และเมื่อผลลัพธ์ทั้งหมดเสร็จสิ้น ฉันต้องการปล่อยรายการผลลัพธ์เหล่านั้น

ฉันกำลังพยายามบรรลุเป้าหมายนี้โดยใช้ RxJava นี่คือสิ่งที่ฉันมีตอนนี้ (ขออภัยที่ไม่ได้ใช้ lambdas ฉันแค่อยากทำความคุ้นเคยกับชื่อคลาส RxJava):

public class Main {

    public static void main(String[] args) {
        int count = 10;
        List<String> permalinks = new ArrayList<>(count);
        for (int i = 1; i <= count; ++i) {
            permalinks.add("permalink_" + i);
        }

        ContentManager cm = new ContentManager();

        Observable
                .create(new Observable.OnSubscribe<Content>() {

                    int gotCount = 0;

                    @Override
                    public void call(Subscriber<? super Content> subscriber) {
                        for (String permalink : permalinks) { // 1. is iterating here the correct way?
                            if (!subscriber.isUnsubscribed()) { // 2. how often and where should I check isUnsubscribed?
                                cm.getBasicContentByPermalink(permalink, new RestCallback() {

                                    @Override
                                    public void onSuccess(Content content) {
                                        if (!subscriber.isUnsubscribed()) {
                                            subscriber.onNext(content);
                                            completeIfFinished(); // 3. if guarded by isUnsubscribed, onComplete might never be called
                                        }
                                    }

                                    @Override
                                    public void onFailure(int code, String message) {
                                        if (!subscriber.isUnsubscribed()) {
                                            subscriber.onNext(null); // 4. is this OK or is there some other way to mark a failure?
                                            completeIfFinished();
                                        }
                                    }

                                    private void completeIfFinished() {
                                        ++gotCount;
                                        if (gotCount == permalinks.size()) { // 5. how to know that the last request is done? am I supposed to implement such custom logic?
                                            subscriber.onCompleted();
                                        }
                                    }
                                });
                            }
                        }
                    }
                })
                .toList()
                .subscribe(new Action1<List<Content>>() {

                    @Override
                    public void call(List<Content> contents) {
                        System.out.println("list count: " + contents.size());
                        System.out.println("results: ");
                        contents.stream().map(content -> content != null ? content.basic : null).forEach(System.out::println);
                    }
                });

        System.out.println("finishing main");
    }
}

// library mocks

class Content {

    String basic;

    String extended;

    public Content(String basic, String extended) {
        this.basic = basic;
        this.extended = extended;
    }
}

interface RestCallback {

    void onSuccess(Content content);

    void onFailure(int code, String message);
}

class ContentManager {

    private final Random random = new Random();

    public void getBasicContentByPermalink(String permalink, RestCallback callback) {
        // just to simulate network latency and unordered results
        new Thread() {

            @Override
            public void run() {
                try {
                    Thread.sleep(random.nextInt(1000) + 200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (random.nextInt(100) < 95) {
                    // 95% of the time we succeed
                    callback.onSuccess(new Content(permalink + "_basic", null));
                } else {
                    callback.onFailure(-1, permalink + "_basic_failure");
                }
            }
        }.start();
    }
}

ใช้งานได้นิดหน่อย แต่ฉันไม่แน่ใจว่าฉันทำสิ่งที่ถูกต้องหรือไม่ โปรดดูบรรทัดที่ 1-5:

  1. เมื่อสร้าง Observable จากรายการ ฉันควรจะวนซ้ำรายการด้วยตัวเอง หรือมีวิธีอื่นที่ดีกว่านี้หรือไม่ ตัวอย่างเช่นมี Observable.from(Iterable) แต่ฉันไม่คิดว่าจะใช้งานได้ใช่ไหม
  2. ฉันกำลังตรวจสอบ isUnsubscribed ก่อนที่จะส่งคำขอ REST และในตัวจัดการผลลัพธ์ทั้งสองด้วย (สำเร็จ/ล้มเหลว) นี่คือวิธีที่ควรจะใช้ใช่ไหม?
  3. ฉันใช้ตรรกะบางอย่างเพื่อเรียก onComplete เมื่อคำขอทั้งหมดกลับมา ไม่ว่าจะสำเร็จหรือล้มเหลว (ดูคำถามที่ 5) แต่เนื่องจากพวกเขาได้รับการปกป้องโดยการเรียก isUnsubscribed จึงอาจเกิดขึ้นได้ว่าการโทร onComplete จะไม่ถูกเรียกเลย กระแสรู้ได้อย่างไรว่าควรจะเสร็จสิ้น? จะเสร็จสิ้นก่อนกำหนดหรือไม่เมื่อสมาชิกยกเลิกการสมัครและฉันไม่ต้องคิดเกี่ยวกับเรื่องนี้? มีข้อแม้หรือไม่? ตัวอย่างเช่น จะเกิดอะไรขึ้นหากผู้สมัครสมาชิกยกเลิกการสมัครรับข้อมูลในขณะที่ปล่อยผลลัพธ์เนื้อหาออกมา ในการทดสอบของฉัน รายการผลลัพธ์ทั้งหมดจะไม่ถูกปล่อยออกมา แต่ฉันคาดหวังว่ารายการผลลัพธ์ทั้งหมดจนถึงจุดนั้น
  4. ในเมธอด onFailure ฉันส่งค่า null ด้วย onNext(null) เพื่อทำเครื่องหมายความล้มเหลว ฉันทำเพราะท้ายที่สุดแล้ว ฉันจะมี zip 2 สตรีม และจะปล่อยอินสแตนซ์ของคลาสที่กำหนดเองเฉพาะเมื่อค่า zip ทั้งสองไม่เป็นค่าว่าง นี่เป็นวิธีที่ถูกต้องหรือไม่?
  5. ดังที่ได้กล่าวไปแล้ว ฉันมีตรรกะที่กำหนดเองเพื่อตรวจสอบว่าการสตรีมเสร็จสิ้นหรือไม่ สิ่งที่ฉันทำที่นี่คือนับผลลัพธ์ REST และเมื่อประมวลผลได้มากเท่าที่มีลิงก์ถาวรในรายการ ก็เป็นอันเสร็จสิ้น สิ่งนี้จำเป็นหรือไม่? นี่เป็นวิธีที่ถูกต้องหรือไม่?

person wujek    schedule 09.06.2015    source แหล่งที่มา


คำตอบ (1)


เมื่อสร้าง Observable จากรายการ ฉันควรจะวนซ้ำรายการด้วยตัวเอง หรือมีวิธีอื่นที่ดีกว่านี้หรือไม่ ตัวอย่างเช่นมี Observable.from(Iterable) แต่ฉันไม่คิดว่าจะใช้งานได้ใช่ไหม

แทนที่จะวนซ้ำในเมธอด create คุณสามารถสร้างสิ่งที่สังเกตได้ต่อลิงก์

public Observable<Content> getContent(permalink) {

    return Observable.create(subscriber -> {
                   ContentManager cm = new ContentManager();
                   if (!subscriber.isUnsubscribed()) { 
                            cm.getBasicContentByPermalink(permalink, new RestCallback() {

                                @Override
                                public void onSuccess(Content content) {
                                        subscriber.onNext(content);
                                        subscriber.onCompleted();
                                }

                                @Override
                                public void onFailure(int code, String message) {
                                       subscriber.onError(OnErrorThrowable.addValueAsLastCause(new RuntimeException(message, permalink));
                                }

                }
    });
}

จากนั้นใช้กับตัวดำเนินการ flatMap

 Observable.from(permalinks)
           .flatMap(link -> getContent(link))
           .subscribe();

ฉันกำลังตรวจสอบ isUnsubscribed ก่อนที่จะส่งคำขอ REST และในตัวจัดการผลลัพธ์ทั้งสองด้วย (สำเร็จ/ล้มเหลว) นี่คือวิธีที่ควรจะใช้ใช่ไหม?

ตรวจสอบ isUnsubscribed ก่อนที่คุณจะส่งคำขอของคุณเป็นวิธีที่ควรทำ ไม่แน่ใจสำหรับการโทรกลับสำเร็จ/ล้มเหลวนี้ (ฉันคิดว่ามันไม่มีประโยชน์ แต่มีคนบอกว่าฉันผิด)

ฉันใช้ตรรกะบางอย่างเพื่อเรียก onComplete เมื่อคำขอทั้งหมดกลับมา ไม่ว่าจะสำเร็จหรือล้มเหลว (ดูคำถามที่ 5) แต่เนื่องจากพวกเขาได้รับการปกป้องโดยการเรียก isUnsubscribed จึงอาจเกิดขึ้นได้ว่าการโทร onComplete จะไม่ถูกเรียกเลย

หากคุณยกเลิกการสมัครรับข้อมูลสตรีม คุณจะหยุดดูสตรีม ดังนั้นคุณอาจไม่ได้รับแจ้งเมื่อสตรีมเสร็จสิ้น

ตัวอย่างเช่น จะเกิดอะไรขึ้นหากผู้สมัครสมาชิกยกเลิกการสมัครรับข้อมูลในขณะที่ปล่อยผลลัพธ์เนื้อหาออกมา

คุณจะสามารถใช้สิ่งที่ปล่อยออกมาก่อนที่จะยกเลิกการสมัคร

ในเมธอด onFailure ฉันส่งค่า null ด้วย onNext(null) เพื่อทำเครื่องหมายความล้มเหลว ฉันทำเพราะท้ายที่สุดแล้ว ฉันจะมี zip 2 สตรีม และจะปล่อยอินสแตนซ์ของคลาสที่กำหนดเองเฉพาะเมื่อค่า zip ทั้งสองไม่เป็นค่าว่าง นี่เป็นวิธีที่ถูกต้องหรือไม่?

ไม่. แทนที่จะเป็น null ให้ปล่อยข้อผิดพลาด (โดยใช้ onError บนสมาชิก) เมื่อใช้สิ่งนี้ คุณจะไม่ต้องตรวจสอบค่าว่างใน zip lambda ของคุณ เพียงแค่ซิป!

ดังที่ได้กล่าวไปแล้ว ฉันมีตรรกะที่กำหนดเองเพื่อตรวจสอบว่าการสตรีมเสร็จสิ้นหรือไม่ สิ่งที่ฉันทำที่นี่คือนับผลลัพธ์ REST และเมื่อประมวลผลได้มากเท่าที่มีลิงก์ถาวรในรายการ ก็เป็นอันเสร็จสิ้น สิ่งนี้จำเป็นหรือไม่? นี่เป็นวิธีที่ถูกต้องหรือไม่?

หากคุณต้องการรับจำนวนลิงก์ถาวร เมื่อสตรีมเสร็จสิ้น คุณสามารถใช้โอเปอเรเตอร์ count ได้

Observable.from(permalinks)
          .flatMap(link -> getContent(link))
          .count()
          .subscribe(System.out::println);

หากคุณต้องการปล่อยจำนวนลิงก์ ในตอนท้ายของลิงก์ที่สำเร็จ คุณสามารถลองใช้ตัวดำเนินการ scan

 Observable.from(permalinks)
          .flatMap(link -> getContent(link))
          .map(content -> 1) // map content to 1 in order to count it.
          .scan((seed, acu) -> acu + 1) 
          .subscribe(System.out::println); // will print 2, 3...
person dwursteisen    schedule 09.06.2015
comment
ฉันลองจาก/flatMap ด้วยตัวเอง แต่ฉันทำผิดพลาด รหัสของคุณช่วยให้ฉันทำได้อย่างถูกต้อง ฉันชอบวิธีนี้ ไม่จำเป็นต้องใช้ตรรกะที่กำหนดเองในการตรวจจับความสมบูรณ์ ขอบคุณ! ในบรรทัด Subscriber.onError(OnErrorThrowable.addValueAsLastCause(new RuntimeException(failed), permalink)); ฉันได้รับ: ข้อยกเว้นในเธรด Thread-7 rx.Exceptions.OnErrorNotImplementedException: failed และไม่เคยเห็นการปล่อยก๊าซใดๆ จากสตรีมเลย ฉันไม่ผอมนี่คือวิธีที่จะไป ฉันจะไม่เรียก onNext แล้วเรียก onCompleted ซึ่งจะแปลงลิงก์ถาวรเป็น 0 Contents ฉันไม่ต้องการตัวกรองสำหรับค่าว่างด้วยซ้ำ - person wujek; 10.06.2015
comment
'คุณจะสามารถบริโภคสิ่งที่ปล่อยออกมาก่อนที่จะยกเลิกการสมัคร' เมื่อฉันใช้ RestCallback#onFailure เพื่อเรียก Subscriber.unsubscribe() ฉันไม่เคยเห็นการปล่อยก๊าซใดๆ เลย ไม่ว่าจะสำเร็จไปกี่ครั้งก็ตาม - ฉันสามารถเห็นได้ว่าบางส่วนประสบความสำเร็จเพียงแค่ใส่ Sysout ลงในเมธอด onSuccess สิ่งนี้เกิดจากการเรียก 'toList' เมื่อฉันไม่รอรายการทั้งหมดแต่ต้องประมวลผลการปล่อยก๊าซแต่ละรายการด้วยตัวเอง ฉันจะจัดการการปล่อยก๊าซที่ประสบความสำเร็จจนถึงจุดที่ล้มเหลว แต่ฉันต้องการให้เป็นรายการ - person wujek; 10.06.2015
comment
คุณควรใช้ onError แทน onCompleted เพื่อแจ้งข้อผิดพลาด หากคุณต้องการตรวจพบข้อผิดพลาด คุณจะต้องดำเนินการโทรกลับ onError : 'Observable.error(new RuntimeException(oups).subscribe(System.out::println, (e) -› System.err.println(ERROR - › + e.getMessage());' คุณจะสามารถจัดการข้อผิดพลาดในการสตรีมของคุณได้ (โดยใช้วิธี onErrorResumeNext ...) - person dwursteisen; 10.06.2015
comment
toList จะส่งรายการเฉพาะเมื่อการสตรีมเสร็จสิ้นเท่านั้น เนื่องจากไม่เสร็จสมบูรณ์ จึงไม่ปล่อยรายการ หากคุณต้องการรายการบางส่วนที่ปล่อยออกมา คุณจะต้องแปลงเนื้อหาแต่ละรายการให้เป็นรายการเดียว จากนั้นจึงรวมเข้าด้วยกันโดยใช้การสแกน มันจะปล่อยรายการที่เพิ่มขึ้นทุกครั้งที่มีการโทรระยะไกลสำเร็จ - person dwursteisen; 10.06.2015
comment
ในกรณีที่เกิดข้อผิดพลาด ฉันเพียงต้องการบันทึกและไม่ปล่อยค่าใด ๆ และดำเนินการกับสิ่งที่ฉันทำได้สำเร็จ ฉันไม่ต้องการให้เกิดข้อยกเว้นและยกเลิกสตรีมทั้งหมดเพียงเพราะคำขอ REST หนึ่งรายการล้มเหลว ปัจจุบันดูเหมือนว่าจะเป็นเช่นนั้น ฉันจะต้องตรวจสอบตัวอย่างใหม่เพื่อดูว่ามันทำอะไร - person wujek; 10.06.2015
comment
เพียงจัดการกับข้อผิดพลาดแล้ว: Observable.from(permalinks) .flatMap(link -› getContent(link).doOnError(e -› System.out.println(Get Exception : + e).onErrorResumeNext(Observable.empty()) ตัวอย่างนี้แสดงให้เห็นข้อผิดพลาดของการเรียกเครือข่ายไปยังสตรีมหลัก ดังนั้น สตรีมจะดำเนินต่อไปและไม่สนใจข้อผิดพลาด - person dwursteisen; 10.06.2015
comment
สิ่งนี้ (onErrorResumeNext(empty())) ไม่เพียงแค่หยุดสิ่งที่สังเกตได้หลังจากเกิดข้อผิดพลาดครั้งแรกใช่หรือไม่ ฉันแค่อยากจะบันทึกและเพิกเฉยต่อข้อผิดพลาด - ทำไมไม่เพียงแค่บันทึกในเมธอด RestCallback onFailure() และไม่ปล่อยอะไรเลย แนวทางนี้ผิดหรือเปล่า? มันง่ายพอแน่นอน... - person wujek; 10.06.2015
comment
สตรีม getContent() จะหยุดทำงาน ไม่ใช่กระแสหลัก. (โปรดทราบว่า onerrorresumenext อยู่ในสตรีม getContent() !) มันคือวิธีการทำ มิฉะนั้น: จะทราบได้อย่างไรว่าคุณพบข้อผิดพลาดหรือสตรีมเสร็จสมบูรณ์หรือไม่ - person dwursteisen; 10.06.2015
comment
อ่า ความผิดพลาดของฉันคือฉันกำลังเรียก onError() ฯลฯ บนสตรีมหลักจาก Observable.from(permalinks) ทุกอย่างทำงานได้ตามที่คุณบอกว่าเคยเรียกใช้สตรีมที่ถูกต้อง ตอนนี้ฉันเข้าใจแล้ว ขอบคุณ - person wujek; 10.06.2015
comment
เกี่ยวกับคำถาม isUnsubscribed() - หากฉันไม่รวม Subscriber.onNext/onCompeted ฯลฯ ไว้ในนั้นหลังจากการตอบกลับกลับมา ฉันจะได้รับ java.lang.IllegalStateException: อินสแตนซ์นี้ถูกยกเลิกการสมัครแล้ว และคิวไม่สามารถใช้งานได้อีกต่อไป เมื่อฉันใช้ take(3) เป็นต้น ดังนั้นฉันเดาว่ามันจำเป็นจริงๆ ที่จะต้องตรวจสอบสถานะก่อนเรียกใช้วิธีการสมัครสมาชิก และการตรวจสอบก่อนส่งคำขอก็เป็นสิ่งที่ดี - person wujek; 11.06.2015