Module: Enumerable
- Included in:
- RxRuby::CompositeSubscription
- Defined in:
- lib/core_ext/enumerable.rb
Instance Method Summary collapse
- #subscribe(observer, scheduler = RxRuby::ImmediateScheduler.instance) ⇒ Object
- #to_observable(scheduler = RxRuby::ImmediateScheduler.instance) ⇒ Object
Instance Method Details
#subscribe(observer, scheduler = RxRuby::ImmediateScheduler.instance) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
# File 'lib/core_ext/enumerable.rb', line 2 def subscribe(observer, scheduler = RxRuby::ImmediateScheduler.instance) begin self.each do |e| scheduler.schedule lambda { observer.on_next(e) } end rescue => ex observer.on_error(ex) return end observer.on_completed end |
#to_observable(scheduler = RxRuby::ImmediateScheduler.instance) ⇒ Object
17 18 19 20 21 |
# File 'lib/core_ext/enumerable.rb', line 17 def to_observable(scheduler = RxRuby::ImmediateScheduler.instance) RxRuby::AnonymousObservable.new do |observer| self.subscribe(observer, scheduler) end end |