RxSwift berlangganan elemen terbaru dalam satu urutan mirip dengan gabunganLatest

Misalkan saya memiliki beberapa Observable yang mungkin memiliki rangkaian kejadian yang sangat panjang pada saat saya berlangganan tetapi juga dapat terus mengeluarkan kejadian setelah saya berlangganan. Saya hanya tertarik pada acara-acara tersebut sejak saya berlangganan dan setelahnya. Bagaimana cara mendapatkan acara terbaru?

Dalam contoh ini saya menggunakan ReplaySubject sebagai sumber buatan untuk mengilustrasikan pertanyaan. Dalam praktiknya, hal ini dapat diamati secara sewenang-wenang.

let observable = ReplaySubject<Int>.createUnbounded()

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

_ = observable.subscribe(onNext: {
    print($0)
})
observable.onNext(5)
observable.onNext(6)
observable.onNext(7)

Menghasilkan output: 1 2 3 4 5 6 7

Yang sebenarnya saya inginkan hanyalah acara sejak berlangganan dan seterusnya. yaitu 4 5 6 7

Saya dapat menggunakan mergeLatest dengan beberapa dummy Observable lainnya:

let observable = ReplaySubject<Int>.createUnbounded()

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

_ = Observable.combineLatest(observable, Observable<Int>.just(42)) { value, _ in value }
    .subscribe(onNext: {
    print($0)
})
observable.onNext(5)
observable.onNext(6)
observable.onNext(7)

yang menghasilkan output yang diinginkan 4 5 6 7

Bagaimana saya bisa menghasilkan hasil yang serupa tanpa secara artifisial memperkenalkan Observable lain yang sewenang-wenang?

Saya telah mencoba beberapa hal termasuk menggabungkanLatest dengan array yang hanya terdiri dari satu observasi, tapi itu memancarkan urutan lengkap, bukan hanya yang terbaru. Saya tahu saya bisa menggunakan PublishSubject tetapi saya hanya menggunakan ReplaySubject di sini sebagai ilustrasi.


person Dale    schedule 30.10.2020    source sumber
comment
Secara umum, Observable tidak memiliki peristiwa, melainkan menghasilkan peristiwa. Satu-satunya alasan Anda mengalami masalah yang Anda jelaskan justru karena Anda menggunakan ReplaySubject dan Anda telah mengaturnya untuk menyimpan dan memutar ulang setiap acara setiap kali acara tersebut berlangganan. Anda mengatakan Anda menggunakan ReplaySubject di sini sebagai ilustrasi, tetapi justru alasannya, dan satu-satunya alasan, Anda mengalami masalah yang Anda jelaskan. Jadi jawabannya, jangan gunakan ReplaySubject.createUnbounded() dan masalahnya akan hilang. Entah itu, atau sesuaikan pertanyaannya untuk menunjukkan masalah sebenarnya yang Anda alami.   -  person Daniel T.    schedule 30.10.2020
comment
@DanielT. Saya mengerti maksud Anda, namun demikian, menggabungkanLatest dengan observasi kedua menghasilkan hasil yang diinginkan. Saya hanya ingin tahu mengapa saya tidak dapat menemukan formulasi "terbaru" yang setara untuk satu observasi   -  person Dale    schedule 01.11.2020


Jawaban (2)


Secara default, observable akan memanggil generatornya untuk setiap pelanggan dan mengeluarkan semua nilai yang dihasilkan oleh generator tersebut. Jadi misalnya:

let obs = Observable.create { observer in 
    for each in [1, 2, 3, 5, 7, 11] { 
        observer.onNext(each)
    }
    observer.onCompleted()
}

(Perhatikan bahwa di atas adalah implementasi dari Observable.from(_:))

Setiap kali sesuatu berlangganan obs, penutupan dipanggil dan keenam acara berikutnya akan diterima. Inilah yang dikenal sebagai cold observable, dan sekali lagi ini merupakan perilaku default. Asumsikan Observable dingin kecuali Anda mengetahui sebaliknya.

Ada juga konsep hot observable. Observable panas tidak memanggil fungsi generatornya ketika sesuatu berlangganan padanya.

Berdasarkan pertanyaan Anda, dan komentar Anda selanjutnya, sepertinya Anda ingin tahu cara membuat suhu dingin menjadi panas... Cara mendasarnya adalah dengan memanggil .multicast (atau salah satu operator yang menggunakan implementasinya seperti publish(), replay(_:) atau replayAll().) Ada juga operator tujuan khusus yang disebut .share() yang akan memanaskan observasi dan menjaganya tetap panas sampai semua pelanggan berhenti berlangganan (kemudian akan menjadi dingin lagi.) Dan tentu saja, Subjek dianggap panas karena tidak' tidak memiliki fungsi generator untuk dipanggil.

