ฉันใช้ lib reactive-location
กรณีการใช้งานของฉันคือฉันมีวัตถุมากมายที่ปล่อยออกมาจากการสังเกตได้ รายการเหล่านี้สามารถส่งออกได้ทุกสองสามชั่วโมง ทันทีที่มีการปล่อยรายการฉันต้องการรับตำแหน่งและใช้ zipWith (เท่าที่ฉันเข้าใจ) จะปล่อยวัตถุที่มีตำแหน่งนั้น
ปัญหาคือ เนื่องจากวัตถุจะถูกปล่อยออกมาทุกๆ สองสามชั่วโมงเท่านั้น ฉันจึงไม่สามารถรักษาตำแหน่งที่ร้อนจนสังเกตได้เนื่องจากจะทำให้แบตเตอรี่หมด
ดังนั้นฉันต้องการสิ่งต่อไปนี้: เมื่อวัตถุถูกส่งเข้าสู่สตรีม ให้สมัครรับตำแหน่งที่สังเกตได้เมื่อได้รับตำแหน่งแล้ว ยกเลิกการสมัครรับตำแหน่งที่สังเกตได้ จะต้องดำเนินการอย่างต่อเนื่อง
เท่าที่ฉันเข้าใจหม้อแปลงนี้จะดูแลการยกเลิกการสมัคร
public <T> Observable.Transformer<T, T> takeNextAndUnsubscribe() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
final BehaviorSubject subject = BehaviorSubject.create();
Observable source = tObservable.doOnNext(new Action1<T>() {
@Override
public void call(T t) {
subject.onNext(t);
}
});
return Observable
.merge(source.takeUntil(subject), subject)
.take(1);
}
};
}
แต่ฉันจะสมัครสมาชิกอีกครั้งได้อย่างไรเมื่อมีออบเจ็กต์ใหม่ถูกส่งลงมาในสตรีม