Module: Rx::Observable

Included in:
AnonymousObservable, AsyncSubject, BehaviorSubject, HotObservable, ReplaySubject, Subject, Subject::AnonymousSubject
Defined in:
lib/rx/operators/time.rb,
lib/rx/internal/util.rb,
lib/rx/core/observable.rb,
lib/rx/operators/single.rb,
lib/rx/linq/observable/do.rb,
lib/rx/operators/creation.rb,
lib/rx/operators/multiple.rb,
lib/rx/linq/observable/and.rb,
lib/rx/operators/aggregates.rb,
lib/rx/linq/observable/delay.rb,
lib/rx/linq/observable/pluck.rb,
lib/rx/linq/observable/publish.rb,
lib/rx/linq/observable/contains.rb,
lib/rx/linq/observable/debounce.rb,
lib/rx/linq/observable/interval.rb,
lib/rx/linq/observable/aggregate.rb,
lib/rx/linq/observable/multicast.rb,
lib/rx/linq/observable/timestamp.rb,
lib/rx/operators/synchronization.rb,
lib/rx/linq/observable/concat_all.rb,
lib/rx/linq/observable/concat_map.rb,
lib/rx/linq/observable/group_join.rb,
lib/rx/linq/observable/time_interval.rb,
lib/rx/operators/standard_query_operators.rb,
lib/rx/linq/observable/concat_map_observer.rb,
lib/rx/linq/observable/delay_with_selector.rb

Overview

Time-based operations

Defined Under Namespace

Classes: AmbObserver, HashConfiguration

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.amb(*args) ⇒ Object

Propagates the observable sequence that reacts first.



426
427
428
# File 'lib/rx/operators/multiple.rb', line 426

# Races every given observable and mirrors whichever one reacts first.
#
# Folds the arguments left to right with the instance-level +amb+, seeded
# with +Observable.never+; with no arguments the seed itself is returned.
#
# @param args [Array] candidate observable sequences
# @return [Object] the result of chaining +amb+ across all candidates
def amb(*args)
  args.inject(Observable.never) { |leader, contender| leader.amb(contender) }
end

.case(selector, sources, defaultSourceOrScheduler = Observable.empty) ⇒ Object Also known as: switchCase



3
4
5
6
7
8
9
10
11
12
# File 'lib/rx/linq/observable/case.rb', line 3

# Picks, at subscription time, the source stored under the key returned by
# +selector+ and falls back to a default when the lookup yields nil/false.
#
# @param selector [#call] invoked with no arguments; its result is the key
# @param sources [#[]] lookup table of candidate sources
# @param defaultSourceOrScheduler [Object] fallback source, or a Scheduler
#   used to build an empty fallback via +Observable.empty+
# @return [Object] whatever +defer+ returns for the deferred selection
def case(selector, sources, defaultSourceOrScheduler = Observable.empty)
  defer do
    # A scheduler stands for "empty sequence on that scheduler".
    defaultSourceOrScheduler = Observable.empty(defaultSourceOrScheduler) if Scheduler === defaultSourceOrScheduler

    chosen = sources[selector.call]
    chosen ? chosen : defaultSourceOrScheduler
  end
end

.combine_latest(*args, &result_selector) ⇒ Object

Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences produces an element.



494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
# File 'lib/rx/operators/multiple.rb', line 494

# Merges the given observable sequences into one sequence by invoking the
# selector whenever any source produces an element, once every source has
# produced at least one value.
#
# @param args [Array] source sequences; each is subscribed through
#   +synchronize(gate)+ so callbacks share one Monitor
# @param result_selector [Proc] optional block combining the latest values;
#   defaults to returning them as an array
# @return [AnonymousObservable] the combined sequence
def combine_latest(*args, &result_selector)
  AnonymousObservable.new do |observer|
    result_selector ||= lambda {|*inner_args| inner_args }

    n = args.length
    has_value = Array.new(n, false)
    # Latched to true the first time every source has produced a value.
    has_value_all = false

    values = Array.new(n)
    is_done = Array.new(n, false)

    # Called after source +i+ stored a new value in +values+.
    next_item = lambda do |i|
      has_value[i] = true
      if has_value_all || (has_value_all = has_value.all?)
        res = nil
        begin
          res = result_selector.call(*values)
        rescue => e
          # Selector failures are reported downstream instead of raised.
          observer.on_error e
          break
        end

        observer.on_next(res)
      elsif enumerable_select_with_index(is_done) {|_, j| j != i} .all?
        # Every other source already finished without all values being
        # present, so no combination can ever be produced.
        observer.on_completed
        break
      end
    end

    # Called when source +i+ completes; completes downstream once all have.
    done = lambda do |i|
      is_done[i] = true
      observer.on_completed if is_done.all?
    end

    gate = Monitor.new
    subscriptions = Array.new(n) do |i|
      sas = SingleAssignmentSubscription.new

      sas_obs = Observer.configure do |o|
        o.on_next do |x|
          values[i] = x
          next_item.call i
        end

        o.on_error(&observer.method(:on_error))

        o.on_completed { done.call i }
      end

      sas.subscription = args[i].synchronize(gate).subscribe(sas_obs)

      sas
    end

    CompositeSubscription.new subscriptions
  end
end

.concat(*args) ⇒ Object

Concatenates all of the specified observable sequences, as long as the previous observable sequence terminated successfully.



553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
# File 'lib/rx/operators/multiple.rb', line 553

# Concatenates the given observable sequences: each source is subscribed
# only after the previous one completed successfully. An error from any
# source is forwarded downstream and ends the sequence.
#
# @param args [Array] the sources, or a single Enumerator yielding them
# @return [AnonymousObservable] the concatenated sequence
def concat(*args)
  AnonymousObservable.new do |observer|
    disposed = false
    e = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
    subscription = SerialSubscription.new
    gate = AsyncLock.new

    cancelable = CurrentThreadScheduler.instance.schedule_recursive lambda {|this|
      gate.wait do
        break if disposed

        current = nil
        has_next = false
        err = nil

        begin
          current = e.next
          has_next = true
        rescue StopIteration
          # No more sources; handled below through has_next.
        rescue => ex
          # Use a distinct name so the enumerator held in +e+ is not replaced.
          err = ex
        end

        if err
          observer.on_error err
          break
        end

        unless has_next
          observer.on_completed
          break
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d

        new_obs = Observer.configure do |o|
          o.on_next(&observer.method(:on_next))
          o.on_error(&observer.method(:on_error))
          # Completion of one source moves on to the next one.
          o.on_completed { this.call }
        end

        # Keep the inner subscription so disposing the result also cancels
        # the currently active source.
        d.subscription = current.subscribe new_obs
      end
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { gate.wait { disposed = true }}]
  end
end

.create(&subscribe) ⇒ Object

Creates an observable sequence from a specified subscribe method implementation.



17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/rx/operators/creation.rb', line 17

# Builds an observable sequence from a subscribe implementation.
#
# The block receives the observer. Its return value becomes the
# subscription: a Subscription is used as is, a Proc is wrapped with
# +Subscription.create+, and anything else maps to +Subscription.empty+.
#
# @param subscribe [Proc] the subscribe implementation
# @return [AnonymousObservable]
def create(&subscribe)
  AnonymousObservable.new do |observer|
    outcome = subscribe.call(observer)
    if Subscription === outcome
      outcome
    elsif Proc === outcome
      Subscription.create(&outcome)
    else
      Subscription.empty
    end
  end
end

.defer ⇒ Object

Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.



32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/rx/operators/creation.rb', line 32

# Returns an observable that calls the given factory block for every new
# subscriber and subscribes that observer to the sequence it returns.
#
# If the factory raises, the observer is subscribed to
# +Observable.raise_error+ for that exception instead.
#
# @yieldreturn [#subscribe] the sequence to subscribe to
# @return [AnonymousObservable]
def defer
  AnonymousObservable.new do |observer|
    source = nil
    failure_subscription =
      begin
        source = yield
        nil
      rescue => err
        Observable.raise_error(err).subscribe(observer)
      end

    failure_subscription || source.subscribe(observer)
  end
end

.empty(scheduler = ImmediateScheduler.instance) ⇒ Object

Returns an empty observable sequence, using the specified scheduler to send out the single OnCompleted message.



47
48
49
50
51
52
53
# File 'lib/rx/operators/creation.rb', line 47

# Returns an observable that emits no elements and only signals completion,
# scheduling that single +on_completed+ call on the given scheduler.
#
# @param scheduler [#schedule] scheduler that runs the completion action
# @return [AnonymousObservable]
def empty(scheduler = ImmediateScheduler.instance)
  AnonymousObservable.new do |observer|
    finish = lambda { observer.on_completed }
    scheduler.schedule finish
  end
end

.for(sources, result_selector = nil) ⇒ Object



3
4
5
6
7
8
9
10
11
# File 'lib/rx/linq/observable/for.rb', line 3

# Concatenates the sequences obtained by applying +result_selector+ to each
# element of +sources+, evaluating the selector lazily as the concatenation
# advances.
#
# @param sources [#each] the elements to project
# @param result_selector [#call, nil] maps an element to the sequence to
#   concatenate; when omitted each element is used as the sequence itself
# @return [Object] the result of +Observable.concat+ over the projections
def for(sources, result_selector = nil)
  # Identity by default: the former default wrapped every element in an
  # Array, which concat cannot subscribe to.
  result_selector ||= lambda { |source| source }
  projected = Enumerator.new do |yielder|
    sources.each { |item| yielder << result_selector.call(item) }
  end
  Observable.concat(projected)
end

.fork_join(*all_sources) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/rx/linq/observable/fork_join.rb', line 3

# Subscribes to all sources at once and, when every source has completed
# after producing at least one value, emits a single array holding the last
# value of each source and then completes.
#
# With no sources, or as soon as one source completes without a value, the
# result completes without emitting. An error from any source is forwarded
# and all subscriptions in the group are disposed.
#
# @param all_sources [Array] sources responding to
#   +subscribe(on_next, on_error, on_completed)+
# @return [AnonymousObservable]
def fork_join(*all_sources)
  AnonymousObservable.new {|subscriber|
    count = all_sources.length
    if count == 0
      subscriber.on_completed
      # Leave the block here so the empty subscription is what is returned.
      next Subscription.empty
    end
    group = CompositeSubscription.new
    finished = false
    has_results = Array.new(count)
    has_completed  = Array.new(count)
    results  = Array.new(count)

    count.times {|i|
      source = all_sources[i]
      group.push(
        source.subscribe(
          lambda {|value|
            if !finished
              has_results[i] = true
              results[i] = value
            end
          },
          lambda {|e|
            finished = true
            subscriber.on_error e
            group.dispose
          },
          lambda {
            if !finished
              if !has_results[i]
                  # Mark the join as over so another empty source cannot
                  # signal completion a second time.
                  finished = true
                  subscriber.on_completed
                  return
              end
              has_completed[i] = true
              # Wait until every source has completed.
              count.times {|ix|
                if !has_completed[ix]
                  return
                end
              }
              finished = true
              subscriber.on_next results
              subscriber.on_completed
            end
          }
        )
      )
    }
    group
  }
end

.from(iterable, map_fn = nil, scheduler = CurrentThreadScheduler.instance) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/rx/linq/observable/from.rb', line 3