Namun perlu diperhatikan, bahwa banyak observasi yang memiliki perilaku sinkron, ini berarti bahwa observasi tersebut akan memancarkan semua nilainya segera setelah sesuatu berlangganan dan dengan demikian sudah selesai sebelum pengamat lain (di thread tersebut) memiliki kesempatan untuk berlangganan.

Beberapa contoh lagi... .interval(_:scheduler:) adalah pengamatan dingin dengan perilaku asinkron. Katakanlah Anda memiliki yang berikut ini:

let i = Observable<Int>.interval(.seconds(3), scheduler: MainScheduler.instance)
i.subscribe(onNext: { print($0, "from first") })
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    i.subscribe(onNext: { print($0, "from second") })
}

Apa yang akan Anda temukan adalah setiap pengamat akan mendapatkan aliran nilai independennya sendiri (keduanya akan dimulai dengan 0) karena generator di dalam interval dipanggil untuk kedua pengamat. Jadi Anda akan melihat output seperti:

0 from first
1 from first
0 from second
2 from first
1 from second
3 from first
2 from second

Jika Anda melakukan multicast interval, Anda akan melihat perilaku yang berbeda:

let i = Observable<Int>.interval(.seconds(3), scheduler: MainScheduler.instance)
    .publish()
i.subscribe(onNext: { print($0, "from first") })
i.connect()
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    i.subscribe(onNext: { print($0, "from second") })
}

Hal di atas akan menghasilkan:

0 from first
1 from first
1 from second
2 from first
2 from second
3 from first
3 from second

(Perhatikan bahwa detik dimulai dengan 1, bukan 0.) Operator berbagi akan bekerja dengan cara yang sama dalam kasus ini kecuali Anda tidak perlu memanggil connect() karena operator tersebut melakukannya secara otomatis.

Terakhir, hati-hati. Jika Anda memublikasikan observasi sinkron, Anda mungkin tidak mendapatkan apa yang Anda harapkan:

let i = Observable.from([1, 2, 3, 5])
    .publish()
i.subscribe(onNext: { print($0, "from first") })
i.connect()
i.subscribe(onNext: { print($0, "from second") })

menghasilkan:

1 from first
2 from first
3 from first
5 from first

Karena kelima peristiwa (empat peristiwa berikutnya dan peristiwa yang selesai) muncul segera setelah connect() dipanggil sebelum pengamat kedua mendapat kesempatan untuk berlangganan.

Artikel yang mungkin membantu Anda adalah Observable Panas dan Dingin tetapi cukup canggih...

person Daniel T.    schedule 01.11.2020

Mengapa tidak menggunakan subjek publikasi seperti ini saja? Bukankah ini hasil yang diinginkan? Publish Subjects hanya memancarkan elemen setelah berlangganan. Dan itulah tujuan keseluruhannya.

    let observable = PublishSubject<Int>()
    observable.onNext(1)
    observable.onNext(2)
    observable.onNext(3)
    observable.onNext(4)

    _ = observable.subscribe(onNext: {
        print($0)
    })
    observable.onNext(5)
    observable.onNext(6)
    observable.onNext(7)
}

Jika Anda tidak ingin menggunakan subjek, Anda dapat membagikan observasi dan menambahkan pelanggan ke-2 seperti ini,

    let observable = ReplaySubject<Int>.createUnbounded()
    observable.onNext(1)
    observable.onNext(2)
    observable.onNext(3)
    observable.onNext(4)
    
    let shared = observable.share()
    
   // this will print full sequence
    shared.subscribe(onNext: {
        print("full sequence: \($0)")
    }).disposed(by: disposeBag)


    // this will only print new events
    shared.subscribe(onNext: {
        print("shared sequence: \($0)")
    }).disposed(by: disposeBag)

    // new events
    observable.onNext(5)
    observable.onNext(6)
    observable.onNext(7)

Observable adalah urutan yang malas dan didorong oleh tarikan. Tanpa langganan pertama Anda, streaming tidak akan dimulai. Setelah dimulai, dengan membagikannya, Anda hanya dapat berlangganan acara baru.

person Rukshan Marapana    schedule 30.10.2020
comment
Ya, tentu saja, dan saya mencatatnya dalam pertanyaan. Tapi bukan itu intinya. Yang dapat diamati di sini hanyalah untuk mengilustrasikan. - person Dale; 01.11.2020
comment
Jadi tidakkah mungkin bagi Anda untuk mengubah implementasi Anda dan menjadikan Observable Anda sebagai PublishSubject? Maka masalahnya akan hilang, bukan? - person Rukshan Marapana; 01.11.2020
comment
atau jika tidak ingin menggunakan subjek bisa share dan gunakan subscriber ke 2. Saya telah memperbarui jawabannya. - person Rukshan Marapana; 01.11.2020