# Converts an iterable into an observable sequence, optionally mapping each
# element together with its zero-based index.
#
# A fresh enumerator is created for every subscription, so each subscriber
# sees the whole iterable from the start.
#
# @param iterable [#to_enum] the values to emit
# @param map_fn [Proc, nil] called as +map_fn.call(value, index)+ when it is
#   a Proc; any other value leaves the elements unchanged
# @param scheduler [#schedule_recursive] scheduler driving the iteration
# @return [AnonymousObservable]
def from(iterable, map_fn = nil, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new {|observer|
    # Per-subscription iterator; a shared one would already be exhausted
    # for every subscriber after the first.
    it = iterable.to_enum
    i = 0
    scheduler.schedule_recursive lambda {|this|
      begin
        result = it.next
      rescue StopIteration
        observer.on_completed
        return
      rescue => e
        observer.on_error e
        return
      end

      if Proc === map_fn
        begin
          result = map_fn.call(result, i)
        rescue => e
          observer.on_error e
          return
        end
      end

      observer.on_next result
      i += 1
      this.call
    }
  }
end

.from_array(array, scheduler = CurrentThreadScheduler.instance) ⇒ Object



204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/rx/operators/creation.rb', line 204

# Emits every element of +array+ in index order and then completes, using
# the scheduler's state-carrying recursion to track the current index.
#
# @param array [#size, #[]] the elements to emit
# @param scheduler [#schedule_recursive_with_state] drives the iteration
# @return [AnonymousObservable]
def from_array(array, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    emit = lambda do |index, continue|
      if index < array.size
        observer.on_next array[index]
        continue.call(index + 1)
      else
        observer.on_completed
      end
    end
    scheduler.schedule_recursive_with_state(0, emit)
  end
end

.generate(initial_state, condition, iterate, result_selector, scheduler = CurrentThreadScheduler.instance) ⇒ Object

Generates an observable sequence by running a state-driven loop producing the sequence’s elements.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/rx/operators/creation.rb', line 56

# Generates an observable sequence from a state-driven loop.
#
# On each step the state is advanced with +iterate+ (except on the first
# step), tested with +condition+ and, while the condition holds, projected
# through +result_selector+ and emitted. The sequence completes when the
# condition fails; an exception from any callable is sent to +on_error+.
#
# @param initial_state [Object] starting state
# @param condition [#call] receives the state; falsy ends the sequence
# @param iterate [#call] receives the state and returns the next one
# @param result_selector [#call] maps the state to the emitted element
# @param scheduler [#schedule_recursive] drives the loop
# @return [AnonymousObservable]
def generate(initial_state, condition, iterate, result_selector, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    state = initial_state
    first = true

    step = lambda do |this|
      keep_going = false
      element = nil
      begin
        # The initial state is tested as is; later steps advance it first.
        if first
          first = false
        else
          state = iterate.call(state)
        end

        keep_going = condition.call(state)
        element = result_selector.call(state) if keep_going
      rescue => err
        observer.on_error err
        next
      end

      if keep_going
        observer.on_next element
        this.call
      else
        observer.on_completed
      end
    end

    scheduler.schedule_recursive step
  end
end

.if(condition, then_source, else_source_or_scheduler = nil) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
# File 'lib/rx/linq/observable/if.rb', line 3

# Chooses between two sources by evaluating +condition+ immediately.
#
# The third argument may be the alternative source, a Scheduler (the
# alternative is then an empty sequence on that scheduler), or nil (a plain
# empty sequence). Any other value leaves the alternative as nil.
#
# @param condition [#call] evaluated once, when this method is called
# @param then_source [Object] returned when the condition is truthy
# @param else_source_or_scheduler [Observable, Scheduler, nil]
# @return [Object] the selected source
def if(condition, then_source, else_source_or_scheduler = nil)
  else_source =
    case else_source_or_scheduler
    when Scheduler then Observable.empty(else_source_or_scheduler)
    when Observable then else_source_or_scheduler
    when nil then Observable.empty
    end

  condition.call ? then_source : else_source
end

.interval(period, scheduler = Rx::DefaultScheduler.instance) ⇒ Object



2
3
4
# File 'lib/rx/linq/observable/interval.rb', line 2

# Builds a periodic timer sequence whose initial delay equals its period,
# by delegating to +observable_timer_time_span_and_period+.
#
# @param period [Object] used both as the due time and as the period
# @param scheduler [Object] scheduler handed to the timer helper
# @return [Object] whatever the timer helper returns
def self.interval(period, scheduler = Rx::DefaultScheduler.instance)
  initial_delay = period
  observable_timer_time_span_and_period(initial_delay, period, scheduler)
end

.just(value, scheduler = ImmediateScheduler.instance) ⇒ Object Also known as: return

Returns an observable sequence that contains a single element.



99
100
101
102
103
104
105
106
# File 'lib/rx/operators/creation.rb', line 99

# Returns an observable that emits +value+ once and then completes, with
# both notifications issued from a single scheduled action.
#
# @param value [Object] the only element of the sequence
# @param scheduler [#schedule] scheduler running the emission
# @return [AnonymousObservable]
def just(value, scheduler = ImmediateScheduler.instance)
  AnonymousObservable.new do |observer|
    emit_then_finish = lambda do
      observer.on_next value
      observer.on_completed
    end
    scheduler.schedule emit_then_finish
  end
end

.merge_all(*args) ⇒ Object Also known as: merge

Merges elements from all of the specified observable sequences into a single observable sequence, using the specified scheduler for enumeration of and subscription to the sources.



612
613
614
615
616
617
618
# File 'lib/rx/operators/multiple.rb', line 612

# Merges the given observable sequences into a single sequence.
#
# When the first argument is a Scheduler it is removed from the sources and
# used to enumerate them; otherwise the current-thread scheduler is used.
#
# @param args [Array] optional leading Scheduler followed by the sources
# @return [Object] result of +merge_all+ on the sequence of sources
def merge_all(*args)
  scheduler = CurrentThreadScheduler.instance
  scheduler = args.shift if !args.empty? && Scheduler === args.first
  sources = Observable.from_array(args, scheduler)
  sources.merge_all
end

.merge_concurrent(max_concurrent, scheduler = CurrentThreadScheduler.instance, *args) ⇒ Object

Merges elements from all observable sequences in the given enumerable sequence into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences, and using the specified scheduler for enumeration of and subscription to the sources.



607
608
609
# File 'lib/rx/operators/multiple.rb', line 607

# Merges the given sources while limiting how many inner sequences are
# subscribed at once, by delegating to the instance-level
# +merge_concurrent+ on the sequence of sources.
#
# @param max_concurrent [Object] limit passed to the instance-level merge
# @param scheduler [Object] scheduler used to enumerate the sources
# @param args [Array] the sources to merge
# @return [Object] the merged sequence
def merge_concurrent(max_concurrent, scheduler = CurrentThreadScheduler.instance, *args)
  sources = Observable.from_array(args, scheduler)
  sources.merge_concurrent(max_concurrent)
end

.never ⇒ Object

Returns a non-terminating observable sequence, which can be used to denote an infinite duration (e.g. when using reactive joins).



92
93
94
95
96
# File 'lib/rx/operators/creation.rb', line 92

# Returns an observable whose subscribe action does nothing, so observers
# never receive any notification.
#
# @return [AnonymousObservable]
def never
  AnonymousObservable.new { |_observer| }
end

.of(*args) ⇒ Object



3
4
5
6
7
8
9
# File 'lib/rx/linq/observable/of.rb', line 3

# Creates an observable sequence from the given arguments.
#
# A leading Scheduler argument is removed from the values and used for
# enumeration; otherwise the current-thread scheduler is used.
#
# @param args [Array] optional leading Scheduler followed by the values
# @return [Object] the result of +of_array+
def of(*args)
  scheduler = CurrentThreadScheduler.instance
  scheduler = args.shift if !args.empty? && Scheduler === args.first
  of_array(args, scheduler)
end

.of_array(array, scheduler = CurrentThreadScheduler.instance) ⇒ Object

Converts an array to an observable sequence, using an optional scheduler to enumerate the array.



110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/rx/operators/creation.rb', line 110

# Converts an array into an observable sequence: emits the elements in
# index order, then completes.
#
# @param array [#length, #[]] the elements to emit
# @param scheduler [#schedule_recursive] drives the enumeration
# @return [AnonymousObservable]
def of_array(array, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    position = 0
    step = lambda do |this|
      if position < array.length
        observer.on_next array[position]
        position += 1
        this.call
      else
        observer.on_completed
      end
    end
    scheduler.schedule_recursive step
  end
end

.of_enumerable(enumerable, scheduler = CurrentThreadScheduler.instance) ⇒ Object

Converts an Enumerable to an observable sequence, using an optional scheduler to enumerate the array.



126
127
128
# File 'lib/rx/operators/creation.rb', line 126

# Converts an Enumerable into an observable sequence by turning it into an
# Enumerator and delegating to +Observable.of_enumerator+.
#
# @param enumerable [#to_enum] the values to emit
# @param scheduler [Object] scheduler passed through to +of_enumerator+
# @return [Object] the resulting sequence
def of_enumerable(enumerable, scheduler = CurrentThreadScheduler.instance)
  enumerator = enumerable.to_enum
  Observable.of_enumerator(enumerator, scheduler)
end

.of_enumerator(enum, scheduler = CurrentThreadScheduler.instance) ⇒ Object

Converts an Enumerator to an observable sequence, using an optional scheduler to enumerate the array.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/rx/operators/creation.rb', line 131

# Converts an Enumerator into an observable sequence by pulling one value
# per scheduled step.
#
# Completes when the enumerator raises StopIteration; any other exception
# raised while fetching is sent to +on_error+.
#
# @param enum [#next] the enumerator to drain
# @param scheduler [#schedule_recursive] drives the enumeration
# @return [AnonymousObservable]
def of_enumerator(enum, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    pull = lambda do |this|
      fetched = false
      item = nil

      begin
        item = enum.next
        fetched = true
      rescue StopIteration
        observer.on_completed
      rescue => e
        observer.on_error e
      end

      # Only recurse when a value was obtained; both rescue paths are terminal.
      if fetched
        observer.on_next item
        this.call
      end
    end
    scheduler.schedule_recursive pull
  end
end

.on_error_resume_next(*args) ⇒ Object

Concatenates all of the specified observable sequences, even if the previous observable sequence terminated exceptionally.



622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
# File 'lib/rx/operators/multiple.rb', line 622

# Concatenates the given observable sequences, moving on to the next source
# whether the current one completed or terminated with an error.
#
# Only an exception raised while fetching the next source from the
# enumerator is reported through +on_error+; errors emitted by the sources
# themselves are swallowed and trigger the next source.
#
# @param args [Array] the sources, or a single Enumerator yielding them
# @return [AnonymousObservable] the concatenated sequence
def on_error_resume_next(*args)
  AnonymousObservable.new do |observer|
    gate = AsyncLock.new
    disposed = false
    e = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
    subscription = SerialSubscription.new

    cancelable = CurrentThreadScheduler.instance.schedule_recursive lambda {|this|
      gate.wait do
        current = nil
        has_next = false
        err = nil
        if !disposed
          begin
            current = e.next
            has_next = true
          rescue StopIteration => _
            # Do nothing
          rescue => e
            # NOTE: this rebinds +e+ (the enumerator) to the exception; the
            # step ends right below, so the enumerator is not used again here.
            err = e
          end
        else
          break
        end

        if err
          observer.on_error err
          break
        end

        unless has_next
          observer.on_completed
          break
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d

        new_obs = Observer.configure do |o|
          o.on_next(&observer.method(:on_next))
          # Both terminal notifications advance to the next source.
          o.on_error {|_| this.call }
          o.on_completed { this.call }
        end

        d.subscription = current.subscribe new_obs
      end
    }

    # The last element flips +disposed+ under the gate so a pending step
    # stops before subscribing to another source.
    CompositeSubscription.new [subscription, cancelable, Subscription.create { gate.wait { disposed = true } }]
  end
end

.pairs(obj, scheduler = CurrentThreadScheduler.instance) ⇒ Object



3
4
5
# File 'lib/rx/linq/observable/pairs.rb', line 3

# Converts +obj+ into an observable sequence by delegating to
# +of_enumerable+. For a Hash, enumeration yields key/value pairs.
#
# @param obj [Object] the object to enumerate
# @param scheduler [Object] scheduler passed through to +of_enumerable+
# @return [Object] the resulting sequence
def pairs(obj, scheduler = CurrentThreadScheduler.instance)
  of_enumerable obj, scheduler
end

.raise_error(error, scheduler = ImmediateScheduler.instance) ⇒ Object

Returns an observable sequence that terminates with an exception.



155
156
157
158
159
160
161
# File 'lib/rx/operators/creation.rb', line 155

# Returns an observable that terminates with +error+ and emits nothing
# else; the +on_error+ call is scheduled on the given scheduler.
#
# @param error [Object] the error handed to +on_error+
# @param scheduler [#schedule] scheduler running the notification
# @return [AnonymousObservable]
def raise_error(error, scheduler = ImmediateScheduler.instance)
  AnonymousObservable.new do |observer|
    fail_observer = lambda { observer.on_error error }
    scheduler.schedule fail_observer
  end
end

.range(start, count, scheduler = CurrentThreadScheduler.instance) ⇒ Object

Generates an observable sequence of integral numbers within a specified range.



164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/rx/operators/creation.rb', line 164

# Generates +count+ consecutive numbers beginning at +start+ and then
# completes.
#
# @param start [Numeric] first value of the sequence
# @param count [Integer] how many values to emit
# @param scheduler [#schedule_recursive_with_state] drives the loop
# @return [AnonymousObservable]
def range(start, count, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    tick = lambda do |offset, continue|
      if offset < count
        observer.on_next(start + offset)
        continue.call(offset + 1)
      else
        observer.on_completed
      end
    end
    scheduler.schedule_recursive_with_state(0, tick)
  end
end

.repeat(value, count, scheduler = CurrentThreadScheduler.instance) ⇒ Object

Generates an observable sequence that repeats the given element the specified number of times.



183
184
185
# File 'lib/rx/operators/creation.rb', line 183

# Builds a sequence that repeats +value+ by creating a single-element
# sequence with +Observable.just+ and calling its +repeat(count)+.
#
# @param value [Object] the element to repeat
# @param count [Object] passed to the instance-level +repeat+
# @param scheduler [Object] scheduler used for the single-element sequence
# @return [Object] the repeated sequence
def repeat(value, count, scheduler = CurrentThreadScheduler.instance)
  single = Observable.just(value, scheduler)
  single.repeat(count)
end

.repeat_infinitely(value, scheduler = CurrentThreadScheduler.instance) ⇒ Object

Generates an observable sequence that repeats the given element infinitely.



178
179
180
# File 'lib/rx/operators/creation.rb', line 178

# Builds a sequence that repeats +value+ without end by creating a
# single-element sequence with +Observable.just+ and calling its
# +repeat_infinitely+.
#
# @param value [Object] the element to repeat
# @param scheduler [Object] scheduler used for the single-element sequence
# @return [Object] the repeating sequence
def repeat_infinitely(value, scheduler = CurrentThreadScheduler.instance)
  single = Observable.just(value, scheduler)
  single.repeat_infinitely
end

.rescue_error(*args) ⇒ Object

Continues an observable sequence that is terminated by an exception with the next observable sequence.



431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
# File 'lib/rx/operators/multiple.rb', line 431

# Continues a sequence that terminated with an error by switching to the
# next source in the list.
#
# The first source that completes normally completes the result. When the
# sources run out, the most recent source error (if any) is reported;
# otherwise the result completes.
#
# @param args [Array] the sources, or a single Enumerator yielding them
# @return [AnonymousObservable]
def rescue_error(*args)
  AnonymousObservable.new do |observer|
    gate = AsyncLock.new
    disposed = false
    e = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
    subscription = SerialSubscription.new
    # Last error emitted by a source; reported once no source is left.
    last_error = nil

    cancelable = CurrentThreadScheduler.instance.schedule_recursive lambda {|this|
      gate.wait do
        current = nil
        has_next = false
        error = nil

        if disposed
          break
        else
          begin
            current = e.next
            has_next = true
          rescue StopIteration => _
            # Do nothing
          rescue => e
            # NOTE: this rebinds +e+ (the enumerator) to the exception; the
            # step ends right below, so the enumerator is not used again here.
            error = e
          end
        end

        if error
          observer.on_error error
          break
        end

        unless has_next
          if last_error
            observer.on_error last_error
          else
            observer.on_completed
          end
          break
        end

        new_obs = Observer.configure do |o|
          o.on_next(&observer.method(:on_next))

          # A failing source is remembered and replaced by the next one.
          o.on_error do |err|
            last_error = err
            this.call
          end

          o.on_completed(&observer.method(:on_completed))
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d
        d.subscription = current.subscribe new_obs
      end
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { gate.wait { disposed = true } }]
  end
end

.start(func, context, scheduler = DefaultScheduler.instance) ⇒ Object



3
4
5
# File 'lib/rx/linq/observable/start.rb', line 3

# Wraps +func+ with +Observable.to_async+ and invokes the wrapper at once
# with no arguments.
#
# @param func [Object] the callable to run
# @param context [Object] context forwarded to +to_async+
# @param scheduler [Object] scheduler forwarded to +to_async+
# @return [Object] whatever the asynchronous wrapper returns when called
def start(func, context, scheduler = DefaultScheduler.instance)
  async_func = Observable.to_async(func, context, scheduler)
  async_func.call
end

.timer(due_time, period_or_scheduler = DefaultScheduler.instance, scheduler = DefaultScheduler.instance) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/rx/linq/observable/timer.rb', line 3

# Creates a timer sequence, dispatching to one of four helpers depending on
# whether +due_time+ is a Time (absolute) or something else (relative) and
# whether a period was supplied.
#
# The second argument is treated as the period when it is Numeric and as
# the scheduler when it is a Scheduler.
#
# @param due_time [Time, Object] absolute time or relative time span
# @param period_or_scheduler [Numeric, Scheduler] period or scheduler
# @param scheduler [Object] scheduler used unless the second argument is one
# @return [Object] whatever the selected timer helper returns
def timer(due_time, period_or_scheduler = DefaultScheduler.instance, scheduler = DefaultScheduler.instance)
  period = nil
  if Numeric === period_or_scheduler
    period = period_or_scheduler
  elsif Scheduler === period_or_scheduler
    scheduler = period_or_scheduler
  end

  absolute = Time === due_time
  if period.nil?
    return observable_timer_date(due_time, scheduler) if absolute

    observable_timer_time_span(due_time, scheduler)
  elsif absolute
    observable_timer_date_and_period(due_time, period, scheduler)
  else
    observable_timer_time_span_and_period(due_time, period, scheduler)
  end
end

.to_async(func, context = nil, scheduler = DefaultScheduler.instance) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/rx/linq/observable/to_async.rb', line 3

# Wraps +func+ in a lambda that, when called, runs the function on the
# scheduler and returns an observable carrying its outcome.
#
# Each call creates an AsyncSubject. The function's return value is sent
# with +on_next+ followed by +on_completed+, and a raised exception is sent
# with +on_error+.
#
# @param func [#call] the function to run
# @param context [Object, nil] when truthy, +func+ is first bound to it via
#   +proc_bind+
# @param scheduler [#schedule] scheduler that runs the function
# @return [Proc] lambda taking the arguments to forward to +func+
def to_async(func, context = nil, scheduler = DefaultScheduler.instance)
  lambda do |*args|
    subject = AsyncSubject.new

    work = lambda do
      begin
        callable = context ? proc_bind(func, context) : func
        result = callable.call(*args)
      rescue => e
        subject.on_error e
        next
      end
      subject.on_next result
      subject.on_completed
    end
    scheduler.schedule work

    subject.as_observable
  end
end

.using(resource_factory, observable_factory) ⇒ Object

Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence’s lifetime.



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/rx/operators/creation.rb', line 188

def using(resource_factory, observable_factory)
  AnonymousObservable.new do |observer|
    source = nil
    subscription = Subscription.empty
    begin
      resource = resource_factory.call
      subscription = resource unless resource.nil?
      source = observable_factory.call resource
    rescue => e
      next CompositeSubscription.new [self.raise_error(e).subscribe(observer), subscription]
    end

    CompositeSubscription.new [source.subscribe(observer), subscription]
  end
end

.when(*plans) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/rx/linq/observable/when.rb', line 3

# Joins the results of several plans into one observable sequence.
#
# Each plan is activated with a shared table of join observers, an observer
# that forwards results downstream, and a callback that removes the plan
# from the active list. The result completes once no active plan remains.
#
# @param plans [Array] objects responding to
#   +activate(external_subscriptions, observer, deactivate)+
# @return [AnonymousObservable]
def when(*plans)
  AnonymousObservable.new do |observer|
    active_plans = []
    # Filled in by the plans during activation (presumably keyed by source
    # sequence — TODO confirm against the plan implementation).
    external_subscriptions = {}
    out_observer = Observer.configure {|o|
      o.on_next(&observer.method(:on_next))
      # An error is fanned out to every join observer rather than sent
      # straight downstream.
      o.on_error {|err|
        external_subscriptions.each {|_, v|
          v.on_error err
        }
      }
      o.on_completed(&observer.method(:on_completed))
    }
    begin
      plans.each {|x|
        active_plans.push x.activate(external_subscriptions, out_observer, lambda {|active_plan|
          active_plans.delete(active_plan)
          active_plans.length == 0 && observer.on_completed 
        })
      }
    rescue => e
      # Activation failures are reported to the observer; execution then
      # continues with whatever join observers were registered so far.
      Observable.raise_error(e).subscribe(observer)
    end
    group = CompositeSubscription.new
    external_subscriptions.each {|_, join_observer|
      join_observer.subscribe
      group.push join_observer
    }

    group
  end
end

.while(condition, source) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/rx/linq/observable/while.rb', line 3

# Repeats +source+ for as long as +condition+ returns a truthy value; the
# condition is re-evaluated before every repetition.
#
# All iteration state (the enumerator over repetitions, the disposal flag
# and the serial subscription) is created per subscription, so disposing
# one subscription does not affect later ones.
#
# @param condition [#call] evaluated before each repetition
# @param source [#subscribe] sequence subscribed with
#   +(on_next, on_error, on_completed)+ callables
# @return [AnonymousObservable]
def while(condition, source)
  scheduler = ImmediateScheduler.instance

  AnonymousObservable.new do |observer|
    # Yields +source+ once per repetition while the condition holds.
    enum = Enumerator.new {|y|
      while condition.call
        y << source
      end
    }
    is_disposed = false
    subscription = SerialSubscription.new

    cancelable = scheduler.schedule_recursive lambda {|this|
      return if is_disposed

      begin
        current_value = enum.next
      rescue StopIteration
        observer.on_completed
        return
      rescue => e
        # An exception raised by the condition surfaces here.
        observer.on_error e
        return
      end

      d = SingleAssignmentSubscription.new
      subscription.subscription = d
      d.subscription = current_value.subscribe(
        observer.method(:on_next),
        observer.method(:on_error),
        lambda { this.call }
      )
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { is_disposed = true }]
  end
end

.zip(*args, &result_selector) ⇒ Object

Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.



675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
# File 'lib/rx/operators/multiple.rb', line 675

# Merges the given observable sequences by pairing up elements with the
# same index: the selector is invoked each time every source has an
# unconsumed element queued.
#
# @param args [Array] source sequences; each is subscribed through
#   +synchronize(gate)+ so callbacks share one Monitor
# @param result_selector [Proc] optional block combining one element from
#   each source; defaults to returning them as an array
# @return [AnonymousObservable]
def zip(*args, &result_selector)
  AnonymousObservable.new do |observer|
    result_selector ||= lambda {|*inner_args| inner_args }
    n = args.length

    # One FIFO of pending elements per source.
    queues = Array.new(n) { [] }
    is_done = Array.new(n, false)

    # Called after source +i+ queued a new element.
    next_action = lambda do |i|
      if queues.all? {|q| q.length > 0 }
        res = queues.map {|q| q.shift }
        observer.on_next(result_selector.call(*res))
      elsif enumerable_select_with_index(is_done) {|_, j| j != i } .all?
        # Every other source is done, so no further tuple can be formed.
        observer.on_completed
      end
    end

    done = lambda do |i|
      is_done[i] = true
      observer.on_completed if is_done.all?
    end

    gate = Monitor.new

    subscriptions = Array.new(n) do |i|
      sas = SingleAssignmentSubscription.new

      sas_obs = Observer.configure do |o|
        o.on_next do |x|
          queues[i].push(x)
          next_action.call i
        end

        o.on_error(&observer.method(:on_error))

        o.on_completed { done.call i }
      end

      sas.subscription = args[i].synchronize(gate).subscribe(sas_obs)
      sas
    end

    # Drop buffered elements on disposal. The queues must be emptied in
    # place; rebinding the block parameter would leave them untouched.
    subscriptions.push(Subscription.create { queues.each(&:clear) })

    CompositeSubscription.new subscriptions
  end
end

Instance Method Details

#_subscribe(observer) ⇒ Subscription

Subscribes the given observer to the observable sequence.

Parameters:

Returns:



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/rx/core/observable.rb', line 37

# Subscribes the given observer to the observable sequence.
#
# The observer is wrapped in an AutoDetachObserver. If the current-thread
# scheduler reports that scheduling is required, the subscription is
# scheduled through it; otherwise +subscribe_core+ runs immediately and an
# exception it raises is passed to the wrapper's +fail+, being re-raised
# only when +fail+ returns a falsy value.
#
# @param observer [Object] the observer to subscribe
# @return [AutoDetachObserver] the wrapper, which serves as the subscription
def _subscribe(observer)

  auto_detach_observer = AutoDetachObserver.new observer

  if CurrentThreadScheduler.schedule_required?
    CurrentThreadScheduler.instance.schedule_with_state auto_detach_observer, method(:schedule_subscribe)
  else
    begin
      auto_detach_observer.subscription = subscribe_core auto_detach_observer
    rescue => e
      raise e unless auto_detach_observer.fail e
    end
  end

  auto_detach_observer
end

#add_ref(r) ⇒ Object



3
4
5
6
7
# File 'lib/rx/internal/util.rb', line 3

# Returns an observable whose subscription bundles +r.subscription+ with
# the subscription to this sequence.
#
# @param r [#subscription] object supplying the extra subscription
# @return [AnonymousObservable]
def add_ref(r)
  AnonymousObservable.new do |observer|
    parts = [r.subscription, self.subscribe(observer)]
    CompositeSubscription.new parts
  end
end

#all?(&block) ⇒ Rx::Observable

Determines whether all elements of an observable sequence satisfy a condition if block given, else if all are true

Parameters:

  • block (Proc)

Returns:



64
65
66
67
68
69
# File 'lib/rx/operators/aggregates.rb', line 64

# Determines whether all elements of the sequence satisfy the block, or,
# when no block is given, whether all elements are truthy.
#
# Implemented as "no element fails the predicate": failing elements are
# selected, +any?+ reports whether one exists, and the answer is negated.
#
# @param block [Proc] optional predicate applied to each element
# @return [Rx::Observable] sequence carrying the boolean answer
def all?(&block)
  # Without a block each element is its own verdict. A predicate that is
  # always true would make the no-block form ignore the elements entirely.
  block ||= lambda { |value| value }
  failing = select { |v| !block.call(v) }
  failing.any?.map { |found| !found }
end

#amb(second) ⇒ Object

Propagates the observable sequence that reacts first.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rx/operators/multiple.rb', line 26

# Propagates whichever of the two sequences (this one or +second+) reacts
# first with any notification, and unsubscribes from the other.
#
# @param second [#subscribe] the competing sequence
# @return [AnonymousObservable]
def amb(second)
  AnonymousObservable.new do |observer|
    left_subscription = SingleAssignmentSubscription.new
    right_subscription = SingleAssignmentSubscription.new
    # :neither until the first notification arrives, then :left or :right.
    choice = :neither

    gate = Monitor.new

    left = AmbObserver.new
    right = AmbObserver.new

    # Runs +action+ only if the left side wins or has already won; the
    # first notification decides the race and drops the right side.
    handle_left = lambda do |&action|
      if choice == :neither
        choice = :left
        right_subscription.unsubscribe
        # From now on the winner forwards directly to the real observer.
        left.observer = observer
      end

      action.call if choice == :left
    end

    # Mirror image of handle_left for the right side.
    handle_right = lambda do |&action|
      if choice == :neither
        choice = :right
        left_subscription.unsubscribe
        right.observer = observer
      end

      action.call if choice == :right
    end

    left_obs = Observer.configure do |o|
      o.on_next {|x| handle_left.call { observer.on_next x } }
      o.on_error {|err| handle_left.call { observer.on_error err } }
      o.on_completed { handle_left.call { observer.on_completed } }
    end

    right_obs = Observer.configure do |o|
      o.on_next {|x| handle_right.call { observer.on_next x } }
      o.on_error {|err| handle_right.call { observer.on_error err } }
      o.on_completed { handle_right.call { observer.on_completed } }
    end        

    # Both deciding observers share the same gate.
    left.observer = Observer.allow_reentrancy(left_obs, gate)
    right.observer = Observer.allow_reentrancy(right_obs, gate)

    left_subscription.subscription = self.subscribe left
    right_subscription.subscription = second.subscribe right

    CompositeSubscription.new [left_subscription, right_subscription]
  end
end

#and(right) ⇒ Object



3
4
5
# File 'lib/rx/linq/observable/and.rb', line 3

# Starts a join pattern from this sequence and +right+.
#
# @param right [Object] the sequence to pair with this one
# @return [Pattern] pattern built from the two-element list
def and(right)
  members = [self, right]
  Pattern.new(members)
end

#any?(&block) ⇒ Rx::Observable

Determines whether any element of an observable sequence satisfies a condition if a block is given else if there are any items in the observable sequence.

Returns:



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/rx/operators/aggregates.rb', line 84

# Determines whether any element of the sequence satisfies the block, or,
# when no block is given, whether the sequence has any element at all.
#
# The result emits a single boolean and completes: +true+ on the first
# element, +false+ if the source completes without one. Source errors are
# forwarded unchanged.
#
# @param block [Proc] optional predicate applied to each element
# @return [Rx::Observable] sequence carrying the boolean answer
def any?(&block)
  # Filter by the predicate first. Mapping would turn every element into a
  # boolean and make any non-empty source report true.
  return select(&block).any? if block_given?
  AnonymousObservable.new do |observer|
    new_obs = Observer.configure do |o|
      o.on_next do |_|
        observer.on_next true
        observer.on_completed
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next false
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end

#as_observable ⇒ Object

Hides the identity of an observable sequence.



16
17
18
# File 'lib/rx/operators/single.rb', line 16

# Hides the identity of this sequence by wrapping it in a new
# AnonymousObservable that forwards subscriptions to it.
#
# @return [AnonymousObservable]
def as_observable
  AnonymousObservable.new do |downstream|
    subscribe(downstream)
  end
end

#average(&block) ⇒ Rx::Observable

Computes the average of an observable sequence of values that are optionally obtained by invoking a transform function on each element of the input sequence if a block is given

Parameters:

  • block (Object)

Returns:



109
110
111
112
113
114
115
116
117
# File 'lib/rx/operators/aggregates.rb', line 109

# Computes the average of the sequence. With a block, each element is first
# transformed by the block and the average of the results is computed.
#
# The running sum and count are accumulated with +scan+, the last accumulator
# is taken with +final+, and the quotient is produced with +/+ (so the result
# follows the division semantics of the accumulated values).
#
# @yield [element] optional transform applied before averaging
# @return [Rx::Observable]
def average(&block)
  return map(&block).average if block_given?

  seed = {:sum => 0, :count => 0}
  running = scan(seed) do |acc, item|
    {:sum => acc[:sum] + item, :count => acc[:count] + 1}
  end

  running.final.map do |totals|
    raise 'Sequence contains no elements' if totals[:count] == 0
    totals[:sum] / totals[:count]
  end
end

#buffer_with_count(count, skip = count) ⇒ Object

Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.



21
22
23
24
25
# File 'lib/rx/operators/single.rb', line 21

# Projects the elements into buffers (arrays) based on element count.
#
# @param count size argument passed to +window_with_count+; must be > 0
# @param skip  skip argument passed to +window_with_count+; must be > 0
#   (defaults to +count+)
# @raise [ArgumentError] when +count+ or +skip+ is zero or negative
# @return the windows converted with +to_a+, with empty buffers filtered out
def buffer_with_count(count, skip = count)
  raise ArgumentError, 'Count must be greater than zero' if count <= 0
  raise ArgumentError, 'Skip must be greater than zero' if skip <= 0

  windows = window_with_count(count, skip)
  buffers = windows.flat_map(&:to_a)
  buffers.find_all { |buffer| buffer.length > 0 }
end

#buffer_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance) ⇒ Object

Projects each element of an observable sequence into consecutive non-overlapping buffers which are produced based on timing information.



22
23
24
25
26
# File 'lib/rx/operators/time.rb', line 22

# Projects the elements into buffers (arrays) based on timing information.
#
# @param time_span  span argument passed to +window_with_time+; must be > 0
# @param time_shift shift argument passed to +window_with_time+; must be > 0
#   (defaults to +time_span+)
# @param scheduler  scheduler passed to +window_with_time+
# @raise [ArgumentError] when +time_span+ or +time_shift+ is zero or negative
# @return the windows converted with +to_a+
def buffer_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance)
  raise ArgumentError.new 'time_span must be greater than zero' if time_span <= 0
  # Name the parameter that actually failed validation.
  raise ArgumentError.new 'time_shift must be greater than zero' if time_shift <= 0
  window_with_time(time_span, time_shift, scheduler).flat_map(&:to_a)
end

#combine_latest(other, &result_selector) ⇒ Object

Merges two observable sequences into one observable sequence by using the selector function whenever one of the observable sequences produces an element.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/rx/operators/multiple.rb', line 117

# Merges this sequence with +other+ by invoking +result_selector+ with the
# latest value of each side whenever either side produces an element.
#
# Nothing is emitted until both sides have produced at least one value. The
# result completes when both sides have completed, or as soon as it is known
# that no combination can ever be produced (one side finished without a value
# and the other side produces an element).
#
# @param other the second sequence
# @yield [left, right] combines the latest values of both sides
# @return [AnonymousObservable]
def combine_latest(other, &result_selector)
  AnonymousObservable.new do |observer|
    has_left = false
    has_right = false

    left = nil
    right = nil

    left_done = false
    right_done = false

    left_subscription = SingleAssignmentSubscription.new
    right_subscription = SingleAssignmentSubscription.new

    # Both sides are synchronized on the same gate before reaching the
    # handlers below, which share the state declared above.
    gate = Monitor.new

    left_obs = Observer.configure do |o|
      o.on_next do |l|
        has_left = true
        left = l

        if has_right
          res = nil
          begin
            res = result_selector.call left, right
          rescue => e
            observer.on_error e
            # `next` leaves this handler; `break` would raise LocalJumpError
            # because the method that received the block has already returned.
            next
          end
          observer.on_next res
        elsif right_done
          # The right side ended without ever producing a value, so no
          # combination is possible.
          observer.on_completed
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        left_done = true
        observer.on_completed if right_done
      end
    end

    right_obs = Observer.configure do |o|
      o.on_next do |r|
        has_right = true
        right = r

        if has_left
          res = nil
          begin
            res = result_selector.call left, right
          rescue => e
            observer.on_error e
            next
          end
          observer.on_next res
        elsif left_done
          # The left side ended without ever producing a value.
          observer.on_completed
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        right_done = true
        observer.on_completed if left_done
      end
    end

    left_subscription.subscription = synchronize(gate).subscribe(left_obs)
    right_subscription.subscription = other.synchronize(gate).subscribe(right_obs)

    CompositeSubscription.new [left_subscription, right_subscription]
  end
end

#concat(*other) ⇒ Object

Concatenates the second observable sequence to the first observable sequence upon successful termination of the first.



195
196
197
# File 'lib/rx/operators/multiple.rb', line 195

# Concatenates the given sequences after this one by handing an enumerator
# over [self, *other] to Observable.concat.
#
# @param other sequences to append, in order
def concat(*other)
  sources = [self, *other]
  Observable.concat(sources.to_enum)
end

#concat_allObject



3
4
5
# File 'lib/rx/linq/observable/concat_all.rb', line 3

# Flattens a sequence of sequences by delegating to +merge_concurrent+ with a
# limit of 1.
def concat_all
  merge_concurrent 1
end

#concat_map(selector, result_selector = nil) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/rx/linq/observable/concat_map.rb', line 3

# Projects each element through +selector+ and hands the projection to
# +_concat_map+.
#
# @param selector a Proc called with (element, index), or any other object,
#   which is then used as the constant projection for every element
# @param result_selector optional Proc called with
#   (outer element, inner element, outer index, inner index)
def concat_map(selector, result_selector = nil)
  projection = selector

  if Proc === result_selector
    # Wrap the selector so every inner value is combined with its outer
    # element and both indices.
    projection = lambda do |x, i|
      inner = selector.call(x, i)
      # Anything that responds to #each is converted with Observable.from.
      inner = Observable.from(inner) if inner.respond_to?(:each)
      inner.map_with_index { |y, i2| result_selector.call(x, y, i, i2) }
    end
  end

  return _concat_map(projection) if Proc === projection

  _concat_map(lambda { |*_| selector })
end

#concat_map_observer(on_next, on_error, on_completed) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/rx/linq/observable/concat_map_observer.rb', line 3

# Projects every notification of this sequence through the matching callable
# and flattens the projected results with +concat_all+.
#
# @param on_next      called with (element, index) for each element
# @param on_error     called with the error when the source fails
# @param on_completed called without arguments when the source completes
def concat_map_observer(on_next, on_error, on_completed)
  projected = AnonymousObservable.new do |observer|
    index = 0

    # The index only advances when the projection succeeds.
    next_handler = lambda do |x|
      begin
        result = on_next.call(x, index)
        index += 1
      rescue => e
        observer.on_error e
        return
      end
      observer.on_next result
    end

    # A source error is projected, emitted, and followed by completion.
    error_handler = lambda do |err|
      begin
        result = on_error.call(err)
      rescue => e
        observer.on_error e
        return
      end

      observer.on_next result
      observer.on_completed
    end

    completed_handler = lambda do
      begin
        result = on_completed.call
      rescue => e
        observer.on_error e
        return
      end

      observer.on_next result
      observer.on_completed
    end

    subscribe(next_handler, error_handler, completed_handler)
  end

  projected.concat_all
end

#contains(search_element, from_index = 0) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/rx/linq/observable/contains.rb', line 3

# Determines whether the sequence contains +search_element+ at or after
# position +from_index+.
#
# @param search_element value compared against each element with +==+
# @param from_index zero-based position at which matching starts; a negative
#   value yields +false+ immediately without subscribing to the source
# @return [AnonymousObservable] emits +true+ when a match is seen, or +false+
#   when the source completes, followed by completion
def contains(search_element, from_index = 0)
  AnonymousObservable.new do |observer|
    i = 0
    n = from_index
    if n < 0
      observer.on_next false
      observer.on_completed
      # `next` hands the value back from this block. A `return` here would
      # target #contains itself, which has already returned by the time the
      # block runs, and would raise LocalJumpError.
      next Subscription.empty
    end

    subscribe(
      lambda {|x|
        # i.tap yields the pre-increment index for the comparison.
        if i.tap { i += 1 } >= n && x == search_element
          observer.on_next true
          observer.on_completed
        end
      },
      observer.method(:on_error),
      lambda {
        observer.on_next false
        observer.on_completed
      })
  end
end

#contains?(item) ⇒ Rx::Observable

Determines whether an observable sequence contains a specified element.

Parameters:

  • item (Object)

    The value to locate in the source sequence.

Returns:

  • (Rx::Observable)

    An observable sequence containing a single element determining whether the source sequence contains an element that has the specified value.



123
124
125
# File 'lib/rx/operators/aggregates.rb', line 123

# Determines whether the sequence contains +item+, comparing with +eql?+.
#
# @param item the value to locate
# @return the result of +any?+ on the elements that are +eql?+ to +item+
def contains?(item)
  matches = select { |candidate| candidate.eql? item }
  matches.any?
end

#count(&block) ⇒ Object

Returns an observable sequence containing a number that represents how many elements in the specified observable sequence satisfy a condition if the block is given, else the number of items in the observable sequence



130
131
132
133
# File 'lib/rx/operators/aggregates.rb', line 130

# Counts the elements of the sequence, or only those for which the block is
# truthy when a block is given.
#
# @yield [element] optional predicate
# @return the result of reducing the (filtered) sequence to a running total
def count(&block)
  if block_given?
    select(&block).count
  else
    reduce(0) { |total, _| total + 1 }
  end
end

#debounce(due_time, scheduler = DefaultScheduler.instance) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/rx/linq/observable/debounce.rb', line 3

# Emits an element only after +due_time+ has elapsed without another element
# arriving; each new element replaces the pending one.
#
# @param due_time  relative time passed to +scheduler.schedule_relative+
# @param scheduler scheduler used for the timers
# @return [AnonymousObservable]
def debounce(due_time, scheduler = DefaultScheduler.instance)
  AnonymousObservable.new do |observer|
    timer = SerialSubscription.new
    pending = false
    latest_value = nil
    generation = 0

    on_value = lambda do |x|
      pending = true
      latest_value = x
      generation += 1
      # Remember which element armed this timer so that it only fires when
      # no newer element has arrived in the meantime.
      my_generation = generation
      slot = SingleAssignmentSubscription.new
      timer.subscription = slot
      slot.subscription = scheduler.schedule_relative(due_time, lambda {
        observer.on_next latest_value if pending && generation == my_generation
        pending = false
      })
    end

    on_failure = lambda do |e|
      timer.dispose
      observer.on_error e
      pending = false
      generation += 1
    end

    # On completion the pending element, if any, is flushed first.
    on_finish = lambda do
      timer.dispose
      observer.on_next latest_value if pending
      observer.on_completed
      pending = false
      generation += 1
    end

    source_subscription = subscribe(on_value, on_failure, on_finish)

    CompositeSubscription.new [source_subscription, timer]
  end
end

#default_if_empty(default_value = nil) ⇒ Object

Returns the elements of the specified sequence or the type parameter’s default value in a singleton sequence if the sequence is empty.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rx/operators/standard_query_operators.rb', line 16

# Forwards the elements of the sequence, or emits +default_value+ once if the
# source completes without having produced any element.
#
# @param default_value value emitted for an empty source (defaults to nil)
# @return [AnonymousObservable]
def default_if_empty(default_value = nil)
  AnonymousObservable.new do |observer|
    seen_any = false

    relay = Observer.configure do |config|
      config.on_next do |element|
        seen_any = true
        observer.on_next element
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        observer.on_next(default_value) unless seen_any
        observer.on_completed
      end
    end

    subscribe(relay)
  end
end

#delay(due_time, scheduler = DefaultScheduler.instance) ⇒ Object



3
4
5
6
7
8
9
# File 'lib/rx/linq/observable/delay.rb', line 3

# Dispatches to +delay_date+ when +due_time+ is a Time, and to
# +delay_time_span+ for any other value.
#
# @param due_time  a Time, or a value handed to +delay_time_span+
# @param scheduler scheduler forwarded to the chosen implementation
def delay(due_time, scheduler = DefaultScheduler.instance)
  case due_time
  when Time then delay_date(due_time, scheduler)
  else delay_time_span(due_time, scheduler)
  end
end

#delay_with_selector(subscription_delay, delay_duration_selector = nil) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/rx/linq/observable/delay_with_selector.rb', line 3

# Delays each element until the observable returned for it by the duration
# selector produces a notification, optionally postponing the subscription to
# the source until +subscription_delay+ produces a notification.
#
# @param subscription_delay either the per-element selector (when it is a
#   Proc) or an observable whose first value/completion starts the
#   subscription to the source
# @param delay_duration_selector the per-element selector when
#   +subscription_delay+ is not a Proc
# @return [AnonymousObservable]
def delay_with_selector(subscription_delay, delay_duration_selector = nil)
  if Proc === subscription_delay
    # Single-argument form: no subscription delay (sub_delay stays nil).
    selector = subscription_delay
  else
    sub_delay = subscription_delay
    selector = delay_duration_selector
  end

  AnonymousObservable.new do |observer|
    # Outstanding per-element delay subscriptions.
    delays = CompositeSubscription.new
    at_end = false
    # Completes downstream once the source has ended and no delay is pending.
    done = lambda {
      if at_end && delays.length == 0
        observer.on_completed
      end
    }
    subscription = SerialSubscription.new
    # Subscribes to the source; accepts and ignores any arguments so it can
    # serve as both the on_next and on_completed handler of sub_delay.
    start = lambda {|*_|
      subscription.subscription = subscribe(
        lambda {|x|
          begin
            delay = selector.call(x)
          rescue => error
            observer.on_error error
            return
          end
          d = SingleAssignmentSubscription.new
          delays.push(d)
          # NOTE(review): both the on_next and on_completed handlers below
          # emit x; whether x can be forwarded twice depends on what
          # delays.delete(d) does to d -- confirm.
          d.subscription = delay.subscribe(
            lambda {|_|
              observer.on_next x
              delays.delete(d)
              done.call
            },
            observer.method(:on_error),
            lambda {
              observer.on_next x
              delays.delete(d)
              done.call
            })
        },
        observer.method(:on_error),
        lambda {
          at_end = true
          subscription.dispose
          done.call
        })
    }
    
    if !sub_delay
      start.call
    else
      # Replaced by the source subscription once start runs.
      subscription.subscription = sub_delay.subscribe(
        start,
        observer.method(:on_error),
        start)
    end
    CompositeSubscription.new [subscription, delays]
  end
end

#dematerializeObject

Dematerializes the explicit notification values of an observable sequence as implicit notifications.



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/rx/operators/single.rb', line 28

# Turns a sequence of notification objects back into implicit notifications
# by letting every element +accept+ the downstream observer.
#
# @return [AnonymousObservable]
def dematerialize
  AnonymousObservable.new do |observer|
    unwrapper = Rx::Observer.configure do |config|
      config.on_next { |notification| notification.accept observer }
      config.on_error(&observer.method(:on_error))
      config.on_completed(&observer.method(:on_completed))
    end

    subscribe unwrapper
  end
end

#distinct(&key_selector) ⇒ Object

Returns an observable sequence that contains only distinct elements according to the optional key_selector.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/rx/operators/standard_query_operators.rb', line 39

# Forwards only the elements whose key has not been seen before.
#
# Keys are produced by the optional block (identity by default) and are
# compared by their +to_s+ representation.
#
# @yield [element] optional key selector
# @return [AnonymousObservable]
def distinct(&key_selector)
  key_selector ||= lambda {|x| x}

  AnonymousObservable.new do |observer|
    seen = {}

    filter = Observer.configure do |config|
      config.on_next do |element|
        fresh = false

        begin
          label = key_selector.call(element).to_s
          fresh = !seen.key?(label)
          seen[label] = true if fresh
        rescue => e
          observer.on_error e
          next
        end

        observer.on_next element if fresh
      end

      config.on_error(&observer.method(:on_error))
      config.on_completed(&observer.method(:on_completed))
    end

    subscribe(filter)
  end
end

#distinct_until_changed(&key_selector) ⇒ Object

Returns an observable sequence that contains only distinct contiguous elements according to the optional key_selector.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/rx/operators/single.rb', line 42

# Forwards only the elements whose key differs from the key of the element
# forwarded immediately before it.
#
# @yield [element] optional key selector (identity by default)
# @return [AnonymousObservable]
def distinct_until_changed(&key_selector)
  key_selector ||= lambda {|x| x}
  AnonymousObservable.new do |observer|
    current_key = nil
    has_current = false

    new_obs = Rx::Observer.configure do |o|
      o.on_next do |value|
        key = nil
        begin
          key = key_selector.call value
        rescue => err
          observer.on_error err
          next
        end

        # Track "a key has been seen" separately from the key itself so that
        # nil and false keys are de-duplicated like any other key.
        if !has_current || key != current_key
          has_current = true
          current_key = key
          observer.on_next value
        end
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe new_obs
  end
end

#do(observer_or_on_next = nil, on_error_func = nil, on_completed_func = nil) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/rx/linq/observable/do.rb', line 3

# Invokes side-effect callbacks for each notification of the sequence and
# then forwards the notification unchanged.
#
# The callbacks are taken, in order of precedence, from the given block
# (on_next only), from Proc arguments, or from an observer-like object that
# responds to +on_next+, +on_error+ and +on_completed+.
#
# If a callback raises, the raised error is reported through +on_error+ and
# the original notification is not forwarded.
#
# @param observer_or_on_next a Proc called with each element, or an observer
# @param on_error_func optional callable invoked with the source's error
# @param on_completed_func optional callable invoked on completion
# @yield [element] alternative way to supply the on_next side effect
# @return [AnonymousObservable]
def do(observer_or_on_next = nil, on_error_func = nil, on_completed_func = nil, &block)
  if block_given?
    # Capture the block explicitly; a block-less Proc.new no longer picks up
    # the method's block on current Ruby versions.
    on_next_func = block
  elsif Proc === observer_or_on_next
    on_next_func = observer_or_on_next
  else
    on_next_func = observer_or_on_next.method(:on_next)
    on_error_func = observer_or_on_next.method(:on_error)
    on_completed_func = observer_or_on_next.method(:on_completed)
  end
  AnonymousObservable.new do |observer|
    subscribe(
      lambda {|x|
        begin
          on_next_func.call x
        rescue => e
          observer.on_error e
          next
        end
        observer.on_next x
      },
      lambda {|err|
        begin
          # Pass the error that was received (there is no `x` in this scope).
          on_error_func && on_error_func.call(err)
        rescue => e
          observer.on_error e
          next
        end
        observer.on_error err
      },
      lambda {
        begin
          on_completed_func && on_completed_func.call
        rescue => e
          observer.on_error e
          next
        end
        observer.on_completed
      })
  end
end

#element_at(index) ⇒ Rx::Observable

Returns the element at a specified index in a sequence.

Parameters:

  • index (Numeric)

    The zero-based index of the element to retrieve.

Returns:

  • (Rx::Observable)

    An observable sequence that produces the element at the specified position in the source sequence.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/rx/operators/aggregates.rb', line 139

# Returns the element at the given zero-based position.
#
# @param index [Numeric] zero-based position of the element to retrieve
# @raise [ArgumentError] when +index+ is negative
# @return [Rx::Observable] emits the element at +index+ and completes; signals
#   a RuntimeError through +on_error+ when the source completes first
def element_at(index)
  raise ArgumentError.new 'index cannot be less than zero' if index < 0
  AnonymousObservable.new do |observer|
    i = index
    new_obs = Observer.configure do |o|
      o.on_next do |value|
        if i == 0
          observer.on_next value
          observer.on_completed
        end

        i -= 1
      end

      o.on_error(&observer.method(:on_error))

      # Report the failure to the downstream observer instead of raising out
      # of the completion handler.
      o.on_completed do
        observer.on_error(RuntimeError.new('Sequence contains no elements'))
      end
    end

    subscribe new_obs
  end
end

#element_at_or_default(index, default_value = nil) ⇒ Object

Returns the element at a specified index in a sequence or a default value if the index is out of range.

Parameters:

  • index (Numeric)

    The zero-based index of the element to retrieve.

  • default_value (Object) (defaults to: nil)

    The default value to use if the index is out of range.



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/rx/operators/aggregates.rb', line 164

# Returns the element at the given zero-based position, or +default_value+
# when the source completes before reaching it.
#
# @param index [Numeric] zero-based position of the element to retrieve
# @param default_value value emitted when the source completes first
# @raise [ArgumentError] when +index+ is negative
# @return [AnonymousObservable]
def element_at_or_default(index, default_value = nil)
  raise ArgumentError, 'index cannot be less than zero' if index < 0

  AnonymousObservable.new do |observer|
    remaining = index

    picker = Observer.configure do |config|
      config.on_next do |element|
        if remaining == 0
          observer.on_next element
          observer.on_completed
        end

        remaining -= 1
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        observer.on_next default_value
        observer.on_completed
      end
    end

    subscribe picker
  end
end

#empty?Rx::Observable

Determines whether an observable sequence is empty.

Returns:

  • (Rx::Observable)

    An observable sequence containing a single element determining whether the source sequence is empty.



242
243
244
# File 'lib/rx/operators/aggregates.rb', line 242

# Determines whether the sequence is empty by negating the result of +any?+.
def empty?
  any?.map { |has_any| !has_any }
end

#ensuresObject

Invokes a specified action after the source observable sequence terminates gracefully or exceptionally.



117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/rx/operators/single.rb', line 117

# Runs the given block when the returned subscription is unsubscribed, even
# if unsubscribing from the source raises.
#
# @yield invoked from the +ensure+ clause after the source is unsubscribed
# @return [AnonymousObservable]
def ensures
  AnonymousObservable.new do |observer|
    upstream = subscribe(observer)

    Subscription.create {
      begin
        upstream.unsubscribe
      ensure
        yield
      end
    }
  end
end

#enumerator_repeat_infinitely(value) ⇒ Object



390
391
392
393
394
395
396
# File 'lib/rx/operators/single.rb', line 390

# Builds an Enumerator that yields +value+ without end.
#
# @param value the object to repeat
# @return [Enumerator]
def enumerator_repeat_infinitely(value)
  Enumerator.new { |yielder| yielder << value while true }
end

#enumerator_repeat_times(num, value) ⇒ Object



382
383
384
385
386
387
388
# File 'lib/rx/operators/single.rb', line 382

# Builds an Enumerator that yields +value+ exactly +num+ times.
#
# @param num   number of repetitions (anything responding to +times+)
# @param value the object to repeat
# @return [Enumerator]
def enumerator_repeat_times(num, value)
  Enumerator.new do |yielder|
    num.times { yielder << value }
  end
end

#finalRx::Observable

Internal method to get the final value

Returns:



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/rx/operators/aggregates.rb', line 16

# Internal helper that emits only the last element of the sequence.
#
# @return [AnonymousObservable] emits the last element and completes, or
#   signals a RuntimeError through +on_error+ when the source was empty
def final
  AnonymousObservable.new do |observer|
    latest_value = nil
    received = false

    tail = Observer.configure do |config|
      config.on_next do |item|
        latest_value = item
        received = true
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        if received
          observer.on_next latest_value
          observer.on_completed
        else
          observer.on_error(RuntimeError.new 'Sequence contains no elements')
        end
      end
    end

    subscribe tail
  end
end

#first(&block) ⇒ Rx::Observable

Returns the first element of an observable sequence that satisfies the condition in the predicate if a block is given, else the first item in the observable sequence.

Parameters:

  • block (Proc)

    Optional predicate function to evaluate for elements in the source sequence.

Returns:

  • (Rx::Observable)

    Sequence containing the first element in the observable sequence that satisfies the condition in the predicate if a block is given, else the first element.



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/rx/operators/aggregates.rb', line 195

# Returns the first element of the sequence, or the first element for which
# the block is truthy when a block is given.
#
# @yield [element] optional predicate
# @return [Rx::Observable] emits the first (matching) element and completes;
#   signals a RuntimeError through +on_error+ when the source completes
#   without one
def first(&block)
  return select(&block).first if block_given?
  AnonymousObservable.new do |observer|
    new_obs = Observer.configure do |o|
      o.on_next do |x|
        observer.on_next x
        observer.on_completed
      end

      o.on_error(&observer.method(:on_error))

      # Report the failure to the downstream observer instead of raising out
      # of the completion handler.
      o.on_completed do
        observer.on_error(RuntimeError.new('Sequence contains no elements'))
      end
    end

    subscribe new_obs
  end
end

#first_or_default(default_value = nil, &block) ⇒ Rx::Observable

Returns the first element of an observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists.

Parameters:

  • default_value (Object) (defaults to: nil)

    The default value to use if the sequence is empty.

  • block (Proc)

    An optional predicate function to evaluate for elements in the source sequence.

Returns:

  • (Rx::Observable)

    Sequence containing the first element in the observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists.



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/rx/operators/aggregates.rb', line 218

# Returns the first element of the sequence (or the first one for which the
# block is truthy), falling back to +default_value+ when the source completes
# without such an element.
#
# @param default_value value emitted when no element is found
# @yield [element] optional predicate
# @return [AnonymousObservable]
def first_or_default(default_value = nil, &block)
  return select(&block).first_or_default(default_value) if block_given?

  AnonymousObservable.new do |observer|
    head = Observer.configure do |config|
      config.on_next do |element|
        observer.on_next element
        observer.on_completed
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        observer.on_next default_value
        observer.on_completed
      end
    end

    subscribe head
  end
end

#flat_map(&block) ⇒ Object

Projects each element of the source observable sequence to the other observable sequence and merges the resulting observable sequences into one observable sequence.



108
109
110
# File 'lib/rx/operators/standard_query_operators.rb', line 108

# Projects each element with the block and flattens the projected sequences
# with +merge_all+.
#
# @yield [element] projection returning a sequence
def flat_map(&block)
  projected = map(&block)
  projected.merge_all
end

#flat_map_with_index(&block) ⇒ Object

Projects each element of an observable sequence to an observable sequence by incorporating the element’s index and merges the resulting observable sequences into one observable sequence.



113
114
115
# File 'lib/rx/operators/standard_query_operators.rb', line 113

# Projects each element together with its index using the block and flattens
# the projected sequences with +merge_all+.
#
# @yield [element, index] projection returning a sequence
def flat_map_with_index(&block)
  projected = map_with_index(&block)
  projected.merge_all
end

#group_join(right, left_duration_selector, right_duration_selector, result_selector) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/rx/linq/observable/group_join.rb', line 3

# Correlates the elements of this (left) sequence with those of +right+ based
# on overlapping durations, producing one window per left element.
#
# For every left element a new Subject (the window) is created and passed to
# +result_selector+ together with the element; the selector's result is
# emitted downstream. The window receives every right value that is current
# when it opens and every right value that arrives while it is open.
#
# @param right the right-hand sequence
# @param left_duration_selector called with a left element; returns a
#   sequence whose first notification closes that element's window
# @param right_duration_selector called with a right element; returns a
#   sequence whose first notification expires that right value
# @param result_selector called with (left element, window)
# @return [AnonymousObservable]
def group_join(right, left_duration_selector, right_duration_selector, result_selector)
  AnonymousObservable.new do |observer|
    group = CompositeSubscription.new
    r = RefCountSubscription.new(group)
    left_map = {}   # id => open window (Subject)
    right_map = {}  # id => current right value
    left_id = 0
    right_id = 0

    left_obs = Observer.configure do |o|
      o.on_next {|value|
        s = Subject.new
        id = left_id
        left_id += 1
        left_map[id] = s

        begin
          result = result_selector.call(value, s.add_ref(r))
        rescue => err
          left_map.values.each {|v| v.on_error(err) }
          observer.on_error(err)
          next
        end
        observer.on_next(result)

        # Replay the right values that are still current into the new window.
        right_map.values.each {|v| s.on_next(v) }

        md = SingleAssignmentSubscription.new
        group.push md

        expire = lambda {
          if left_map.delete(id)
            s.on_completed
          end
          group.delete(md)
        }

        begin
          duration = left_duration_selector.call(value)
        rescue => err
          left_map.values.each {|v| v.on_error(err) }
          observer.on_error(err)
          next
        end

        md.subscription = duration.take(1).subscribe(
          lambda {|_| },
          lambda {|e|
            left_map.values.each {|v| v.on_error(e) }
            observer.on_error(e)
          },
          expire)
      }

      o.on_error {|e|
        left_map.values.each {|v| v.on_error(e) }
        observer.on_error(e)
      }

      o.on_completed(&observer.method(:on_completed))
    end
    group.push self.subscribe(left_obs)

    right_obs = Observer.configure do |o|
      o.on_next {|value|
        id = right_id
        right_id += 1
        right_map[id] = value

        md = SingleAssignmentSubscription.new
        group.push md

        expire = lambda {
          right_map.delete(id)
          group.delete(md)
        }

        begin
          duration = right_duration_selector.call(value)
        rescue => err
          # Errors are signalled to the open windows (left_map); right_map
          # only holds plain values, which are not observers.
          left_map.values.each {|v| v.on_error(err) }
          observer.on_error(err)
          next
        end

        md.subscription = duration.take(1).subscribe(
          lambda {|_| },
          lambda {|e|
            left_map.values.each {|v| v.on_error(e) }
            observer.on_error(e)
          },
          expire)

        # Deliver the new right value to every window that is currently open.
        left_map.values.each {|v| v.on_next(value) }
      }

      o.on_error {|e|
        left_map.values.each {|v| v.on_error(e) }
        observer.on_error(e)
      }
    end
    group.push right.subscribe(right_obs)

    r
  end
end

#ignore_elementsObject

Ignores all elements in an observable sequence leaving only the termination messages.



131
132
133
134
135
136
137
138
139
140
141
# File 'lib/rx/operators/single.rb', line 131

# Drops every element and forwards only the error or completion notification.
#
# @return [AnonymousObservable]
def ignore_elements
  AnonymousObservable.new do |observer|
    sink = Rx::Observer.configure do |config|
      config.on_next { |_| }
      config.on_error(&observer.method(:on_error))
      config.on_completed(&observer.method(:on_completed))
    end

    subscribe sink
  end
end

#last(&block) ⇒ Rx::Observable

Returns the last element of an observable sequence that satisfies the condition in the predicate if the block is given, else the last element in the observable sequence.

Parameters:

  • block (Proc)

    A predicate function to evaluate for elements in the source sequence.

Returns:

  • (Rx::Observable)

    Sequence containing the last element in the observable sequence that satisfies the condition in the predicate if given, or the last element in the observable sequence.



251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/rx/operators/aggregates.rb', line 251

# Returns the last element of the sequence, or the last element for which the
# block is truthy when a block is given.
#
# @yield [element] optional predicate
# @return [AnonymousObservable] emits the last (matching) element and
#   completes, or signals a RuntimeError through +on_error+ when there is none
def last(&block)
  return select(&block).last if block_given?

  AnonymousObservable.new do |observer|
    candidate = nil
    found = false

    tail = Observer.configure do |config|
      config.on_next do |element|
        candidate = element
        found = true
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        if found
          observer.on_next candidate
          observer.on_completed
        else
          observer.on_error(RuntimeError.new 'Sequence contains no elements' )
        end
      end
    end

    subscribe tail
  end
end

#last_or_default(default_value = nil, &block) ⇒ Rx::Observable

Returns the last element of an observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists.

Parameters:

  • default_value (Object) (defaults to: nil)

    The default value to use if the sequence is empty.

  • block (Proc)

    A predicate function to evaluate for elements in the source sequence.

Returns:

  • (Rx::Observable)

    Sequence containing the last element in the observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists.



286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/rx/operators/aggregates.rb', line 286

# Returns the last element of the sequence (or the last one for which the
# block is truthy), falling back to +default_value+ when there is none.
#
# @param default_value value emitted when no element was seen
# @yield [element] optional predicate
# @return [AnonymousObservable]
def last_or_default(default_value = nil, &block)
  return select(&block).last_or_default(default_value) if block_given?

  AnonymousObservable.new do |observer|
    candidate = nil
    found = false

    tail = Observer.configure do |config|
      config.on_next do |element|
        candidate = element
        found = true
      end

      config.on_error(&observer.method(:on_error))

      config.on_completed do
        outcome = found ? candidate : default_value
        observer.on_next(outcome)
        observer.on_completed
      end
    end

    subscribe tail
  end
end

#latestObject

Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence. Each time a new inner observable sequence is received, unsubscribe from the previous inner observable sequence.



347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
# File 'lib/rx/operators/multiple.rb', line 347

# Flattens a sequence of sequences by forwarding values only from the most
# recently received inner sequence. Receiving a new inner sequence replaces
# the subscription to the previous one.
#
# The result completes once the outer sequence has completed and the most
# recent inner sequence (if any) has completed as well.
#
# @return [AnonymousObservable]
def latest
  AnonymousObservable.new do |observer|
    gate = Monitor.new
    inner_subscription = SerialSubscription.new
    stopped = false
    latest_num = 0
    has_latest = false

    source_obs = Observer.configure do |o|
      o.on_next do |inner_source|
        id = 0

        gate.synchronize do
          latest_num += 1
          id = latest_num
          # An inner sequence is now active; completion must wait for it.
          has_latest = true
        end

        d = SingleAssignmentSubscription.new
        # Installing the new slot in the serial subscription is what replaces
        # the subscription to the previous inner sequence.
        inner_subscription.subscription = d

        inner_obs = Observer.configure do |io|
          io.on_next {|x| gate.synchronize { observer.on_next x if latest_num == id } }

          io.on_error {|err| gate.synchronize { observer.on_error err if latest_num == id } }

          io.on_completed do
            gate.synchronize do
              # Only the current inner sequence may affect completion.
              if latest_num == id
                has_latest = false
                observer.on_completed if stopped
              end
            end
          end
        end

        d.subscription = inner_source.subscribe inner_obs
      end

      o.on_error {|err| gate.synchronize { observer.on_error err } }

      o.on_completed do
        gate.synchronize do
          stopped = true
          observer.on_completed unless has_latest
        end
      end
    end

    subscription = subscribe source_obs

    CompositeSubscription.new [subscription, inner_subscription]
  end
end

#map(&block) ⇒ Object

Projects each element of an observable sequence into a new form.



103
104
105
# File 'lib/rx/operators/standard_query_operators.rb', line 103

# Projects each element into a new form by calling the block with the
# element (the index supplied by +map_with_index+ is discarded).
#
# @yield [element] projection
def map(&block)
  map_with_index { |element, _index| block.call(element) }
end

#map_with_index(&block) ⇒ Object

Projects each element of an observable sequence into a new form by incorporating the element’s index.



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/rx/operators/standard_query_operators.rb', line 76

# Projects each element into a new form by calling the block with the element
# and its zero-based index. An error raised by the block is reported through
# +on_error+ and the index is not advanced for that element.
#
# @yield [element, index] projection
# @return [AnonymousObservable]
def map_with_index(&block)
  AnonymousObservable.new do |observer|
    projecting = Observer.configure do |config|
      position = 0

      config.on_next do |element|
        projected = nil
        begin
          projected = block.call(element, position)
          position += 1
        rescue => e
          observer.on_error e
          next
        end

        observer.on_next projected
      end

      config.on_error(&observer.method(:on_error))
      config.on_completed(&observer.method(:on_completed))
    end

    subscribe(projecting)
  end
end

#materializeObject

Materializes the implicit notifications of an observable sequence as explicit notification values.



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/rx/operators/single.rb', line 144

# Turns the implicit notifications of the sequence into explicit notification
# values: each element, the error, and the completion are emitted as
# Notification objects. The resulting sequence always ends with completion.
#
# @return [AnonymousObservable]
def materialize
  AnonymousObservable.new do |observer|
    new_obs = Rx::Observer.configure do |o|

      o.on_next {|x| observer.on_next(Notification.create_on_next x) }

      o.on_error do |err|
        # An error must be wrapped as an on_error notification, not as an
        # ordinary value.
        observer.on_next(Notification.create_on_error err)
        observer.on_completed
      end

      o.on_completed do
        observer.on_next(Notification.create_on_completed)
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end

#max(&block) ⇒ Rx::Observable

Returns the maximum element in an observable sequence.

Parameters:

  • block (Proc)

    An optional selector function to produce an element.

Returns:



314
315
316
317
# File 'lib/rx/operators/aggregates.rb', line 314

# Returns the maximum element of the sequence. With a block, the maximum of
# the block's results is returned instead.
#
# @yield [element] optional selector producing the value to compare
# @return [Rx::Observable] the first entry of the list produced by +max_by+
def max(&block)
  return map(&block).max if block_given?
  # Identity key selector; the block parameter must be written as |x|.
  max_by {|x| x }.map {|x| x[0] }
end

#max_by(&block) ⇒ Rx::Observable

Returns the elements in an observable sequence with the maximum key value.

Parameters:

  • block (Proc)

    Key selector function.

Returns:

  • (Rx::Observable)

    An observable sequence containing a list of zero or more elements that have a maximum key value.



323
324
325
# File 'lib/rx/operators/aggregates.rb', line 323

# Returns the elements with the maximum key value, where the key of each
# element is produced by the given block.
# Delegates to extrema_by with only the key selector (no extra flag).
def max_by(&block)
  extrema_by(&block)
end

#merge(other, scheduler = CurrentThreadScheduler.instance) ⇒ Object

Merges elements from two observable sequences into a single observable sequence, using the specified scheduler for enumeration of and subscription to the sources.



200
201
202
# File 'lib/rx/operators/multiple.rb', line 200

# Merges this sequence with +other+ into a single sequence by delegating to
# Observable.merge_all with the scheduler followed by both sources.
def merge(other, scheduler = CurrentThreadScheduler.instance)
  Observable.merge_all(scheduler, self, other)
end

#merge_all ⇒ Object

Merges elements from all inner observable sequences into a single observable sequence, subscribing to each inner sequence as soon as it arrives.



265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/rx/operators/multiple.rb', line 265

# Merges an observable sequence of observable sequences into one sequence.
#
# Every inner sequence is subscribed to as soon as it arrives and its
# elements are forwarded under a shared monitor. The result completes once
# the outer sequence has completed and no inner subscription remains; any
# error from the outer or an inner sequence is forwarded immediately.
#
# The returned composite subscription holds the outer subscription (m) and
# every live inner subscription, so unsubscribing releases all of them.
def merge_all
  AnonymousObservable.new do |observer|
    gate = Monitor.new
    stopped = false
    m = SingleAssignmentSubscription.new
    group = CompositeSubscription.new [m]

    new_obs = Observer.configure do |o|
      o.on_next do |inner_source|
        inner_subscription = SingleAssignmentSubscription.new
        group << inner_subscription

        inner_obs = Observer.configure do |io|
          io.on_next {|x| gate.synchronize { observer.on_next x } }

          io.on_error {|err| gate.synchronize { observer.on_error err } }

          io.on_completed do
            group.delete inner_subscription
            # Only the outer placeholder (m) left means every inner is done.
            gate.synchronize { observer.on_completed } if stopped && group.length == 1
          end
        end

        inner_subscription.subscription = inner_source.subscribe inner_obs
      end

      o.on_error {|err| gate.synchronize { observer.on_error err } }

      o.on_completed do
        stopped = true
        gate.synchronize { observer.on_completed } if group.length == 1
      end
    end

    # Track the outer subscription in m and hand back the whole group;
    # otherwise m stays empty and the inner subscriptions are unreachable.
    m.subscription = subscribe new_obs
    group
  end
end

#merge_concurrent(max_concurrent = 1) ⇒ Object

Merges elements from all inner observable sequences into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences.



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/rx/operators/multiple.rb', line 205

# Merges an observable sequence of observable sequences, subscribing to at
# most +max_concurrent+ inner sequences at a time.
#
# Inner sequences that arrive while the limit is reached are queued and
# started, in arrival order, as active ones complete. The result completes
# when the outer sequence has completed and no inner sequence is active.
#
# The returned composite subscription holds the outer subscription and each
# live inner subscription.
def merge_concurrent(max_concurrent = 1)
  AnonymousObservable.new do |observer|
    gate = Monitor.new
    q = []
    stopped = false
    group = CompositeSubscription.new
    active = 0

    # Declared before the lambda so the lambda body can call itself.
    subscriber = nil
    subscriber = lambda do |xs|
      subscription = SingleAssignmentSubscription.new
      group << subscription

      new_obs = Observer.configure do |o|
        o.on_next {|x| gate.synchronize { observer.on_next x } }

        o.on_error {|err| gate.synchronize { observer.on_error err } }

        o.on_completed do
          group.delete subscription
          gate.synchronize do
            if q.length > 0
              # Reuse the freed slot for the next queued source.
              s = q.shift
              subscriber.call s
            else
              active -= 1
              observer.on_completed if stopped && active == 0
            end
          end
        end
      end

      # Store the inner subscription so unsubscribing the group releases it
      # (merge_all wires its inner subscriptions the same way).
      subscription.subscription = xs.subscribe new_obs
    end

    inner_obs = Observer.configure do |o|
      o.on_next do |inner_source|
        gate.synchronize do
          if active < max_concurrent
            active += 1
            subscriber.call inner_source
          else
            q << inner_source
          end
        end
      end

      o.on_error {|err| gate.synchronize { observer.on_error err } }

      o.on_completed do
        stopped = true
        observer.on_completed if active == 0
      end
    end

    group << subscribe(inner_obs)
    # Return the group itself rather than whatever << happens to return.
    group
  end
end

#min(&block) ⇒ Rx::Observable

Returns the minimum element in an observable sequence.

Parameters:

  • block (Proc)

    An optional selector function to produce an element.

Returns:



330
331
332
333
# File 'lib/rx/operators/aggregates.rb', line 330

# Returns the minimum element of the sequence.
#
# With a block, elements are projected through it first. Without one, min_by
# is used with an identity key and the first entry of each emitted list is
# taken.
def min(&block)
  if block_given?
    map(&block).min
  else
    min_by { |element| element }.map { |group| group[0] }
  end
end

#min_by(&block) ⇒ Rx::Observable

Returns the elements in an observable sequence with the minimum key value.

Parameters:

  • block (Proc)

    Key selector function.

Returns:

  • (Rx::Observable)

    An observable sequence containing a list of zero or more elements that have a minimum key value.



339
340
341
# File 'lib/rx/operators/aggregates.rb', line 339

# Returns the elements with the minimum key value, where the key of each
# element is produced by the given block.
# Delegates to extrema_by, passing true as its first argument
# (presumably the "select the minimum" switch -- confirm in extrema_by).
def min_by(&block)
  extrema_by(true, &block)
end

#multicast(subject_or_subject_selector, selector = nil) ⇒ Object



3
4
5
6
7
8
9
10
11
12
# File 'lib/rx/linq/observable/multicast.rb', line 3

# Multicasts the source sequence through a subject.
#
# * Given a subject, returns a ConnectableObservable wrapping this sequence
#   and that subject.
# * Given a Proc that builds a subject, returns an observable that, on each
#   subscription, creates a fresh connectable sequence, subscribes the
#   observer to +selector.call(connectable)+ and connects the connectable.
#   +selector+ is required in this form.
def multicast(subject_or_subject_selector, selector = nil)
  if Proc === subject_or_subject_selector
    AnonymousObservable.new do |observer|
      connectable = self.multicast(subject_or_subject_selector.call)
      # The composite must hold the connection, not the source observable:
      # without connect the subject is never attached to the source.
      CompositeSubscription.new [selector.call(connectable).subscribe(observer), connectable.connect]
    end
  else
    ConnectableObservable.new(self, subject_or_subject_selector)
  end
end

#none?(&block) ⇒ Rx::Observable

Determines whether no elements of an observable sequence satisfy a condition if block given, else if all are false

Parameters:

  • block (Proc)

Returns:



75
76
77
78
79
# File 'lib/rx/operators/aggregates.rb', line 75

# Builds a sequence answering a "none?" question about the source.
#
# When no block is given the predicate defaults to one that returns true for
# every element. The source is filtered down to the elements for which the
# predicate is falsy, and any? is applied to that filtered sequence.
#
# NOTE(review): as written, the result reflects whether some element FAILS
# the predicate, which does not obviously match "no element satisfies the
# condition" -- confirm against any? and the intended semantics.
def none?(&block)
  block ||= lambda { |_| true }
  select {|v| !(block.call v)}.
  any?
end

#observe_on(scheduler) ⇒ Object

Wraps the source sequence in order to run its observer callbacks on the specified scheduler.



32
33
34
35
36
37
38
# File 'lib/rx/operators/synchronization.rb', line 32

# Wraps the source so that observer callbacks are delivered through an
# ObserveOnObserver bound to the given scheduler.
#
# Raises ArgumentError when +scheduler+ is nil or false.
def observe_on(scheduler)
  raise ArgumentError, 'Scheduler cannot be nil' unless scheduler

  AnonymousObservable.new do |observer|
    scheduled_observer = ObserveOnObserver.new(scheduler, observer)
    subscribe(scheduled_observer)
  end
end

#on_error_resume_next(other) ⇒ Object

Concatenates the second observable sequence to the first observable sequence upon successful or exceptional termination of the first.



304
305
306
307
308
# File 'lib/rx/operators/multiple.rb', line 304

# Continues with +other+ once this sequence terminates, by delegating to
# Observable.on_error_resume_next with this sequence followed by +other+.
#
# Raises ArgumentError when +other+ is nil or false.
def on_error_resume_next(other)
  raise ArgumentError, 'Other cannot be nil' unless other

  Observable.on_error_resume_next(self, other)
end

#pluck(prop) ⇒ Object



3
4
5
# File 'lib/rx/linq/observable/pluck.rb', line 3

# Maps every element to the value it returns for +element[prop]+.
def pluck(prop)
  map do |element|
    element[prop]
  end
end

#publish(&selector) ⇒ Object



3
4
5
6
7
8
9
# File 'lib/rx/linq/observable/publish.rb', line 3

# Multicasts the source through a Subject.
#
# Without a block, returns +multicast(Subject.new)+. With a block, the block
# is used as the multicast selector and a new Subject is created by the
# subject factory passed to multicast.
def publish(&selector)
  if block_given?
    # Pass the captured block explicitly: a block-less Proc.new no longer
    # picks up the method's block on Ruby 3.0+ and raises ArgumentError.
    multicast(lambda { Subject.new }, selector)
  else
    multicast(Subject.new)
  end
end

#reduce(*args, &block) ⇒ Rx::Observable Also known as: aggregate

Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value. For aggregation behavior with incremental intermediate results, see Rx::Observable.scan

Returns:



47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/rx/operators/aggregates.rb', line 47

# Folds the sequence into a single value using scan and keeping only the
# final result.
#
# Accepted argument shapes:
#   (seed, Symbol) or (seed, &block) -- seeded; the seed is also prepended
#                                       to the scanned sequence
#   (Symbol) or (&block)             -- unseeded
# Anything else raises ArgumentError.
def reduce(*args, &block)
  seeded   = (args.length == 2 && args[1].is_a?(Symbol)) || (args.length == 1 && block_given?)
  unseeded = (args.length == 1 && args[0].is_a?(Symbol)) || (args.length == 0 && block_given?)

  # The seeded shape is checked first, as (seed, &block) must win over (Symbol).
  return scan(*args, &block).start_with(args[0]).final if seeded
  return scan(*args, &block).final if unseeded

  raise ArgumentError.new 'Invalid arguments'
end

#repeat(repeat_count) ⇒ Object

Repeats the observable sequence a specified number of times.



171
172
173
# File 'lib/rx/operators/single.rb', line 171

# Repeats the observable sequence +repeat_count+ times.
# Concatenates the sources produced by enumerator_repeat_times for this
# sequence via Observable.concat.
def repeat(repeat_count)
  Observable.concat(enumerator_repeat_times(repeat_count, self))
end

#repeat_infinitely ⇒ Object

Repeats the observable sequence indefinitely.



166
167
168
# File 'lib/rx/operators/single.rb', line 166

# Repeats the observable sequence indefinitely.
# Concatenates the sources produced by enumerator_repeat_infinitely for
# this sequence via Observable.concat.
def repeat_infinitely
  Observable.concat(enumerator_repeat_infinitely(self))
end

#rescue_error(other = nil, &action) ⇒ Object

Continues an observable sequence that is terminated by an exception of the specified type with the observable sequence produced by the handler or continues an observable sequence that is terminated by an exception with the next observable sequence.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/rx/operators/multiple.rb', line 81

# Continues a sequence that terminated with an error.
#
# * With only +other+: continues with +other+ by delegating to
#   Observable.rescue_error with this sequence followed by +other+.
# * With a block: on error the block is called with the exception and must
#   return the observable to continue with. If the block itself raises, that
#   exception is forwarded through on_error.
#
# Raises ArgumentError when neither +other+ nor a block is supplied.
def rescue_error(other = nil, &action)
  # Include self: passing only +other+ would drop this sequence entirely
  # (on_error_resume_next passes both sequences in the same way).
  return Observable.rescue_error(self, other) if other && !block_given?
  raise ArgumentError.new 'Invalid arguments' if other.nil? && !block_given?

  AnonymousObservable.new do |observer|
    subscription = SerialSubscription.new

    d1 = SingleAssignmentSubscription.new
    subscription.subscription = d1

    new_obs = Observer.configure do |o|
      o.on_next(&observer.method(:on_next))

      o.on_error do |err|
        result = nil
        begin
          result = action.call(err)
        rescue => e
          observer.on_error(e)
          next
        end

        # Swap the serial subscription over to the handler's sequence.
        d = SingleAssignmentSubscription.new
        subscription.subscription = d
        d.subscription = result.subscribe observer
      end

      o.on_completed(&observer.method(:on_completed))
    end

    d1.subscription = subscribe new_obs
    subscription
  end
end

#retry(retry_count) ⇒ Object

Repeats the source observable sequence the specified number of times or until it successfully terminates.



181
182
183
# File 'lib/rx/operators/single.rb', line 181

# Repeats the source sequence up to +retry_count+ times or until it
# terminates successfully.
# Passes the sources produced by enumerator_repeat_times for this sequence
# to Observable.rescue_error.
def retry(retry_count)
  Observable.rescue_error(enumerator_repeat_times(retry_count, self))
end

#retry_infinitely ⇒ Object

Repeats the source observable sequence until it successfully terminates.



176
177
178
# File 'lib/rx/operators/single.rb', line 176

# Repeats the source sequence until it terminates successfully.
# Passes the sources produced by enumerator_repeat_infinitely for this
# sequence to Observable.rescue_error.
def retry_infinitely
  Observable.rescue_error(enumerator_repeat_infinitely(self))
end

#scan(*args, &block) ⇒ Object

Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value. For aggregation behavior with no intermediate results, see Observable.reduce.



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/rx/operators/single.rb', line 188

# Applies an accumulator over the sequence and emits every intermediate
# result.
#
# Accepted argument shapes:
#   (seed, Symbol), (seed, &block), (Symbol), (&block)
# Anything else raises ArgumentError.
#
# Without a seed the first element is emitted unchanged and becomes the
# initial accumulation. With a seed, an empty source emits the seed on
# completion. An exception raised by the accumulator is forwarded through
# on_error and the failing element produces no output.
def scan(*args, &block)
  has_seed = false
  seed = nil
  action = nil

  # Argument parsing to support:
  # 1. (seed, Symbol)
  # 2. (seed, &block)
  # 3. (Symbol)
  # 4. (&block)
  if args.length == 2 && args[1].is_a?(Symbol)
    seed = args[0]
    action = args[1].to_proc
    has_seed = true
  elsif args.length == 1 && block_given?
    seed = args[0]
    has_seed = true
    action = block
  elsif args.length == 1 && args[0].is_a?(Symbol)
    action = args[0].to_proc
  elsif args.length == 0 && block_given?
    action = block
  else
    raise ArgumentError.new 'Invalid arguments'
  end

  AnonymousObservable.new do |observer|

    has_accumulation = false
    accumulation = nil
    has_value = false

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        begin
          has_value = true

          if has_accumulation
            accumulation = action.call(accumulation, x)
          else
            accumulation = has_seed ? action.call(seed, x) : x
            has_accumulation = true
          end
        rescue => err
          observer.on_error err
          # `next`, not `break`: this callback runs after the method that
          # received the block has returned, where `break` raises
          # LocalJumpError instead of just leaving the callback.
          next
        end

        observer.on_next accumulation
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        # An empty seeded source still yields the seed.
        observer.on_next seed if !has_value && has_seed
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end

#select(&block) ⇒ Object Also known as: find_all

Filters the elements of an observable sequence based on a predicate.



244
245
246
# File 'lib/rx/operators/standard_query_operators.rb', line 244

# Keeps only the elements for which the block returns a truthy value.
# Delegates to select_with_index and ignores the index it supplies.
def select(&block)
  select_with_index do |element, _index|
    block.call(element)
  end
end

#select_with_index(&block) ⇒ Object Also known as: find_all_with_index

Filters the elements of an observable sequence based on a predicate by incorporating the element’s index.



250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/rx/operators/standard_query_operators.rb', line 250

# Keeps only the elements for which the block, called with the element and
# a running index starting at 0, returns a truthy value.
#
# If the block raises, the exception is forwarded through on_error, the
# element is dropped and the index is not advanced.
def select_with_index(&block)
  AnonymousObservable.new do |observer|
    index = 0

    filtering_observer = Observer.configure do |o|
      o.on_next do |element|
        begin
          keep = block.call(element, index)
        rescue => e
          observer.on_error e
          next
        end

        index += 1
        observer.on_next element if keep
      end

      # Terminal notifications pass straight through.
      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(filtering_observer)
  end
end

#sequence_eql?(other) ⇒ Rx::Observable

Determines whether two sequences are equal by comparing the elements pairwise; they are equal when both sequences are of equal length and their corresponding elements are equal.

Parameters:

Returns:

  • (Rx::Observable)

    An observable sequence that contains a single element which indicates whether both sequences are of equal length and their corresponding elements are equal.



347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
# File 'lib/rx/operators/aggregates.rb', line 347

# Compares this sequence with +other+ element by element and emits a single
# boolean.
#
# Each side buffers elements the other side has not yet matched. A mismatch,
# or an extra element on one side after the other has completed, emits
# false; both sides completing with empty buffers emits true. Errors from
# either side are forwarded unchanged. All state changes happen while
# holding +gate+.
#
# Returns a composite subscription covering both source subscriptions.
def sequence_eql?(other)
  AnonymousObservable.new do |observer|
    gate = Mutex.new
    left_done = false
    right_done = false
    left_queue = []
    right_queue = []

    # Observer for this (left) sequence.
    obs1 = Observer.configure do |o|
      o.on_next do |x|
        gate.synchronize do
          if right_queue.length > 0
            # Pair with the oldest unmatched right-hand element.
            v = right_queue.shift
            equal = x == v

            unless equal
              observer.on_next false
              observer.on_completed
            end
          elsif right_done
            # Right side already finished: left is longer.
            observer.on_next false
            observer.on_completed
          else
            left_queue.push x
          end
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        gate.synchronize do
          left_done = true
          if left_queue.length == 0
            if right_queue.length > 0
              # Right has elements left that can never be matched.
              observer.on_next false
              observer.on_completed
            elsif right_done
              observer.on_next true
              observer.on_completed
            end
          end
        end
      end
    end

    subscription1 = subscribe obs1

    # Observer for the other (right) sequence; mirrors obs1.
    obs2 = Observer.configure do |o|
      o.on_next do |x|
        gate.synchronize do
          if left_queue.length > 0
            v = left_queue.shift
            equal = x == v

            unless equal
              observer.on_next false
              observer.on_completed
            end
          elsif left_done
            observer.on_next false
            observer.on_completed
          else
            right_queue.push x
          end
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        gate.synchronize do
          right_done = true
          if right_queue.length == 0
            if left_queue.length > 0
              observer.on_next false
              observer.on_completed
            elsif left_done
              observer.on_next true
              observer.on_completed
            end
          end
        end
      end
    end

    subscription2 = other.subscribe obs2

    CompositeSubscription.new [subscription1, subscription2]
  end
end

#single(&block) ⇒ Rx::Observable

Returns the only element of an observable sequence, and reports an exception if there is not exactly one element in the observable sequence.

Parameters:

  • block (Proc)

    A predicate function to evaluate for elements in the source sequence.

Returns:

  • (Rx::Observable)

    Sequence containing the single element in the observable sequence.



443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
# File 'lib/rx/operators/aggregates.rb', line 443

# Emits the only element of the sequence.
#
# With a block, the sequence is first filtered by it. A second element
# reports 'More than one element produced' through on_error; completing
# without any element reports 'Sequence contains no elements'.
def single(&block)
  return select(&block).single if block_given?

  AnonymousObservable.new do |observer|
    found = false
    only = nil

    collector = Observer.configure do |o|
      o.on_next do |element|
        if found
          observer.on_error(RuntimeError.new('More than one element produced'))
          next
        end

        only = element
        found = true
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        unless found
          observer.on_error(RuntimeError.new('Sequence contains no elements'))
          next
        end

        observer.on_next only
        observer.on_completed
      end
    end

    subscribe collector
  end
end

#single_or_default(default_value = nil, &block) ⇒ Rx::Observable

Returns the only element of an observable sequence, or a default value if no such element exists; this method reports an exception if there is more than one element in the observable sequence.

Parameters:

  • default_value (Object) (defaults to: nil)

    The default value if no value is provided

  • block (Proc)

    A predicate function to evaluate for elements in the source sequence.

Returns:

  • (Rx::Observable)

    Sequence containing the single element in the observable sequence, or a default value if no such element exists.



481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
# File 'lib/rx/operators/aggregates.rb', line 481

# Emits the only element of the sequence, or +default_value+ when the
# sequence completes without producing one.
#
# With a block, the sequence is first filtered by it. A second element
# reports 'More than one element produced' through on_error.
def single_or_default(default_value = nil, &block)
  return select(&block).single_or_default(default_value) if block_given?

  AnonymousObservable.new do |observer|
    found = false
    only = nil

    collector = Observer.configure do |o|
      o.on_next do |element|
        if found
          observer.on_error(RuntimeError.new('More than one element produced'))
          next
        end

        only = element
        found = true
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        result = found ? only : default_value
        observer.on_next(result)
        observer.on_completed
      end
    end

    subscribe collector
  end
end

#skip(count) ⇒ Object

Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/rx/operators/standard_query_operators.rb', line 118

# Drops the first +count+ elements of the sequence and forwards the rest.
# Errors and completion pass straight through.
def skip(count)
  AnonymousObservable.new do |observer|
    to_skip = count

    skipping_observer = Observer.configure do |o|
      o.on_next do |element|
        if to_skip > 0
          to_skip -= 1
        else
          observer.on_next element
        end
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(skipping_observer)
  end
end

#skip_last(count) ⇒ Object

Bypasses a specified number of elements at the end of an observable sequence.

Parameters:

  • count (Numeric)

    The number of elements to bypass at the end of an observable sequence.



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/rx/operators/single.rb', line 253

# Forwards every element except the last +count+ ones.
#
# Elements are held in a buffer; once the buffer holds more than +count+
# items the oldest one is released downstream.
#
# Raises ArgumentError when +count+ is negative.
def skip_last(count)
  raise ArgumentError, 'Count cannot be less than zero' if count < 0

  AnonymousObservable.new do |observer|
    pending = []

    buffering_observer = Observer.configure do |o|
      o.on_next do |element|
        pending.push element
        observer.on_next(pending.shift) if pending.length > count
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe buffering_observer
  end
end

#skip_until(other) ⇒ Object

Returns the elements from the source observable sequence only after the other observable sequence produces an element.



311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'lib/rx/operators/multiple.rb', line 311

# Forwards elements of the source only after +other+ has produced an
# element.
#
# Both sequences are passed through synchronize with a shared monitor
# before being observed. Errors from either sequence are forwarded. The
# source's completion is forwarded only once the gate has opened.
#
# Raises ArgumentError when +other+ is nil or false.
# Returns a composite subscription covering both subscriptions.
def skip_until(other)
  raise ArgumentError.new 'Other cannot be nil' unless other

  AnonymousObservable.new do |observer|
    source_subscription = SingleAssignmentSubscription.new
    other_subscription = SingleAssignmentSubscription.new

    # Flipped to true by the first element of +other+.
    open = false
    gate = Monitor.new

    source_obs = Observer.configure do |o|
      o.on_next {|x| observer.on_next x if open }
      o.on_error(&observer.method(:on_error))
      o.on_completed { observer.on_completed if open }
    end

    other_obs = Observer.configure do |o|
      o.on_next do |_|
        open = true
        # The trigger is only needed once.
        other_subscription.unsubscribe
      end

      o.on_error(&observer.method(:on_error))
    end

    source_subscription.subscription = synchronize(gate).subscribe(source_obs)
    other_subscription.subscription = other.synchronize(gate).subscribe(other_obs)

    CompositeSubscription.new [source_subscription, other_subscription]
  end
end

#skip_while(&block) ⇒ Object

Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements.



142
143
144
# File 'lib/rx/operators/standard_query_operators.rb', line 142

# Drops leading elements while the block returns a truthy value.
# Delegates to skip_while_with_index and ignores the index it supplies.
def skip_while(&block)
  skip_while_with_index do |element, _index|
    block.call(element)
  end
end

#skip_while_with_index(&block) ⇒ Object

Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements. The element’s index is used in the logic of the predicate function.



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/rx/operators/standard_query_operators.rb', line 148

# Drops leading elements while the block, called with the element and a
# running index starting at 0, returns a truthy value; every element from
# the first rejected one onwards is forwarded.
#
# The block is no longer consulted once an element has been let through.
# If the block raises, the exception is forwarded through on_error and the
# element is dropped.
def skip_while_with_index(&block)
  AnonymousObservable.new do |observer|
    running = false
    i = 0

    new_observer = Observer.configure do |o|

      o.on_next do |x|
        unless running
          begin
            running = !block.call(x, i)
            i += 1
          rescue => e
            observer.on_error e
            next
          end
        end

        # Must sit outside the `unless running` guard: inside it, only the
        # first non-skipped element would ever be forwarded.
        observer.on_next x if running
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(new_observer)
  end
end

#start_with(*args) ⇒ Object

Prepends a sequence of values to an observable sequence.



273
274
275
276
277
278
279
# File 'lib/rx/operators/single.rb', line 273

# Prepends the given values to the sequence.
#
# If the first argument is a Scheduler it is removed from the values and
# used to emit them; otherwise CurrentThreadScheduler.instance is used.
def start_with(*args)
  default_scheduler = CurrentThreadScheduler.instance
  scheduler_given = !args.empty? && Scheduler === args.first
  scheduler = scheduler_given ? args.shift : default_scheduler

  Observable.from_array(args, scheduler).concat(self)
end

#subscribe(*args) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/rx/core/observable.rb', line 13

# Builds the observer for a subscription from the supported call shapes:
#
#   subscribe { |x| ... }                         -- block used as on_next
#   subscribe                                     -- empty observer
#   subscribe(observer)                           -- observer used as given
#   subscribe(on_next, on_error, on_completed)    -- three callables
#
# Any other argument count raises ArgumentError.
#
# NOTE(review): as listed here each branch only builds or returns the
# observer; no call that actually attaches it to the sequence is visible.
# Confirm against lib/rx/core/observable.rb that nothing was lost.
def subscribe(*args, &block)
  case args.size
  when 0
    if block
      # Use the captured block: a block-less Proc.new no longer picks up the
      # method's block on Ruby 3.0+ and raises ArgumentError.
      Observer.configure {|o| o.on_next(&block) }
    else
      Observer.configure
    end
  when 1
    args[0]
  when 3
    Observer.configure {|o|
      o.on_next(&args[0])
      o.on_error(&args[1])
      o.on_completed(&args[2])
    }
  else
    raise ArgumentError, "wrong number of arguments (#{args.size} for 0..1 or 3)"
  end
end

#subscribe_on(scheduler) ⇒ Object

Wraps the source sequence in order to run its subscription and unsubscribe logic on the specified scheduler.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/rx/operators/synchronization.rb', line 15

# Wraps the source so that subscribing happens inside an action scheduled
# on +scheduler+.
#
# Raises ArgumentError when +scheduler+ is nil or false.
# Returns the serial subscription +d+.
def subscribe_on(scheduler)
  raise ArgumentError.new 'Scheduler cannot be nil' unless scheduler

  AnonymousObservable.new do |observer|
    m = SingleAssignmentSubscription.new
    d = SerialSubscription.new
    # Until the scheduled action runs, d tracks the pending scheduled work.
    d.subscription = m

    m.subscription = scheduler.schedule lambda {
      # Replace the pending work with the real subscription, wrapped in a
      # ScheduledSubscription together with the scheduler.
      d.subscription = ScheduledSubscription.new scheduler, (subscribe observer)
    }

    d
  end
end

#subscribe_on_completed(&block) ⇒ Object

Subscribes the given block to the on_completed action of the observable sequence.



69
70
71
72
# File 'lib/rx/core/observable.rb', line 69

# Subscribes with an observer whose only handler is the given block,
# registered for on_completed.
#
# Raises ArgumentError when no block is given.
def subscribe_on_completed(&block)
  raise ArgumentError, 'Block is required' unless block_given?

  completion_observer = Observer.configure { |o| o.on_completed(&block) }
  subscribe(completion_observer)
end

#subscribe_on_error(&block) ⇒ Object

Subscribes the given block to the on_error action of the observable sequence.



63
64
65
66
# File 'lib/rx/core/observable.rb', line 63

# Subscribes with an observer whose only handler is the given block,
# registered for on_error.
#
# Raises ArgumentError when no block is given.
def subscribe_on_error(&block)
  raise ArgumentError, 'Block is required' unless block_given?

  error_observer = Observer.configure { |o| o.on_error(&block) }
  subscribe(error_observer)
end

#subscribe_on_next(&block) ⇒ Subscription

Subscribes the given block to the on_next action of the observable sequence.

Parameters:

  • block (Object)

Returns:



57
58
59
60
# File 'lib/rx/core/observable.rb', line 57

# Subscribes with an observer whose only handler is the given block,
# registered for on_next.
#
# Raises ArgumentError when no block is given.
def subscribe_on_next(&block)
  raise ArgumentError, 'Block is required' unless block_given?

  element_observer = Observer.configure { |o| o.on_next(&block) }
  subscribe(element_observer)
end

#sum(&block) ⇒ Rx::Observable

Computes the sum of the values in the source sequence.

Parameters:

  • block (Proc)

    Optional block used to obtain the value to sum.

Returns:

  • (Rx::Observable)

    An observable sequence containing a single element with the sum of the values in the source sequence.



513
514
515
516
# File 'lib/rx/operators/aggregates.rb', line 513

# Sums the values of the sequence, starting from 0.
# With a block, each element is first projected through it.
def sum(&block)
  if block_given?
    map(&block).sum
  else
    reduce(0) { |total, value| total + value }
  end
end

#synchronize(gate = Monitor.new) ⇒ Object

Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.



41
42
43
44
45
# File 'lib/rx/operators/synchronization.rb', line 41

# Wraps the source so that the subscriber is reached through the observer
# built by Observer.allow_reentrancy with the given gate. A new Monitor is
# created per call when no gate is supplied.
def synchronize(gate = Monitor.new)
  AnonymousObservable.new do |observer|
    gated_observer = Observer.allow_reentrancy(observer, gate)
    subscribe(gated_observer)
  end
end

#take(count, scheduler = ImmediateScheduler.instance) ⇒ Object

Returns a specified number of contiguous elements from the start of an observable sequence.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/rx/operators/standard_query_operators.rb', line 178

# Forwards the first +count+ elements of the sequence and then completes.
#
# A +count+ of 0 short-circuits to Observable.empty on the given scheduler.
# Elements arriving after the limit has been reached are ignored.
def take(count, scheduler = ImmediateScheduler.instance)
  return Observable.empty(scheduler) if count == 0

  AnonymousObservable.new do |observer|
    left = count

    limiting_observer = Observer.configure do |o|
      o.on_next do |element|
        next unless left > 0

        left -= 1
        observer.on_next element
        observer.on_completed if left == 0
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(limiting_observer)
  end
end

#take_last(count, scheduler = CurrentThreadScheduler.instance) ⇒ Object

Returns a specified number of contiguous elements from the end of an observable sequence.



282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/rx/operators/single.rb', line 282

# Emits the last +count+ elements of the sequence once it has completed.
#
# Elements are buffered while the source is running, keeping at most
# +count+ of them. On completion the buffer is drained one element at a
# time through +scheduler.schedule_recursive+, followed by on_completed.
#
# Raises ArgumentError when +count+ is negative.
# Returns a composite subscription holding the source subscription and the
# scheduled drain.
def take_last(count, scheduler = CurrentThreadScheduler.instance)
  raise ArgumentError.new 'Count cannot be less than zero' if count < 0
  AnonymousObservable.new do |observer|
    q = []
    g = CompositeSubscription.new

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        q.push x
        # Trim to the requested size; comparing against 0 would empty the
        # buffer on every element.
        q.shift if q.length > count
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        g.push(scheduler.schedule_recursive lambda {|this|
          if q.length > 0
            observer.on_next(q.shift)
            this.call
          else
            observer.on_completed
          end
        })
      end
    end

    # Subscribe only after the observer is fully built; inside the configure
    # block +new_obs+ is still nil.
    g.push(subscribe new_obs)
    g
  end
end

#take_last_buffer(count) ⇒ Object

Returns a list with the specified number of contiguous elements from the end of an observable sequence.



314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/rx/operators/single.rb', line 314

# Emits, as a single array, the last +count+ elements of the sequence once
# it has completed, and then completes.
#
# Elements are buffered while the source is running, keeping at most
# +count+ of them. Errors are forwarded unchanged.
def take_last_buffer(count)
  AnonymousObservable.new do |observer|
    q = []
    new_obs = Observer.configure do |o|
      o.on_next do |x|
        q.push x
        q.shift if q.length > count
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next q
        observer.on_completed
      end
    end

    # Previously misspelled as `susbcribe`, which raised NoMethodError on
    # every subscription.
    subscribe new_obs
  end
end

#take_until(other) ⇒ Object

Returns the elements from the source observable sequence until the other observable sequence produces an element.



396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
# File 'lib/rx/operators/multiple.rb', line 396

# Forwards elements of the source until +other+ produces an element, at
# which point the result completes.
#
# Both sequences are passed through synchronize with a shared monitor
# before being observed. Errors from +other+ are forwarded to the observer.
#
# Raises ArgumentError when +other+ is nil or false.
# Returns a composite subscription covering both subscriptions.
def take_until(other)
  raise ArgumentError.new 'other cannot be nil' unless other

  AnonymousObservable.new do |observer| 
    source_subscription = SingleAssignmentSubscription.new
    other_subscription = SingleAssignmentSubscription.new

    gate = Monitor.new

    other_obs = Observer.configure do |o|
      # Any element from the trigger sequence ends the result.
      o.on_next {|_| observer.on_completed }
      o.on_error(&observer.method(:on_error))
    end

    # The trigger is subscribed before the source.
    other_subscription.subscription = other.synchronize(gate).subscribe(other_obs)
    # NOTE(review): `ensures` presumably runs the given callable when the
    # source terminates, releasing the trigger subscription -- confirm.
    source_subscription.subscription = synchronize(gate).ensures(&other_subscription.method(:unsubscribe)).subscribe(observer)

    CompositeSubscription.new [source_subscription, other_subscription]
  end
end

#take_while(&block) ⇒ Object

Returns elements from an observable sequence as long as a specified condition is true.



204
205
206
# File 'lib/rx/operators/standard_query_operators.rb', line 204

# Forwards elements for as long as the block returns a truthy value.
# Delegates to take_while_with_index and ignores the index it supplies.
def take_while(&block)
  take_while_with_index do |element, _index|
    block.call(element)
  end
end

#take_while_with_index(&block) ⇒ Object

Returns elements from an observable sequence as long as a specified condition is true. The element’s index is used in the logic of the predicate function.



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/rx/operators/standard_query_operators.rb', line 210

# Forwards elements for as long as the block, called with the element and a
# running index starting at 0, returns a truthy value; the first rejected
# element completes the result instead of being emitted.
#
# Once stopped, later elements are ignored. If the block raises, the
# exception is forwarded through on_error and the element is dropped.
def take_while_with_index(&block)
  AnonymousObservable.new do |observer|
    active = true
    index = 0

    gating_observer = Observer.configure do |o|
      o.on_next do |element|
        next unless active

        begin
          active = block.call(element, index)
          index += 1
        rescue => e
          observer.on_error e
          next
        end

        if active
          observer.on_next element
        else
          observer.on_completed
        end
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(gating_observer)
  end
end

#tap(observer) ⇒ Object

Invokes the observer’s methods for each message in the source sequence. This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/rx/operators/single.rb', line 75

# Invokes the given observer's callbacks for every notification of the source
# sequence and then relays that notification downstream. Useful for
# debugging, logging, and other side effects on the pipeline.
#
# If a callback of the side-effect observer raises, the exception is sent
# downstream through +on_error+ and the notification that triggered it is
# NOT relayed, so a single source notification produces at most one
# downstream notification.
#
# @param observer [#on_next, #on_error, #on_completed] side-effect observer
# @return [AnonymousObservable]
# @raise [ArgumentError] if +observer+ is nil (or false)
def tap(observer)
  raise ArgumentError.new 'Observer cannot be nil' unless observer

  AnonymousObservable.new do |obs|
    new_obs = Rx::Observer.configure do |o|

      o.on_next do |value|
        begin
          observer.on_next value
        rescue => err
          obs.on_error err
          next # the failure has been reported; do not also emit the value
        end

        obs.on_next value
      end

      o.on_error do |err|
        begin
          observer.on_error err
        rescue => e
          obs.on_error e
          next # avoid sending a second on_error downstream
        end

        obs.on_error err
      end

      o.on_completed do
        begin
          observer.on_completed
        rescue => err
          obs.on_error err
          next # avoid completing after an error has been sent
        end

        obs.on_completed
      end

    end

    subscribe new_obs
  end
end

#time_interval(scheduler = DefaultScheduler.instance) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
# File 'lib/rx/linq/observable/time_interval.rb', line 3

# Pairs every element of the source sequence with the time elapsed since the
# previous element (or, for the first element, since the deferred block ran).
#
# Time is read from +scheduler.now+; each element is wrapped as
# +TimeInterval.new(elapsed, element)+. The bookkeeping lives inside
# +Observable.defer+, so the starting reading is taken when the deferred
# block is invoked rather than when this method is called.
#
# @param scheduler [#now] clock used to measure the intervals
# @return the result of +Observable.defer+
def time_interval(scheduler = DefaultScheduler.instance)
  Observable.defer do
    previous = scheduler.now

    map do |element|
      current = scheduler.now
      elapsed = current - previous
      previous = current
      TimeInterval.new(elapsed, element)
    end
  end
end

#timestamp(scheduler = DefaultScheduler.instance) ⇒ Object



3
4
5
6
7
# File 'lib/rx/linq/observable/timestamp.rb', line 3

# Maps every element of the source sequence to a Hash carrying the element
# under +:value+ and the scheduler's current time under +:timestamp+.
#
# @param scheduler [#now] clock queried once per element
# @return the result of +map+
def timestamp(scheduler = DefaultScheduler.instance)
  map { |element| { value: element, timestamp: scheduler.now } }
end

#to_a ⇒ Rx::Observable

Creates an array from an observable sequence.

Returns:

  • (Rx::Observable)



520
521
522
523
524
525
526
527
528
529
530
531
# File 'lib/rx/operators/aggregates.rb', line 520

# Collects every element of the source sequence into a single Array.
#
# Each subscription gets its own Array. Elements are pushed as they arrive;
# when the source completes the Array is emitted as the only element and the
# downstream observer is completed. Source errors are forwarded unchanged.
#
# @return [AnonymousObservable]
def to_a
  AnonymousObservable.new do |observer|
    collected = []

    # Runs on source completion: emit the gathered elements, then finish.
    emit_and_finish = lambda do
      observer.on_next collected
      observer.on_completed
    end

    subscribe(collected.method(:push), observer.method(:on_error), emit_and_finish)
  end
end

#to_h {|h| ... } ⇒ Rx::Observable

Creates a Hash from the observable collection. Note that any duplicate keys will be overwritten.

Yields:

  • (h)

Returns:

  • (Rx::Observable)



554
555
556
557
558
559
560
561
# File 'lib/rx/operators/aggregates.rb', line 554

# Folds the source sequence into a Hash.
#
# A +HashConfiguration+ is created and, when a block is given, yielded so the
# caller can adjust it. For every element the configuration's
# +key_selector_block+ and +value_selector_block+ are called (in that order)
# to compute the entry to store; a later element with the same key overwrites
# the earlier entry.
#
# NOTE(review): the seed Hash is built once per call to +to_h+, not per
# subscription, so it looks like it would be shared if the result is
# subscribed more than once -- confirm against +reduce+.
#
# @yield [h] the +HashConfiguration+ used for this call
# @return the result of +reduce+
def to_h
  config = HashConfiguration.new
  yield config if block_given?

  reduce({}) do |hash, element|
    key = config.key_selector_block.call(element)
    hash[key] = config.value_selector_block.call(element)
    hash
  end
end

#window_with_count(count, skip) ⇒ Object

Projects each element of an observable sequence into zero or more windows which are produced based on element count information.



336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
# File 'lib/rx/operators/single.rb', line 336

# Projects each element of the source sequence into zero or more windows
# based on element counts.
#
# A first window is opened on subscription. After that a new window is opened
# every +skip+ elements, and the oldest open window is completed once it has
# received +count+ elements. Every element is pushed to all windows open at
# that moment, so windows overlap when +skip+ < +count+ and elements fall
# between windows when +skip+ > +count+.
#
# @param count [Integer] number of elements per window; must be positive
# @param skip [Integer] number of elements between the starts of consecutive
#   windows; must be positive. Defaults to +count+ (back-to-back windows),
#   mirroring how +window_with_time+ defaults +time_shift+ to +time_span+.
# @return [AnonymousObservable] sequence whose elements are the windows
# @raise [ArgumentError] if +count+ or +skip+ is not greater than zero
def window_with_count(count, skip = count)
  raise ArgumentError.new 'Count must be greater than zero' if count <= 0
  raise ArgumentError.new 'Skip must be greater than zero' if skip <= 0

  AnonymousObservable.new do |observer|
    q = [] # currently open windows, oldest first
    n = 0  # number of source elements seen so far

    m = SingleAssignmentSubscription.new
    ref_count_disposable = RefCountSubscription.new m

    # Opens a new window and hands it to the downstream observer.
    create_window = lambda {
      s = Subject.new
      q.push s
      observer.on_next(s.add_ref(ref_count_disposable))
    }

    create_window.call

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        q.each { |s| s.on_next x }

        # c hits a non-negative multiple of skip exactly when the oldest
        # window has just received its count-th element.
        c = n - count + 1
        q.shift.on_completed if c >= 0 && c % skip == 0

        n += 1
        create_window.call if n % skip == 0
      end

      o.on_error do |err|
        q.shift.on_error err while q.length > 0
        observer.on_error err
      end

      o.on_completed do
        q.shift.on_completed while q.length > 0
        observer.on_completed
      end
    end

    m.subscription = subscribe new_obs
    ref_count_disposable
  end
end

#window_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance) ⇒ Object

Projects each element of an observable sequence into consecutive non-overlapping windows which are produced based on timing information.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/rx/operators/time.rb', line 30

# Projects each element of the source sequence into windows that are opened
# and closed by timers.
#
# A first window is opened on subscription. After that a new window is opened
# every +time_shift+, and each window is completed +time_span+ after it was
# opened. With the default +time_shift+ (equal to +time_span+) the windows
# are back-to-back. Every source element is pushed to all windows open at
# that moment. Timer callbacks and source notifications are serialized
# through a Mutex.
#
# @param time_span length of each window, in whatever unit
#   +scheduler.schedule_relative+ expects; must be positive
# @param time_shift interval between the starts of consecutive windows;
#   must be positive
# @param scheduler scheduler used to run the timers
# @return [AnonymousObservable] sequence whose elements are the windows
# @raise [ArgumentError] if +time_span+ or +time_shift+ is not greater than zero
def window_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance)
  raise ArgumentError.new 'time_span must be greater than zero' if time_span <= 0
  raise ArgumentError.new 'time_shift must be greater than zero' if time_shift <= 0

  AnonymousObservable.new do |observer|
    total_time = 0          # time of the most recently scheduled tick
    next_shift = time_shift # time at which the next window opens
    next_span = time_span   # time at which the oldest window closes

    gate = Mutex.new
    q = [] # currently open windows, oldest first

    timer_d = SerialSubscription.new
    group_subscription = CompositeSubscription.new [timer_d]
    ref_count_subscription = RefCountSubscription.new(group_subscription)

    # Schedules the next tick (open, close, or both) and re-arms itself from
    # inside the scheduled action.
    create_timer = lambda {
      m = SingleAssignmentSubscription.new
      timer_d.subscription = m

      # Decide what the next tick does: whichever deadline comes first, or
      # both when they coincide.
      is_span = false
      is_shift = false
      if next_span == next_shift
        is_span = true
        is_shift = true
      elsif next_span < next_shift
        is_span = true
      else
        is_shift = true
      end

      new_total_time = is_span ? next_span : next_shift
      ts = new_total_time - total_time
      total_time = new_total_time

      if is_span
        next_span += time_shift
      end
      if is_shift
        next_shift += time_shift
      end

      m.subscription = scheduler.schedule_relative(ts, lambda {
        gate.synchronize do
          if is_shift
            s = Subject.new
            q.push s
            observer.on_next(s.add_ref(ref_count_subscription))
          end
          if is_span
            s = q.shift
            s.on_completed
          end
          create_timer.call
        end
      })
    }

    q.push(Subject.new)
    observer.on_next(q[0].add_ref(ref_count_subscription))
    create_timer.call

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        gate.synchronize do
          q.each { |s| s.on_next x }
        end
      end

      o.on_error do |err|
        gate.synchronize do
          q.each { |s| s.on_error err }
          observer.on_error err
        end
      end

      o.on_completed do
        gate.synchronize do
          q.each { |s| s.on_completed }
          observer.on_completed
        end
      end
    end

    group_subscription.push subscribe(new_obs)

    ref_count_subscription
  end
end

#zip(*args, &result_selector) ⇒ Object

Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.



418
419
420
421
# File 'lib/rx/operators/multiple.rb', line 418

# Zips this sequence with the given sequences by delegating to
# +Observable.zip+, with this sequence placed first in the argument list.
#
# @param args other sequences to combine with this one
# @yield optional result selector, forwarded to +Observable.zip+
# @return the result of +Observable.zip+
def zip(*args, &result_selector)
  Observable.zip(self, *args, &result_selector)
end