Module: Rx::Observable
- Included in:
- AnonymousObservable, AsyncSubject, BehaviorSubject, HotObservable, ReplaySubject, Subject, Subject::AnonymousSubject
- Defined in:
- lib/rx/operators/time.rb,
lib/rx/internal/util.rb,
lib/rx/core/observable.rb,
lib/rx/operators/single.rb,
lib/rx/linq/observable/do.rb,
lib/rx/operators/creation.rb,
lib/rx/operators/multiple.rb,
lib/rx/linq/observable/and.rb,
lib/rx/operators/aggregates.rb,
lib/rx/linq/observable/delay.rb,
lib/rx/linq/observable/pluck.rb,
lib/rx/linq/observable/publish.rb,
lib/rx/linq/observable/contains.rb,
lib/rx/linq/observable/debounce.rb,
lib/rx/linq/observable/interval.rb,
lib/rx/linq/observable/aggregate.rb,
lib/rx/linq/observable/multicast.rb,
lib/rx/linq/observable/timestamp.rb,
lib/rx/operators/synchronization.rb,
lib/rx/linq/observable/concat_all.rb,
lib/rx/linq/observable/concat_map.rb,
lib/rx/linq/observable/group_join.rb,
lib/rx/linq/observable/time_interval.rb,
lib/rx/operators/standard_query_operators.rb,
lib/rx/linq/observable/concat_map_observer.rb,
lib/rx/linq/observable/delay_with_selector.rb
Overview
Time based operations
Defined Under Namespace
Classes: AmbObserver, HashConfiguration
Class Method Summary collapse
-
.amb(*args) ⇒ Object
Propagates the observable sequence that reacts first.
- .case(selector, sources, defaultSourceOrScheduler = Observable.empty) ⇒ Object (also: switchCase)
-
.combine_latest(*args, &result_selector) ⇒ Object
Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences produces an element.
-
.concat(*args) ⇒ Object
Concatenates all of the specified observable sequences, as long as the previous observable sequence terminated successfully.
-
.create(&subscribe) ⇒ Object
Creates an observable sequence from a specified subscribe method implementation.
-
.defer ⇒ Object
Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.
-
.empty(scheduler = ImmediateScheduler.instance) ⇒ Object
Returns an empty observable sequence, using the specified scheduler to send out the single OnCompleted message.
- .for(sources, result_selector = nil) ⇒ Object
- .fork_join(*all_sources) ⇒ Object
- .from(iterable, map_fn = nil, scheduler = CurrentThreadScheduler.instance) ⇒ Object
- .from_array(array, scheduler = CurrentThreadScheduler.instance) ⇒ Object
-
.generate(initial_state, condition, iterate, result_selector, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Generates an observable sequence by running a state-driven loop producing the sequence’s elements.
- .if(condition, then_source, else_source_or_scheduler = nil) ⇒ Object
- .interval(period, scheduler = Rx::DefaultScheduler.instance) ⇒ Object
-
.just(value, scheduler = ImmediateScheduler.instance) ⇒ Object
(also: return)
Returns an observable sequence that contains a single element.
-
.merge_all(*args) ⇒ Object
(also: merge)
Merges elements from all of the specified observable sequences into a single observable sequence, using the specified scheduler for enumeration of and subscription to the sources.
-
.merge_concurrent(max_concurrent, scheduler = CurrentThreadScheduler.instance, *args) ⇒ Object
Merges elements from all observable sequences in the given enumerable sequence into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences, and using the specified scheduler for enumeration of and subscription to the sources.
-
.never ⇒ Object
Returns a non-terminating observable sequence, which can be used to denote an infinite duration (e.g. when using reactive joins).
- .of(*args) ⇒ Object
-
.of_array(array, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Converts an array to an observable sequence, using an optional scheduler to enumerate the array.
-
.of_enumerable(enumerable, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Converts an Enumerable to an observable sequence, using an optional scheduler to enumerate its elements.
-
.of_enumerator(enum, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Converts an Enumerator to an observable sequence, using an optional scheduler to enumerate its elements.
-
.on_error_resume_next(*args) ⇒ Object
Concatenates all of the specified observable sequences, even if the previous observable sequence terminated exceptionally.
- .pairs(obj, scheduler = CurrentThreadScheduler.instance) ⇒ Object
-
.raise_error(error, scheduler = ImmediateScheduler.instance) ⇒ Object
Returns an observable sequence that terminates with an exception.
-
.range(start, count, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Generates an observable sequence of integral numbers within a specified range.
-
.repeat(value, count, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Generates an observable sequence that repeats the given element the specified number of times.
-
.repeat_infinitely(value, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Generates an observable sequence that repeats the given element infinitely.
-
.rescue_error(*args) ⇒ Object
Continues an observable sequence that is terminated by an exception with the next observable sequence.
- .start(func, context, scheduler = DefaultScheduler.instance) ⇒ Object
- .timer(due_time, period_or_scheduler = DefaultScheduler.instance, scheduler = DefaultScheduler.instance) ⇒ Object
- .to_async(func, context = nil, scheduler = DefaultScheduler.instance) ⇒ Object
-
.using(resource_factory, observable_factory) ⇒ Object
Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence’s lifetime.
- .when(*plans) ⇒ Object
- .while(condition, source) ⇒ Object
-
.zip(*args, &result_selector) ⇒ Object
Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.
Instance Method Summary collapse
-
#_subscribe(observer) ⇒ Subscription
Subscribes the given observer to the observable sequence.
- #add_ref(r) ⇒ Object
-
#all?(&block) ⇒ Rx::Observable
Determines whether all elements of an observable sequence satisfy a condition if block given, else if all are true.
-
#amb(second) ⇒ Object
Propagates the observable sequence that reacts first.
- #and(right) ⇒ Object
-
#any?(&block) ⇒ Rx::Observable
Determines whether any element of an observable sequence satisfies a condition if a block is given else if there are any items in the observable sequence.
-
#as_observable ⇒ Object
Hides the identity of an observable sequence.
-
#average(&block) ⇒ Rx::Observable
Computes the average of an observable sequence of values that are optionally obtained by invoking a transform function on each element of the input sequence if a block is given.
-
#buffer_with_count(count, skip = count) ⇒ Object
Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.
-
#buffer_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance) ⇒ Object
Projects each element of an observable sequence into consecutive non-overlapping buffers which are produced based on timing information.
-
#combine_latest(other, &result_selector) ⇒ Object
Merges two observable sequences into one observable sequence by using the selector function whenever one of the observable sequences produces an element.
-
#concat(*other) ⇒ Object
Concatenates the second observable sequence to the first observable sequence upon successful termination of the first.
- #concat_all ⇒ Object
- #concat_map(selector, result_selector = nil) ⇒ Object
- #concat_map_observer(on_next, on_error, on_completed) ⇒ Object
- #contains(search_element, from_index = 0) ⇒ Object
-
#contains?(item) ⇒ Rx::Observable
Determines whether an observable sequence contains a specified element.
-
#count(&block) ⇒ Object
Returns an observable sequence containing a number that represents how many elements in the specified observable sequence satisfy a condition if the block is given, else the number of items in the observable sequence.
- #debounce(due_time, scheduler = DefaultScheduler.instance) ⇒ Object
-
#default_if_empty(default_value = nil) ⇒ Object
Returns the elements of the specified sequence or the type parameter’s default value in a singleton sequence if the sequence is empty.
- #delay(due_time, scheduler = DefaultScheduler.instance) ⇒ Object
- #delay_with_selector(subscription_delay, delay_duration_selector = nil) ⇒ Object
-
#dematerialize ⇒ Object
Dematerializes the explicit notification values of an observable sequence as implicit notifications.
-
#distinct(&key_selector) ⇒ Object
Returns an observable sequence that contains only distinct elements according to the optional key_selector.
-
#distinct_until_changed(&key_selector) ⇒ Object
Returns an observable sequence that contains only distinct contiguous elements according to the optional key_selector.
- #do(observer_or_on_next = nil, on_error_func = nil, on_completed_func = nil) ⇒ Object
-
#element_at(index) ⇒ Rx::Observable
Returns the element at a specified index in a sequence.
-
#element_at_or_default(index, default_value = nil) ⇒ Object
Returns the element at a specified index in a sequence or a default value if the index is out of range.
-
#empty? ⇒ Rx::Observable
Determines whether an observable sequence is empty.
-
#ensures ⇒ Object
Invokes a specified action after the source observable sequence terminates gracefully or exceptionally.
- #enumerator_repeat_infinitely(value) ⇒ Object
- #enumerator_repeat_times(num, value) ⇒ Object
-
#final ⇒ Rx::Observable
Internal method to get the final value.
-
#first(&block) ⇒ Rx::Observable
Returns the first element of an observable sequence that satisfies the condition in the predicate if a block is given, else the first item in the observable sequence.
-
#first_or_default(default_value = nil, &block) ⇒ Rx::Observable
Returns the first element of an observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists.
-
#flat_map(&block) ⇒ Object
Projects each element of the source observable sequence to the other observable sequence and merges the resulting observable sequences into one observable sequence.
-
#flat_map_with_index(&block) ⇒ Object
Projects each element of an observable sequence to an observable sequence by incorporating the element’s index and merges the resulting observable sequences into one observable sequence.
- #group_join(right, left_duration_selector, right_duration_selector, result_selector) ⇒ Object
-
#ignore_elements ⇒ Object
Ignores all elements in an observable sequence leaving only the termination messages.
-
#last(&block) ⇒ Rx::Observable
Returns the last element of an observable sequence that satisfies the condition in the predicate if the block is given, else the last element in the observable sequence.
-
#last_or_default(default_value = nil, &block) ⇒ Rx::Observable
Returns the last element of an observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists.
-
#latest ⇒ Object
Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.
-
#map(&block) ⇒ Object
Projects each element of an observable sequence into a new form.
-
#map_with_index(&block) ⇒ Object
Projects each element of an observable sequence into a new form by incorporating the element’s index.
-
#materialize ⇒ Object
Materializes the implicit notifications of an observable sequence as explicit notification values.
-
#max(&block) ⇒ Rx::Observable
Returns the maximum element in an observable sequence.
-
#max_by(&block) ⇒ Rx::Observable
Returns the elements in an observable sequence with the maximum key value.
-
#merge(other, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Merges elements from two observable sequences into a single observable sequence, using the specified scheduler for enumeration of and subscription to the sources.
-
#merge_all ⇒ Object
Merges all inner observable sequences into a single observable sequence.
-
#merge_concurrent(max_concurrent = 1) ⇒ Object
Merges elements from all inner observable sequences into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences.
-
#min(&block) ⇒ Rx::Observable
Returns the minimum element in an observable sequence.
-
#min_by(&block) ⇒ Rx::Observable
Returns the elements in an observable sequence with the minimum key value.
- #multicast(subject_or_subject_selector, selector = nil) ⇒ Object
-
#none?(&block) ⇒ Rx::Observable
Determines whether no elements of an observable sequence satisfy a condition if block given, else if all are false.
-
#observe_on(scheduler) ⇒ Object
Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
-
#on_error_resume_next(other) ⇒ Object
Concatenates the second observable sequence to the first observable sequence upon successful or exceptional termination of the first.
- #pluck(prop) ⇒ Object
- #publish(&selector) ⇒ Object
-
#reduce(*args, &block) ⇒ Rx::Observable
(also: #aggregate)
Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence.
-
#repeat(repeat_count) ⇒ Object
Repeats the observable sequence a specified number of times.
-
#repeat_infinitely ⇒ Object
Repeats the observable sequence indefinitely.
-
#rescue_error(other = nil, &action) ⇒ Object
Continues an observable sequence that is terminated by an exception of the specified type with the observable sequence produced by the handler or continues an observable sequence that is terminated by an exception with the next observable sequence.
-
#retry(retry_count) ⇒ Object
Repeats the source observable sequence the specified number of times or until it successfully terminates.
-
#retry_infinitely ⇒ Object
Repeats the source observable sequence until it successfully terminates.
-
#scan(*args, &block) ⇒ Object
Applies an accumulator function over an observable sequence and returns each intermediate result.
-
#select(&block) ⇒ Object
(also: #find_all)
Filters the elements of an observable sequence based on a predicate.
-
#select_with_index(&block) ⇒ Object
(also: #find_all_with_index)
Filters the elements of an observable sequence based on a predicate by incorporating the element’s index.
-
#sequence_eql?(other) ⇒ Rx::Observable
Determines whether two sequences are equal by comparing the elements pairwise.
-
#single(&block) ⇒ Rx::Observable
Returns the only element of an observable sequence, and reports an exception if there is not exactly one element in the observable sequence.
-
#single_or_default(default_value = nil, &block) ⇒ Rx::Observable
Returns the only element of an observable sequence, or a default value if the observable sequence is empty; this method reports an exception if there is more than one element in the observable sequence.
-
#skip(count) ⇒ Object
Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.
-
#skip_last(count) ⇒ Object
Bypasses a specified number of elements at the end of an observable sequence.
-
#skip_until(other) ⇒ Object
Returns the elements from the source observable sequence only after the other observable sequence produces an element.
-
#skip_while(&block) ⇒ Object
Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements.
-
#skip_while_with_index(&block) ⇒ Object
Bypasses elements in an observable sequence as long as a specified condition is true, by incorporating the element’s index, and then returns the remaining elements.
-
#start_with(*args) ⇒ Object
Prepends a sequence of values to an observable sequence.
- #subscribe(*args) ⇒ Object
-
#subscribe_on(scheduler) ⇒ Object
Wraps the source sequence in order to run its subscription and unsubscribe logic on the specified scheduler.
-
#subscribe_on_completed(&block) ⇒ Object
Subscribes the given block to the on_completed action of the observable sequence.
-
#subscribe_on_error(&block) ⇒ Object
Subscribes the given block to the on_error action of the observable sequence.
-
#subscribe_on_next(&block) ⇒ Subscription
Subscribes the given block to the on_next action of the observable sequence.
-
#sum(&block) ⇒ Rx::Observable
Computes the sum of a sequence of values.
-
#synchronize(gate = Monitor.new) ⇒ Object
Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
-
#take(count, scheduler = ImmediateScheduler.instance) ⇒ Object
Returns a specified number of contiguous elements from the start of an observable sequence.
-
#take_last(count, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Returns a specified number of contiguous elements from the end of an observable sequence.
-
#take_last_buffer(count) ⇒ Object
Returns a list with the specified number of contiguous elements from the end of an observable sequence.
-
#take_until(other) ⇒ Object
Returns the elements from the source observable sequence until the other observable sequence produces an element.
-
#take_while(&block) ⇒ Object
Returns elements from an observable sequence as long as a specified condition is true.
-
#take_while_with_index(&block) ⇒ Object
Returns elements from an observable sequence as long as a specified condition is true, by incorporating the element’s index.
-
#tap(observer) ⇒ Object
Invokes the observer’s methods for each message in the source sequence.
- #time_interval(scheduler = DefaultScheduler.instance) ⇒ Object
- #timestamp(scheduler = DefaultScheduler.instance) ⇒ Object
-
#to_a ⇒ Rx::Observable
Creates an array from an observable sequence.
-
#to_h {|h| ... } ⇒ Rx::Observable
Creates a Hash from the observable collection.
-
#window_with_count(count, skip) ⇒ Object
Projects each element of an observable sequence into zero or more windows which are produced based on element count information.
-
#window_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance) ⇒ Object
Projects each element of an observable sequence into consecutive non-overlapping windows which are produced based on timing information.
-
#zip(*args, &result_selector) ⇒ Object
Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.
Class Method Details
.amb(*args) ⇒ Object
Propagates the observable sequence that reacts first.
426 427 428 |
# File 'lib/rx/operators/multiple.rb', line 426
# Propagates the observable sequence that reacts first.
# Folds the given sequences pairwise with the instance-level #amb, seeding
# the fold with Observable.never.
def amb(*args)
  args.inject(Observable.never) { |winner, candidate| winner.amb(candidate) }
end
.case(selector, sources, defaultSourceOrScheduler = Observable.empty) ⇒ Object Also known as: switchCase
3 4 5 6 7 8 9 10 11 12 |
# File 'lib/rx/linq/observable/case.rb', line 3
# Defers the choice of a source: on subscription, +selector+ is called and
# its result is used as the key into +sources+. When no source is found for
# that key, the default source is used. A Scheduler passed as the third
# argument is converted into an empty sequence on that scheduler.
def case(selector, sources, defaultSourceOrScheduler = Observable.empty)
  defer do
    if Scheduler === defaultSourceOrScheduler
      defaultSourceOrScheduler = Observable.empty(defaultSourceOrScheduler)
    end

    sources[selector.call] || defaultSourceOrScheduler
  end
end
.combine_latest(*args, &result_selector) ⇒ Object
Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences produces an element.
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 |
# File 'lib/rx/operators/multiple.rb', line 494 def combine_latest(*args, &result_selector) AnonymousObservable.new do |observer| result_selector ||= lambda {|*inner_args| inner_args } n = args.length has_value = Array.new(n, false) has_value_all = false values = Array.new(n) is_done = Array.new(n, false) next_item = lambda do |i| has_value[i] = true if has_value_all || (has_value_all = has_value.all?) res = nil begin res = result_selector.call(*values) rescue => e observer.on_error e break end observer.on_next(res) elsif enumerable_select_with_index(is_done) {|_, j| j != i} .all? observer.on_completed break end end done = lambda do |i| is_done[i] = true observer.on_completed if is_done.all? end gate = Monitor.new subscriptions = Array.new(n) do |i| sas = SingleAssignmentSubscription.new sas_obs = Observer.configure do |o| o.on_next do |x| values[i] = x next_item.call i end o.on_error(&observer.method(:on_error)) o.on_completed { done.call i } end sas.subscription = args[i].synchronize(gate).subscribe(sas_obs) sas end CompositeSubscription.new subscriptions end end |
.concat(*args) ⇒ Object
Concatenates all of the specified observable sequences, as long as the previous observable sequence terminated successfully.
553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 |
# File 'lib/rx/operators/multiple.rb', line 553
# Concatenates all of the specified observable sequences, as long as the
# previous observable sequence terminated successfully.
#
# args - the sequences to concatenate, or a single Enumerator that yields
#        them one at a time.
#
# Returns an AnonymousObservable. Its subscription is a composite of the
# serial subscription to the current source, the scheduled work, and a
# subscription that marks the operation as disposed.
def concat(*args)
  AnonymousObservable.new do |observer|
    disposed = false
    sources = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
    subscription = SerialSubscription.new
    gate = AsyncLock.new

    cancelable = CurrentThreadScheduler.instance.schedule_recursive lambda { |this|
      gate.wait do
        # break leaves the gate.wait block without touching the observer.
        break if disposed

        current = nil
        has_next = false
        err = nil

        begin
          current = sources.next
          has_next = true
        rescue StopIteration
          # No more sources; handled below via has_next.
        rescue => error
          # Kept in its own local so the enumerator is not overwritten.
          err = error
        end

        if err
          observer.on_error err
          break
        end

        unless has_next
          observer.on_completed
          break
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d

        new_obs = Observer.configure do |o|
          o.on_next(&observer.method(:on_next))
          o.on_error(&observer.method(:on_error))
          # Completion of the current source moves on to the next one.
          o.on_completed { this.call }
        end

        # Store the inner subscription so that disposing the serial
        # subscription also disposes the source currently being observed.
        d.subscription = current.subscribe new_obs
      end
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { gate.wait { disposed = true } }]
  end
end
.create(&subscribe) ⇒ Object
Creates an observable sequence from a specified subscribe method implementation.
17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/rx/operators/creation.rb', line 17
# Creates an observable sequence from a specified subscribe method
# implementation. The block's return value decides the subscription:
# a Subscription is returned as-is, a Proc is wrapped with
# Subscription.create, and anything else yields Subscription.empty.
def create(&subscribe)
  AnonymousObservable.new do |observer|
    outcome = subscribe.call(observer)

    if Subscription === outcome
      outcome
    elsif Proc === outcome
      Subscription.create(&outcome)
    else
      Subscription.empty
    end
  end
end
.defer ⇒ Object
Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.
32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/rx/operators/creation.rb', line 32
# Returns an observable sequence that invokes the specified factory block
# whenever a new observer subscribes. An exception raised by the factory is
# delivered to the observer through Observable.raise_error.
def defer
  AnonymousObservable.new do |observer|
    source = nil
    failure_subscription = nil

    begin
      source = yield
    rescue => err
      failure_subscription = Observable.raise_error(err).subscribe(observer)
    end

    failure_subscription ? failure_subscription : source.subscribe(observer)
  end
end
.empty(scheduler = ImmediateScheduler.instance) ⇒ Object
Returns an empty observable sequence, using the specified scheduler to send out the single OnCompleted message.
47 48 49 50 51 52 53 |
# File 'lib/rx/operators/creation.rb', line 47
# Returns an empty observable sequence, using the specified scheduler to
# send out the single on_completed message.
def empty(scheduler = ImmediateScheduler.instance)
  AnonymousObservable.new do |observer|
    complete = lambda { observer.on_completed }
    scheduler.schedule(complete)
  end
end
.for(sources, result_selector = nil) ⇒ Object
3 4 5 6 7 8 9 10 11 |
# File 'lib/rx/linq/observable/for.rb', line 3
# Maps every element of +sources+ through +result_selector+ and concatenates
# the resulting sequences with Observable.concat.
#
# sources         - any object responding to #each.
# result_selector - callable turning one element into the sequence to
#                   concatenate; defaults to the identity, so the elements
#                   themselves are concatenated.
#
# The mapping is lazy: the selector runs as the Enumerator is advanced.
def for(sources, result_selector = nil)
  # Identity default. A splat default (|*args| args) would wrap each source
  # in an Array, which cannot be subscribed to.
  result_selector ||= lambda { |source| source }

  enum = Enumerator.new do |yielder|
    sources.each { |source| yielder << result_selector.call(source) }
  end

  Observable.concat(enum)
end
.fork_join(*all_sources) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/rx/linq/observable/fork_join.rb', line 3
# Subscribes to every source and, once all of them have completed, emits a
# single Array holding the last value of each source (in argument order)
# followed by on_completed.
#
# - With no sources the result completes immediately.
# - If any source completes without having produced a value, the result
#   completes without emitting.
# - An error from any source is forwarded and the group is disposed.
#
# Returns an AnonymousObservable.
def fork_join(*all_sources)
  AnonymousObservable.new do |subscriber|
    count = all_sources.length

    if count.zero?
      subscriber.on_completed
      # Leave the subscribe block here instead of falling through.
      next Subscription.empty
    end

    group = CompositeSubscription.new
    finished = false
    has_results = Array.new(count, false)
    has_completed = Array.new(count, false)
    results = Array.new(count)

    all_sources.each_with_index do |source, i|
      group.push(
        source.subscribe(
          lambda { |value|
            unless finished
              has_results[i] = true
              results[i] = value
            end
          },
          lambda { |err|
            finished = true
            subscriber.on_error err
            group.dispose
          },
          lambda {
            next if finished

            unless has_results[i]
              # Mark as finished so later completions do not send a second
              # on_completed.
              finished = true
              subscriber.on_completed
              next
            end

            has_completed[i] = true
            next unless has_completed.all?

            finished = true
            subscriber.on_next results
            subscriber.on_completed
          }
        )
      )
    end

    group
  end
end
.from(iterable, map_fn = nil, scheduler = CurrentThreadScheduler.instance) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/rx/linq/observable/from.rb', line 3
# Turns an iterable into an observable sequence, optionally mapping every
# element.
#
# iterable  - object converted with #to_enum and advanced with #next.
# map_fn    - optional Proc called as map_fn.call(element, index); ignored
#             unless it is a Proc.
# scheduler - scheduler whose #schedule_recursive drives the iteration.
#
# Emits on_completed when the enumerator is exhausted and on_error when
# either the enumerator or map_fn raises.
def from(iterable, map_fn = nil, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    # A fresh enumerator per subscription, so that every subscriber sees the
    # whole sequence rather than the tail left by an earlier subscriber.
    enumerator = iterable.to_enum
    index = 0

    scheduler.schedule_recursive lambda { |this|
      begin
        value = enumerator.next
      rescue StopIteration
        observer.on_completed
        next
      rescue => err
        observer.on_error err
        next
      end

      if Proc === map_fn
        begin
          value = map_fn.call(value, index)
        rescue => err
          observer.on_error err
          next
        end
      end

      observer.on_next value
      index += 1
      this.call
    }
  end
end
.from_array(array, scheduler = CurrentThreadScheduler.instance) ⇒ Object
204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/rx/operators/creation.rb', line 204
# Emits each element of +array+ in order and then completes. The current
# index is carried as the state of scheduler#schedule_recursive_with_state.
def from_array(array, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    emit = lambda do |index, this|
      if index >= array.size
        observer.on_completed
      else
        observer.on_next array[index]
        this.call(index + 1)
      end
    end

    scheduler.schedule_recursive_with_state(0, emit)
  end
end
.generate(initial_state, condition, iterate, result_selector, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Generates an observable sequence by running a state-driven loop producing the sequence’s elements.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/rx/operators/creation.rb', line 56
# Generates an observable sequence by running a state-driven loop producing
# the sequence's elements.
#
# initial_state   - state used for the first iteration.
# condition       - callable; the loop continues while it returns truthy.
# iterate         - callable producing the next state (skipped on the first
#                   iteration).
# result_selector - callable mapping the state to the emitted element.
#
# Any exception raised by the callables is delivered through on_error.
def generate(initial_state, condition, iterate, result_selector, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    state = initial_state
    first = true

    step = lambda do |this|
      keep_going = false
      element = nil

      begin
        if first
          first = false
        else
          state = iterate.call(state)
        end

        keep_going = condition.call(state)
        element = result_selector.call(state) if keep_going
      rescue => err
        observer.on_error err
        next
      end

      if keep_going
        observer.on_next element
        this.call
      else
        observer.on_completed
      end
    end

    scheduler.schedule_recursive(step)
  end
end
.if(condition, then_source, else_source_or_scheduler = nil) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 |
# File 'lib/rx/linq/observable/if.rb', line 3
# Picks +then_source+ when +condition.call+ is truthy, otherwise the else
# source. The third argument may be a Scheduler (an empty sequence on that
# scheduler is used), an Observable (used directly) or nil (a plain empty
# sequence is used).
def if(condition, then_source, else_source_or_scheduler = nil)
  else_source =
    case else_source_or_scheduler
    when Scheduler then Observable.empty(else_source_or_scheduler)
    when Observable then else_source_or_scheduler
    when nil then Observable.empty
    end

  condition.call ? then_source : else_source
end
.interval(period, scheduler = Rx::DefaultScheduler.instance) ⇒ Object
2 3 4 |
# File 'lib/rx/linq/observable/interval.rb', line 2
# Delegates to observable_timer_time_span_and_period, passing +period+ as
# both the first and second argument along with the scheduler.
def self.interval(period, scheduler = Rx::DefaultScheduler.instance)
  due_time = period
  observable_timer_time_span_and_period(due_time, period, scheduler)
end
.just(value, scheduler = ImmediateScheduler.instance) ⇒ Object Also known as: return
Returns an observable sequence that contains a single element.
99 100 101 102 103 104 105 106 |
# File 'lib/rx/operators/creation.rb', line 99
# Returns an observable sequence that contains a single element: the value
# is emitted and the sequence completes, both within one scheduled action.
def just(value, scheduler = ImmediateScheduler.instance)
  AnonymousObservable.new do |observer|
    emit_once = lambda do
      observer.on_next value
      observer.on_completed
    end

    scheduler.schedule(emit_once)
  end
end
.merge_all(*args) ⇒ Object Also known as: merge
Merges elements from all of the specified observable sequences into a single observable sequence, using the specified scheduler for enumeration of and subscription to the sources.
612 613 614 615 616 617 618 |
# File 'lib/rx/operators/multiple.rb', line 612
# Merges elements from all of the specified observable sequences into a
# single observable sequence. If the first argument is a Scheduler it is
# removed from the list and used to enumerate the remaining sources.
def merge_all(*args)
  scheduler = CurrentThreadScheduler.instance
  scheduler = args.shift if !args.empty? && Scheduler === args.first

  sources = Observable.from_array(args, scheduler)
  sources.merge_all
end
.merge_concurrent(max_concurrent, scheduler = CurrentThreadScheduler.instance, *args) ⇒ Object
Merges elements from all observable sequences in the given enumerable sequence into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences, and using the specified scheduler for enumeration of and subscription to the sources.
607 608 609 |
# File 'lib/rx/operators/multiple.rb', line 607
# Merges the given sequences while limiting the number of concurrent
# subscriptions to +max_concurrent+. The sources are first turned into an
# observable of observables via Observable.from_array on +scheduler+.
def merge_concurrent(max_concurrent, scheduler = CurrentThreadScheduler.instance, *args)
  sources = Observable.from_array(args, scheduler)
  sources.merge_concurrent(max_concurrent)
end
.never ⇒ Object
Returns a non-terminating observable sequence, which can be used to denote an infinite duration (e.g. when using reactive joins).
92 93 94 95 96 |
# File 'lib/rx/operators/creation.rb', line 92
# Returns a non-terminating observable sequence: the subscribe block
# deliberately does nothing with the observer.
def never
  AnonymousObservable.new { |_observer| }
end
.of(*args) ⇒ Object
3 4 5 6 7 8 9 |
# File 'lib/rx/linq/observable/of.rb', line 3
# Builds an observable sequence from the given arguments via of_array. If
# the first argument is a Scheduler it is removed from the list and used as
# the scheduler.
def of(*args)
  scheduler = CurrentThreadScheduler.instance
  scheduler = args.shift if !args.empty? && Scheduler === args.first

  of_array(args, scheduler)
end
.of_array(array, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Converts an array to an observable sequence, using an optional scheduler to enumerate the array.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/rx/operators/creation.rb', line 110
# Converts an array to an observable sequence, using an optional scheduler
# to enumerate the array. The position is tracked in a local counter that
# the recursive action advances after each emission.
def of_array(array, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    position = 0

    emit = lambda do |this|
      if position >= array.length
        observer.on_completed
      else
        observer.on_next array[position]
        position += 1
        this.call
      end
    end

    scheduler.schedule_recursive(emit)
  end
end
.of_enumerable(enumerable, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Converts an Enumerable to an observable sequence, using an optional scheduler to enumerate its elements.
126 127 128 |
# File 'lib/rx/operators/creation.rb', line 126
# Converts an Enumerable to an observable sequence by turning it into an
# Enumerator and delegating to Observable.of_enumerator.
def of_enumerable(enumerable, scheduler = CurrentThreadScheduler.instance)
  enumerator = enumerable.to_enum
  Observable.of_enumerator(enumerator, scheduler)
end
.of_enumerator(enum, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Converts an Enumerator to an observable sequence, using an optional scheduler to enumerate its elements.
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/rx/operators/creation.rb', line 131
# Converts an Enumerator to an observable sequence. Each recursive step
# pulls one item with enum.next: StopIteration completes the sequence, any
# other exception is delivered through on_error.
def of_enumerator(enum, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    pump = lambda do |this|
      fetched = false
      item = nil

      begin
        item = enum.next
        fetched = true
      rescue StopIteration
        observer.on_completed
      rescue => err
        observer.on_error err
      end

      if fetched
        observer.on_next item
        this.call
      end
    end

    scheduler.schedule_recursive(pump)
  end
end
.on_error_resume_next(*args) ⇒ Object
Concatenates all of the specified observable sequences, even if the previous observable sequence terminated exceptionally.
622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 |
# File 'lib/rx/operators/multiple.rb', line 622
# Concatenates all of the specified observable sequences, even if the
# previous observable sequence terminated exceptionally.
#
# args - the sequences to chain, or a single Enumerator yielding them.
#
# Both on_error and on_completed of the current source advance to the next
# source; only a failure of the enumerator itself reaches the observer's
# on_error.
def on_error_resume_next(*args)
  AnonymousObservable.new do |observer|
    gate = AsyncLock.new
    disposed = false
    sources = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
    subscription = SerialSubscription.new

    advance = lambda do |this|
      gate.wait do
        # break leaves the gate.wait block.
        break if disposed

        current = nil
        has_next = false
        err = nil

        begin
          current = sources.next
          has_next = true
        rescue StopIteration
          # No more sources; handled below via has_next.
        rescue => failure
          err = failure
        end

        if err
          observer.on_error err
          break
        end

        unless has_next
          observer.on_completed
          break
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d

        new_obs = Observer.configure do |o|
          o.on_next(&observer.method(:on_next))
          o.on_error { |_| this.call }
          o.on_completed { this.call }
        end

        d.subscription = current.subscribe new_obs
      end
    end

    cancelable = CurrentThreadScheduler.instance.schedule_recursive(advance)
    mark_disposed = Subscription.create { gate.wait { disposed = true } }

    CompositeSubscription.new [subscription, cancelable, mark_disposed]
  end
end
.pairs(obj, scheduler = CurrentThreadScheduler.instance) ⇒ Object
3 4 5 |
# File 'lib/rx/linq/observable/pairs.rb', line 3
# Delegates to of_enumerable, passing obj and scheduler through unchanged.
def pairs(obj, scheduler = CurrentThreadScheduler.instance)
  of_enumerable obj, scheduler
end
.raise_error(error, scheduler = ImmediateScheduler.instance) ⇒ Object
Returns an observable sequence that terminates with an exception.
155 156 157 158 159 160 161 |
# File 'lib/rx/operators/creation.rb', line 155
# Returns an observable sequence that terminates with the given error.
# The on_error notification is delivered through the scheduler.
def raise_error(error, scheduler = ImmediateScheduler.instance)
  AnonymousObservable.new do |observer|
    fail_observer = lambda { observer.on_error error }
    scheduler.schedule fail_observer
  end
end
.range(start, count, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Generates an observable sequence of integral numbers within a specified range.
164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/rx/operators/creation.rb', line 164
# Generates `count` consecutive values starting at `start`, then completes.
# One value is emitted per recursive scheduler step; the offset from `start`
# is the state carried between steps.
def range(start, count, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    emit_step = lambda do |offset, this|
      if offset < count
        observer.on_next(start + offset)
        this.call(offset + 1)
      else
        observer.on_completed
      end
    end

    scheduler.schedule_recursive_with_state 0, emit_step
  end
end
.repeat(value, count, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Generates an observable sequence that repeats the given element the specified number of times.
183 184 185 |
# File 'lib/rx/operators/creation.rb', line 183
# Repeats `value` `count` times by repeating a single-element sequence.
def repeat(value, count, scheduler = CurrentThreadScheduler.instance)
  single = Observable.just(value, scheduler)
  single.repeat(count)
end
.repeat_infinitely(value, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Generates an observable sequence that repeats the given element infinitely.
178 179 180 |
# File 'lib/rx/operators/creation.rb', line 178
# Repeats `value` without end by repeating a single-element sequence.
def repeat_infinitely(value, scheduler = CurrentThreadScheduler.instance)
  single = Observable.just(value, scheduler)
  single.repeat_infinitely
end
.rescue_error(*args) ⇒ Object
Continues an observable sequence that is terminated by an exception with the next observable sequence.
431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 |
# File 'lib/rx/operators/multiple.rb', line 431
# Continues a sequence that terminated with an error by switching to the
# next source.
#
# Accepts either a list of observables or a single Enumerator yielding them.
# The first source that completes normally completes the result. If every
# source fails, the last error seen is reported; an error raised by the
# enumerator itself is reported immediately.
def rescue_error(*args)
  AnonymousObservable.new do |observer|
    gate = AsyncLock.new
    disposed = false
    sources = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
    subscription = SerialSubscription.new
    last_error = nil

    cancelable = CurrentThreadScheduler.instance.schedule_recursive lambda {|this|
      gate.wait do
        # `break` leaves the gate.wait block and ends this step.
        break if disposed

        current = nil
        has_next = false
        error = nil

        begin
          current = sources.next
          has_next = true
        rescue StopIteration => _
          # Enumeration exhausted; handled below through has_next.
        rescue => ex
          # A dedicated name keeps the rescue from overwriting the
          # enumerator captured by this closure.
          error = ex
        end

        if error
          observer.on_error error
          break
        end

        unless has_next
          if last_error
            observer.on_error last_error
          else
            observer.on_completed
          end
          break
        end

        new_obs = Observer.configure do |o|
          o.on_next(&observer.method(:on_next))
          o.on_error do |err|
            # Remember the failure and try the next source.
            last_error = err
            this.call
          end
          o.on_completed(&observer.method(:on_completed))
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d
        d.subscription = current.subscribe new_obs
      end
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { gate.wait { disposed = true } }]
  end
end
.start(func, context, scheduler = DefaultScheduler.instance) ⇒ Object
3 4 5 |
# File 'lib/rx/linq/observable/start.rb', line 3
# Wraps `func` with Observable.to_async and invokes the wrapper right away
# with no arguments, returning whatever that call returns.
def start(func, context, scheduler = DefaultScheduler.instance)
  async_func = Observable.to_async(func, context, scheduler)
  async_func.call
end
.timer(due_time, period_or_scheduler = DefaultScheduler.instance, scheduler = DefaultScheduler.instance) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/rx/linq/observable/timer.rb', line 3
# Dispatches to one of four timer builders.
#
# The second argument is overloaded: a Numeric is taken as the period, a
# Scheduler replaces `scheduler`, and anything else is ignored. `due_time`
# selects the absolute (Time) or relative variant.
def timer(due_time, period_or_scheduler = DefaultScheduler.instance, scheduler = DefaultScheduler.instance)
  period = nil
  if Numeric === period_or_scheduler
    period = period_or_scheduler
  elsif Scheduler === period_or_scheduler
    scheduler = period_or_scheduler
  end

  absolute = Time === due_time

  if period.nil?
    return observable_timer_date(due_time, scheduler) if absolute
    observable_timer_time_span(due_time, scheduler)
  elsif absolute
    observable_timer_date_and_period(due_time, period, scheduler)
  else
    observable_timer_time_span_and_period(due_time, period, scheduler)
  end
end
.to_async(func, context = nil, scheduler = DefaultScheduler.instance) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/rx/linq/observable/to_async.rb', line 3
# Returns a lambda that, when called, schedules `func` with the call's
# arguments and returns an observable backed by an AsyncSubject.
#
# The subject receives the result followed by completion, or the error
# raised by `func`. When `context` is truthy, `func` is bound to it through
# proc_bind before being called.
def to_async(func, context = nil, scheduler = DefaultScheduler.instance)
  lambda do |*args|
    subject = AsyncSubject.new

    work = lambda do
      begin
        callable = context ? proc_bind(func, context) : func
        result = callable.call(*args)
      rescue => e
        subject.on_error e
        next
      end

      subject.on_next result
      subject.on_completed
    end

    scheduler.schedule work
    subject.as_observable
  end
end
.using(resource_factory, observable_factory) ⇒ Object
Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence’s lifetime.
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/rx/operators/creation.rb', line 188
# Builds an observable whose subscription also owns a resource.
#
# On every subscription the resource factory is called, then the observable
# factory is called with that resource. The returned subscription combines
# the source subscription with the resource (used as a subscription when it
# is not nil). If either factory raises, the observer receives the error and
# any resource obtained so far is still part of the returned subscription.
def using(resource_factory, observable_factory)
  AnonymousObservable.new do |observer|
    resource_subscription = Subscription.empty

    begin
      resource = resource_factory.call
      resource_subscription = resource unless resource.nil?
      source = observable_factory.call resource
    rescue => e
      failure = self.raise_error(e).subscribe(observer)
      next CompositeSubscription.new [failure, resource_subscription]
    end

    CompositeSubscription.new [source.subscribe(observer), resource_subscription]
  end
end
.when(*plans) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/rx/linq/observable/when.rb', line 3 def when(*plans) AnonymousObservable.new do |observer| active_plans = [] external_subscriptions = {} out_observer = Observer.configure {|o| o.on_next(&observer.method(:on_next)) o.on_error {|err| external_subscriptions.each {|_, v| v.on_error err } } o.on_completed(&observer.method(:on_completed)) } begin plans.each {|x| active_plans.push x.activate(external_subscriptions, out_observer, lambda {|active_plan| active_plans.delete(active_plan) active_plans.length == 0 && observer.on_completed }) } rescue => e Observable.raise_error(e).subscribe(observer) end group = CompositeSubscription.new external_subscriptions.each {|_, join_observer| join_observer.subscribe group.push join_observer } group end end |
.while(condition, source) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/rx/linq/observable/while.rb', line 3
# Repeats `source` for as long as `condition.call` is truthy, checking the
# condition before each repetition.
#
# The enumerator, the disposed flag and the serial subscription are created
# per subscription. Previously they were created once per call to `while`
# and shared, so disposing one subscription, or running one to the end,
# left every later subscriber with an already finished sequence.
def while(condition, source)
  scheduler = ImmediateScheduler.instance

  AnonymousObservable.new do |observer|
    enum = Enumerator.new {|y|
      while condition.call
        y << source
      end
    }
    is_disposed = false
    subscription = SerialSubscription.new

    cancelable = scheduler.schedule_recursive lambda {|this|
      next if is_disposed

      begin
        current_value = enum.next
      rescue StopIteration
        # Condition turned falsy: the repetition is over.
        observer.on_completed
        next
      rescue => e
        observer.on_error e
        next
      end

      d = SingleAssignmentSubscription.new
      subscription.subscription = d
      d.subscription = current_value.subscribe(
        observer.method(:on_next),
        observer.method(:on_error),
        # Completion of one repetition triggers the next condition check.
        lambda { this.call }
      )
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { is_disposed = true }]
  end
end
.zip(*args, &result_selector) ⇒ Object
Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.
675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 |
# File 'lib/rx/operators/multiple.rb', line 675
# Merges the given observable sequences by combining the elements found at
# the same index in every source.
#
# Values are queued per source. When every queue holds a value, one value
# is taken from each and passed to the result selector (default: an array
# of the values). The result completes when all sources are done, or when a
# value arrives while every other source is already done.
def zip(*args, &result_selector)
  AnonymousObservable.new do |observer|
    result_selector ||= lambda {|*inner_args| inner_args }
    n = args.length

    queues = Array.new(n) {|i| Array.new }
    is_done = Array.new(n, false)

    next_action = lambda do |i|
      if queues.all? {|q| q.length > 0 }
        res = queues.map {|q| q.shift }
        observer.on_next(result_selector.call(*res))
      elsif enumerable_select_with_index(is_done) {|x, j| j != i } .all?
        observer.on_completed
      end
    end

    done = lambda do |i|
      is_done[i] = true
      observer.on_completed if is_done.all?
    end

    gate = Monitor.new

    subscriptions = Array.new(n) do |i|
      sas = SingleAssignmentSubscription.new

      sas_obs = Observer.configure do |o|
        o.on_next do |x|
          queues[i].push(x)
          next_action.call i
        end

        o.on_error(&observer.method(:on_error))

        o.on_completed { done.call i }
      end

      sas.subscription = args[i].synchronize(gate).subscribe(sas_obs)
      sas
    end

    # Drop buffered values on unsubscribe. `clear` empties the queues in
    # place; assigning `q = []` only rebound the block-local variable.
    subscriptions.push(Subscription.create { queues.each {|q| q.clear } })

    CompositeSubscription.new subscriptions
  end
end
Instance Method Details
#_subscribe(observer) ⇒ Subscription
Subscribes the given observer to the observable sequence.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/rx/core/observable.rb', line 37
# Subscribes the given observer, wrapped in an AutoDetachObserver, and
# returns that wrapper.
#
# When the current-thread scheduler reports that scheduling is required, the
# subscription is queued on it through schedule_subscribe. Otherwise
# subscribe_core runs inline, and an error it raises is re-raised only when
# the wrapper's `fail` returns a falsy value.
def _subscribe(observer)
  wrapped = AutoDetachObserver.new observer

  if CurrentThreadScheduler.schedule_required?
    CurrentThreadScheduler.instance.schedule_with_state wrapped, method(:schedule_subscribe)
    return wrapped
  end

  begin
    wrapped.subscription = subscribe_core wrapped
  rescue => e
    raise e unless wrapped.fail e
  end

  wrapped
end
#add_ref(r) ⇒ Object
3 4 5 6 7 |
# File 'lib/rx/internal/util.rb', line 3
# Returns an observable whose subscription pairs `r.subscription` with a
# subscription to this sequence.
def add_ref(r)
  AnonymousObservable.new do |observer|
    members = [r.subscription, self.subscribe(observer)]
    CompositeSubscription.new members
  end
end
#all?(&block) ⇒ Rx::Observable
Determines whether all elements of an observable sequence satisfy a condition if a block is given; otherwise determines whether all elements are true.
64 65 66 67 68 69 |
# File 'lib/rx/operators/aggregates.rb', line 64
# Determines whether every element satisfies the block or, without a block,
# whether every element is itself truthy.
#
# Implemented as "no element fails the predicate": failing elements are
# selected, `any?` reports whether one exists, and the answer is negated.
def all?(&block)
  # Without a block each element is its own verdict. The former default
  # ignored the element and answered true for every sequence.
  block ||= lambda {|v| v }
  select {|v| !(block.call v) }.
    any?.
    map {|b| !b }
end
#amb(second) ⇒ Object
Propagates the observable sequence that reacts first.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/rx/operators/multiple.rb', line 26 def amb(second) AnonymousObservable.new do |observer| left_subscription = SingleAssignmentSubscription.new right_subscription = SingleAssignmentSubscription.new choice = :neither gate = Monitor.new left = AmbObserver.new right = AmbObserver.new handle_left = lambda do |&action| if choice == :neither choice = :left right_subscription.unsubscribe left.observer = observer end action.call if choice == :left end handle_right = lambda do |&action| if choice == :neither choice = :right left_subscription.unsubscribe right.observer = observer end action.call if choice == :right end left_obs = Observer.configure do |o| o.on_next {|x| handle_left.call { observer.on_next x } } o.on_error {|err| handle_left.call { observer.on_error err } } o.on_completed { handle_left.call { observer.on_completed } } end right_obs = Observer.configure do |o| o.on_next {|x| handle_right.call { observer.on_next x } } o.on_error {|err| handle_right.call { observer.on_error err } } o.on_completed { handle_right.call { observer.on_completed } } end left.observer = Observer.allow_reentrancy(left_obs, gate) right.observer = Observer.allow_reentrancy(right_obs, gate) left_subscription.subscription = self.subscribe left right_subscription.subscription = second.subscribe right CompositeSubscription.new [left_subscription, right_subscription] end end |
#and(right) ⇒ Object
3 4 5 |
# File 'lib/rx/linq/observable/and.rb', line 3
# Builds a Pattern from this sequence and `right`.
def and(right)
  Pattern.new [self, right]
end
#any?(&block) ⇒ Rx::Observable
Determines whether any element of an observable sequence satisfies a condition if a block is given else if there are any items in the observable sequence.
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/rx/operators/aggregates.rb', line 84
# Determines whether any element satisfies the block or, without a block,
# whether the sequence has any element at all.
#
# The result emits a single boolean and then completes.
def any?(&block)
  # Filter first so that only matching elements reach the existence check
  # below. Mapping to the predicate result instead made every non-empty
  # sequence answer true, even when the predicate returned false.
  return select(&block).any? if block_given?

  AnonymousObservable.new do |observer|
    new_obs = Observer.configure do |o|
      o.on_next do |_|
        observer.on_next true
        observer.on_completed
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next false
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
#as_observable ⇒ Object
Hides the identity of an observable sequence.
16 17 18 |
# File 'lib/rx/operators/single.rb', line 16
# Hides the identity of this sequence behind a plain AnonymousObservable
# that forwards subscriptions to it.
def as_observable
  AnonymousObservable.new do |observer|
    subscribe(observer)
  end
end
#average(&block) ⇒ Rx::Observable
Computes the average of an observable sequence of values that are optionally obtained by invoking a transform function on each element of the input sequence if a block is given
109 110 111 112 113 114 115 116 117 |
# File 'lib/rx/operators/aggregates.rb', line 109
# Computes the average of the sequence, or of the block's results when a
# block is given.
#
# A running sum and count are accumulated with scan, and the final pair is
# divided with `/`, so integer sums divide as integers.
def average(&block)
  return map(&block).average if block_given?

  seed = {:sum => 0, :count => 0}
  running = scan(seed) do |acc, item|
    {:sum => acc[:sum] + item, :count => acc[:count] + 1}
  end

  running.final.map do |totals|
    raise 'Sequence contains no elements' if totals[:count] == 0
    totals[:sum] / totals[:count]
  end
end
#buffer_with_count(count, skip = count) ⇒ Object
Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.
21 22 23 24 25 |
# File 'lib/rx/operators/single.rb', line 21
# Projects elements into buffers of `count` elements, starting a new buffer
# every `skip` elements. Empty buffers are dropped.
#
# Raises ArgumentError when `count` or `skip` is not greater than zero.
def buffer_with_count(count, skip = count)
  raise ArgumentError, 'Count must be greater than zero' if count <= 0
  raise ArgumentError, 'Skip must be greater than zero' if skip <= 0

  buffers = window_with_count(count, skip).flat_map(&:to_a)
  buffers.find_all {|buffer| buffer.length > 0 }
end
#buffer_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance) ⇒ Object
Projects each element of an observable sequence into consecutive non-overlapping buffers which are produced based on timing information.
22 23 24 25 26 |
# File 'lib/rx/operators/time.rb', line 22
# Projects elements into buffers based on timing information: each buffer
# spans `time_span`, and a new buffer is opened every `time_shift`.
#
# Raises ArgumentError when `time_span` or `time_shift` is not greater than
# zero.
def buffer_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance)
  raise ArgumentError.new 'time_span must be greater than zero' if time_span <= 0
  # This guard used to report the time_span message for a bad time_shift.
  raise ArgumentError.new 'time_shift must be greater than zero' if time_shift <= 0
  window_with_time(time_span, time_shift, scheduler).flat_map(&:to_a)
end
#combine_latest(other, &result_selector) ⇒ Object
Merges two observable sequences into one observable sequence by using the selector function whenever one of the observable sequences produces an element.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/rx/operators/multiple.rb', line 117
# Merges this sequence with `other` by calling the result selector with the
# latest value from each side whenever either side produces an element.
#
# Nothing is emitted until both sides have produced a value. The result
# completes when both sides are done, or when one side produces a value
# while the other finished without ever producing one.
def combine_latest(other, &result_selector)
  AnonymousObservable.new do |observer|
    has_left = false
    has_right = false

    left = nil
    right = nil

    left_done = false
    right_done = false

    left_subscription = SingleAssignmentSubscription.new
    right_subscription = SingleAssignmentSubscription.new

    gate = Monitor.new

    left_obs = Observer.configure do |o|
      o.on_next do |l|
        has_left = true
        left = l

        if has_right
          res = nil
          begin
            res = result_selector.call left, right
          rescue => e
            observer.on_error e
            # `next`, not `break`: this block runs long after the method
            # it was passed to has returned, where `break` raises
            # LocalJumpError.
            next
          end
          observer.on_next res
        elsif right_done
          # The right side ended empty, so no pair can ever be formed.
          # If it did produce a value, that value keeps combining with
          # later left values until the left side completes.
          observer.on_completed
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        left_done = true
        observer.on_completed if right_done
      end
    end

    right_obs = Observer.configure do |o|
      o.on_next do |r|
        has_right = true
        right = r

        if has_left
          res = nil
          begin
            res = result_selector.call left, right
          rescue => e
            observer.on_error e
            next
          end
          observer.on_next res
        elsif left_done
          # Mirror of the left handler: the left side ended empty.
          observer.on_completed
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        right_done = true
        observer.on_completed if left_done
      end
    end

    # Both sides are synchronized on the same gate before subscribing.
    left_subscription.subscription = synchronize(gate).subscribe(left_obs)
    right_subscription.subscription = other.synchronize(gate).subscribe(right_obs)

    CompositeSubscription.new [left_subscription, right_subscription]
  end
end
#concat(*other) ⇒ Object
Concatenates the second observable sequence to the first observable sequence upon successful termination of the first.
195 196 197 |
# File 'lib/rx/operators/multiple.rb', line 195
# Concatenates the given sequences after this one by delegating to
# Observable.concat with an enumerator over [self, *other].
def concat(*other)
  sources = [self] + other
  Observable.concat(sources.to_enum)
end
#concat_all ⇒ Object
3 4 5 |
# File 'lib/rx/linq/observable/concat_all.rb', line 3
# Delegates to merge_concurrent with a limit of 1.
def concat_all
  merge_concurrent 1
end
#concat_map(selector, result_selector = nil) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/rx/linq/observable/concat_map.rb', line 3
# Projects each element through `selector` and hands the projection to
# _concat_map.
#
# `selector` may be a Proc, or any other object, which is then used as the
# projection of every element. When `result_selector` is a Proc, each inner
# value is passed through it as (outer, inner, outer_index, inner_index),
# and selector results that respond to `each` are first converted with
# Observable.from.
def concat_map(selector, result_selector = nil)
  if Proc === result_selector
    combined = lambda do |x, i|
      inner = selector.call(x, i)
      inner = Observable.from(inner) if inner.respond_to?(:each)
      inner.map_with_index {|y, i2| result_selector.call(x, y, i, i2) }
    end
    return concat_map(combined)
  end

  projector = Proc === selector ? selector : lambda {|*_| selector }
  _concat_map(projector)
end
#concat_map_observer(on_next, on_error, on_completed) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/rx/linq/observable/concat_map_observer.rb', line 3
# Projects each notification of the source through the matching callable
# and concatenates the projected results with concat_all.
#
# `on_next` receives the element and its index. `on_error` and
# `on_completed` each contribute one final projection before the
# intermediate sequence completes. An error raised by any callable is
# forwarded through on_error.
def concat_map_observer(on_next, on_error, on_completed)
  projected = AnonymousObservable.new do |observer|
    index = 0

    handle_next = lambda do |x|
      begin
        result = on_next.call(x, index)
        index += 1
      rescue => e
        observer.on_error e
        next
      end
      observer.on_next result
    end

    handle_error = lambda do |err|
      begin
        result = on_error.call(err)
      rescue => e
        observer.on_error e
        next
      end
      observer.on_next result
      observer.on_completed
    end

    handle_completed = lambda do
      begin
        result = on_completed.call
      rescue => e
        observer.on_error e
        next
      end
      observer.on_next result
      observer.on_completed
    end

    subscribe(handle_next, handle_error, handle_completed)
  end

  projected.concat_all
end
#contains(search_element, from_index = 0) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/rx/linq/observable/contains.rb', line 3
# Determines whether the sequence contains `search_element` at or after
# position `from_index`, comparing with `==`.
#
# Emits a single boolean and completes. A negative `from_index` yields
# false without subscribing to the source.
def contains(search_element, from_index = 0)
  AnonymousObservable.new do |observer|
    i = 0
    n = from_index

    if n < 0
      observer.on_next false
      observer.on_completed
      # `next` ends this subscribe block with its value. `return` would
      # try to return from `contains`, which has already finished by the
      # time anyone subscribes, and raise LocalJumpError.
      next Subscription.empty
    end

    subscribe(
      lambda {|x|
        matched = i >= n && x == search_element
        i += 1
        if matched
          observer.on_next true
          observer.on_completed
        end
      },
      observer.method(:on_error),
      lambda {
        observer.on_next false
        observer.on_completed
      })
  end
end
#contains?(item) ⇒ Rx::Observable
Determines whether an observable sequence contains a specified element. Returns an observable sequence with a single boolean indicating whether the source sequence contains an element that has the specified value.
123 124 125 |
# File 'lib/rx/operators/aggregates.rb', line 123
# Determines whether the sequence contains an element that is `eql?` to
# `item`.
def contains?(item)
  matches = select {|candidate| candidate.eql? item }
  matches.any?
end
#count(&block) ⇒ Object
Returns an observable sequence containing a number that represents how many elements in the specified observable sequence satisfy a condition if the block is given, else the number of items in the observable sequence
130 131 132 133 |
# File 'lib/rx/operators/aggregates.rb', line 130
# Counts the elements of the sequence, or only those satisfying the block
# when one is given.
def count(&block)
  return select(&block).count if block_given?

  reduce(0) {|total, _| total + 1 }
end
#debounce(due_time, scheduler = DefaultScheduler.instance) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/rx/linq/observable/debounce.rb', line 3
# Emits an element only if no newer element arrives within `due_time`.
#
# Every element replaces the pending timer. When a timer fires, its value
# is emitted only if it is still the most recent one. On completion the
# pending value, if any, is emitted immediately; on error it is dropped.
def debounce(due_time, scheduler = DefaultScheduler.instance)
  AnonymousObservable.new do |observer|
    cancelable = SerialSubscription.new
    hasvalue = false
    value = nil
    # Incremented per notification; lets a timer detect it is stale.
    id = 0

    subscription = subscribe(
      lambda {|x|
        hasvalue = true
        value = x
        id += 1
        current_id = id
        d = SingleAssignmentSubscription.new
        cancelable.subscription = d
        d.subscription = scheduler.schedule_relative(due_time, lambda {
          observer.on_next value if hasvalue && id == current_id
          hasvalue = false
        })
      },
      lambda {|e|
        # `unsubscribe` is the cancellation method used on subscriptions
        # throughout this module; this block used to call `dispose`.
        cancelable.unsubscribe
        observer.on_error e
        hasvalue = false
        id += 1
      },
      lambda {
        cancelable.unsubscribe
        observer.on_next value if hasvalue
        observer.on_completed
        hasvalue = false
        id += 1
      })

    CompositeSubscription.new [subscription, cancelable]
  end
end
#default_if_empty(default_value = nil) ⇒ Object
Returns the elements of the specified sequence or the type parameter’s default value in a singleton sequence if the sequence is empty.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/rx/operators/standard_query_operators.rb', line 16
# Passes the sequence through unchanged, or emits `default_value` just
# before completion when the sequence produced no elements.
def default_if_empty(default_value = nil)
  AnonymousObservable.new do |observer|
    saw_element = false

    relay = Observer.configure do |o|
      o.on_next do |x|
        saw_element = true
        observer.on_next x
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next(default_value) unless saw_element
        observer.on_completed
      end
    end

    subscribe(relay)
  end
end
#delay(due_time, scheduler = DefaultScheduler.instance) ⇒ Object
3 4 5 6 7 8 9 |
# File 'lib/rx/linq/observable/delay.rb', line 3
# Delays the sequence: a Time `due_time` is handled by delay_date, anything
# else by delay_time_span.
def delay(due_time, scheduler = DefaultScheduler.instance)
  return delay_date(due_time, scheduler) if Time === due_time

  delay_time_span(due_time, scheduler)
end
#delay_with_selector(subscription_delay, delay_duration_selector = nil) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/rx/linq/observable/delay_with_selector.rb', line 3
# Delays each element until the observable returned for it by the delay
# selector produces a notification or completes.
#
# When the first argument is a Proc it is the delay selector. Otherwise the
# first argument is an observable that postpones the subscription to the
# source, and the second argument is the delay selector.
def delay_with_selector(subscription_delay, delay_duration_selector = nil)
  if Proc === subscription_delay
    selector = subscription_delay
  else
    sub_delay = subscription_delay
    selector = delay_duration_selector
  end

  AnonymousObservable.new do |observer|
    # One entry per element still waiting for its delay to elapse.
    delays = CompositeSubscription.new
    at_end = false

    # Completes once the source has ended and no delay is outstanding.
    done = lambda {
      if at_end && delays.length == 0
        observer.on_completed
      end
    }

    subscription = SerialSubscription.new

    start = lambda {|*_|
      subscription.subscription = subscribe(
        lambda {|x|
          begin
            delay = selector.call(x)
          rescue => error
            observer.on_error error
            return
          end

          d = SingleAssignmentSubscription.new
          delays.push(d)
          d.subscription = delay.subscribe(
            lambda {|_|
              observer.on_next x
              delays.delete(d)
              done.call
            },
            observer.method(:on_error),
            lambda {
              observer.on_next x
              delays.delete(d)
              done.call
            })
        },
        observer.method(:on_error),
        lambda {
          at_end = true
          # `unsubscribe` is the cancellation method used on subscriptions
          # throughout this module; this block used to call `dispose`.
          subscription.unsubscribe
          done.call
        })
    }

    if !sub_delay
      start.call
    else
      # Subscribe to the source when the delay observable emits or ends.
      subscription.subscription = sub_delay.subscribe(
        start,
        observer.method(:on_error),
        start)
    end

    CompositeSubscription.new [subscription, delays]
  end
end
#dematerialize ⇒ Object
Dematerializes the explicit notification values of an observable sequence as implicit notifications.
28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/rx/operators/single.rb', line 28
# Turns a sequence of explicit notification values back into implicit
# notifications by letting each element `accept` the downstream observer.
def dematerialize
  AnonymousObservable.new do |observer|
    unwrapper = Rx::Observer.configure do |o|
      o.on_next do |notification|
        notification.accept observer
      end
      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe unwrapper
  end
end
#distinct(&key_selector) ⇒ Object
Returns an observable sequence that contains only distinct elements according to the optional key_selector.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/rx/operators/standard_query_operators.rb', line 39
# Emits only elements whose key has not been seen before.
#
# The key is the block's result (default: the element itself) converted
# with `to_s`, so keys with the same string form count as duplicates. An
# error raised while computing the key is forwarded through on_error.
def distinct(&key_selector)
  key_selector ||= lambda {|x| x}

  AnonymousObservable.new do |observer|
    seen = Hash.new

    filter = Observer.configure do |o|
      o.on_next do |x|
        fresh = false

        begin
          key_s = key_selector.call(x).to_s
          unless seen.key? key_s
            fresh = true
            seen[key_s] = true
          end
        rescue => e
          observer.on_error e
          next
        end

        observer.on_next x if fresh
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(filter)
  end
end
#distinct_until_changed(&key_selector) ⇒ Object
Returns an observable sequence that contains only distinct contiguous elements according to the optional key_selector.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/rx/operators/single.rb', line 42
# Emits only elements whose key differs from the key of the element emitted
# just before it.
#
# The key is the block's result (default: the element itself), compared
# with `!=`. An error raised while computing the key is forwarded through
# on_error.
def distinct_until_changed(&key_selector)
  key_selector ||= lambda {|x| x}

  AnonymousObservable.new do |observer|
    current_key = nil
    has_current = false

    new_obs = Rx::Observer.configure do |o|
      o.on_next do |value|
        key = nil
        begin
          key = key_selector.call value
        rescue => err
          observer.on_error err
          next
        end

        # Test the flag, not the key's truthiness: a nil or false key is a
        # legitimate current key, and repeats of it must be suppressed.
        if !has_current || key != current_key
          has_current = true
          current_key = key
          observer.on_next value
        end
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe new_obs
  end
end
#do(observer_or_on_next = nil, on_error_func = nil, on_completed_func = nil) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/rx/linq/observable/do.rb', line 3
# Invokes side-effect callbacks for each notification and then passes the
# notification on unchanged.
#
# The callbacks come from, in order of precedence: a block (on_next only), a
# callable first argument with optional on_error / on_completed callables,
# or an observer whose on_next / on_error / on_completed methods are used.
# An error raised by a callback is forwarded through on_error in place of
# the original notification.
def do(observer_or_on_next = nil, on_error_func = nil, on_completed_func = nil, &block)
  # The block is captured explicitly: Proc.new without a block no longer
  # picks up the method's block on Ruby 3.
  if block
    on_next_func = block
  elsif observer_or_on_next.respond_to?(:call)
    on_next_func = observer_or_on_next
  else
    on_next_func = observer_or_on_next.method(:on_next)
    on_error_func = observer_or_on_next.method(:on_error)
    on_completed_func = observer_or_on_next.method(:on_completed)
  end

  AnonymousObservable.new do |observer|
    subscribe(
      lambda {|x|
        begin
          on_next_func.call x
        rescue => e
          observer.on_error e
          next
        end
        observer.on_next x
      },
      lambda {|err|
        begin
          # Pass the error being observed; this used to reference `x`,
          # which is not defined in this lambda.
          on_error_func && on_error_func.call(err)
        rescue => e
          observer.on_error e
          next
        end
        observer.on_error err
      },
      lambda {
        begin
          on_completed_func && on_completed_func.call
        rescue => e
          observer.on_error e
          next
        end
        observer.on_completed
      })
  end
end
#element_at(index) ⇒ Rx::Observable
Returns the element at a specified index in a sequence. Returns an observable sequence that produces the element at the specified position in the source sequence.
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/rx/operators/aggregates.rb', line 139
# Emits the element at the zero-based `index` and completes.
#
# Raises ArgumentError for a negative index. If the source completes before
# reaching `index`, the observer receives a RuntimeError through on_error.
def element_at(index)
  raise ArgumentError.new 'index cannot be less than zero' if index < 0

  AnonymousObservable.new do |observer|
    i = index

    new_obs = Observer.configure do |o|
      o.on_next do |value|
        if i == 0
          observer.on_next value
          observer.on_completed
        end

        i -= 1
      end

      o.on_error(&observer.method(:on_error))

      # Report through on_error, as `final` and `last` do. Raising here
      # blew up in the source's completion path, including after the
      # element had already been delivered.
      # NOTE(review): assumes `observer` ignores notifications after it
      # has completed - confirm.
      o.on_completed { observer.on_error(RuntimeError.new('Sequence contains no elements')) }
    end

    subscribe new_obs
  end
end
#element_at_or_default(index, default_value = nil) ⇒ Object
Returns the element at a specified index in a sequence or a default value if the index is out of range.
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/rx/operators/aggregates.rb', line 164
# Emits the element at the zero-based `index`, or `default_value` when the
# source completes, and then completes.
#
# Raises ArgumentError for a negative index.
def element_at_or_default(index, default_value = nil)
  raise ArgumentError.new 'index cannot be less than zero' if index < 0

  AnonymousObservable.new do |observer|
    remaining = index

    picker = Observer.configure do |o|
      o.on_next do |value|
        if remaining == 0
          observer.on_next value
          observer.on_completed
        end

        remaining -= 1
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next default_value
        observer.on_completed
      end
    end

    subscribe picker
  end
end
#empty? ⇒ Rx::Observable
Determines whether an observable sequence is empty. Returns an observable sequence with a single boolean indicating whether the source sequence is empty.
242 243 244 |
# File 'lib/rx/operators/aggregates.rb', line 242
# Determines whether the sequence is empty, as the negation of `any?`.
def empty?
  any?.map {|has_elements| !has_elements }
end
#ensures ⇒ Object
Invokes a specified action after the source observable sequence terminates gracefully or exceptionally.
117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/rx/operators/single.rb', line 117
# Runs the given block when the returned subscription is unsubscribed.
# The block is yielded to even if unsubscribing the source raises.
def ensures
  AnonymousObservable.new do |observer|
    inner = subscribe observer

    Subscription.create do
      begin
        inner.unsubscribe
      ensure
        yield
      end
    end
  end
end
#enumerator_repeat_infinitely(value) ⇒ Object
390 391 392 393 394 395 396 |
# File 'lib/rx/operators/single.rb', line 390
# Returns an Enumerator that yields `value` forever.
def enumerator_repeat_infinitely(value)
  Enumerator.new do |yielder|
    loop { yielder << value }
  end
end
#enumerator_repeat_times(num, value) ⇒ Object
382 383 384 385 386 387 388 |
# File 'lib/rx/operators/single.rb', line 382
# Returns an Enumerator that yields `value` exactly `num` times.
def enumerator_repeat_times(num, value)
  Enumerator.new do |yielder|
    num.times { yielder << value }
  end
end
#final ⇒ Rx::Observable
Internal method to get the final value
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/rx/operators/aggregates.rb', line 16
# Internal helper: emits only the last element of the sequence once it
# completes, or a RuntimeError through on_error when it was empty.
def final
  AnonymousObservable.new do |observer|
    latest = nil
    seen = false

    collector = Observer.configure do |o|
      o.on_next do |x|
        latest = x
        seen = true
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        unless seen
          observer.on_error(RuntimeError.new 'Sequence contains no elements')
          next
        end

        observer.on_next latest
        observer.on_completed
      end
    end

    subscribe collector
  end
end
#first(&block) ⇒ Rx::Observable
Returns the first element of an observable sequence that satisfies the condition in the predicate if a block is given, else the first item in the observable sequence.
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/rx/operators/aggregates.rb', line 195
# Emits the first element of the sequence, or the first one satisfying the
# block when a block is given, and completes.
#
# If there is no such element, the observer receives a RuntimeError
# through on_error.
def first(&block)
  return select(&block).first if block_given?

  AnonymousObservable.new do |observer|
    new_obs = Observer.configure do |o|
      o.on_next do |x|
        observer.on_next x
        observer.on_completed
      end

      o.on_error(&observer.method(:on_error))

      # Report through on_error, as `final` and `last` do. Raising here
      # blew up in the source's completion path, including after the
      # first element had already been delivered.
      # NOTE(review): assumes `observer` ignores notifications after it
      # has completed - confirm.
      o.on_completed { observer.on_error(RuntimeError.new('Sequence contains no elements')) }
    end

    subscribe new_obs
  end
end
#first_or_default(default_value = nil, &block) ⇒ Rx::Observable
Returns the first element of an observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists.
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/rx/operators/aggregates.rb', line 218
# Emits the first element of the sequence, or the first one satisfying the
# block when a block is given. Emits `default_value` instead when the
# source completes. Completes after emitting.
def first_or_default(default_value = nil, &block)
  return select(&block).first_or_default(default_value) if block_given?

  AnonymousObservable.new do |observer|
    # Emit one value downstream and finish the sequence.
    emit_and_finish = lambda do |item|
      observer.on_next item
      observer.on_completed
    end

    picker = Observer.configure do |o|
      o.on_next {|x| emit_and_finish.call(x) }
      o.on_error(&observer.method(:on_error))
      o.on_completed { emit_and_finish.call(default_value) }
    end

    subscribe picker
  end
end
#flat_map(&block) ⇒ Object
Projects each element of the source observable sequence to the other observable sequence and merges the resulting observable sequences into one observable sequence.
108 109 110 |
# File 'lib/rx/operators/standard_query_operators.rb', line 108
# Projects each element to an observable with the block and merges the
# resulting observables into one sequence.
def flat_map(&block)
  projected = map(&block)
  projected.merge_all
end
#flat_map_with_index(&block) ⇒ Object
Projects each element of an observable sequence to an observable sequence by incorporating the element’s index and merges the resulting observable sequences into one observable sequence.
113 114 115 |
# File 'lib/rx/operators/standard_query_operators.rb', line 113
# Like flat_map, but the projection is done by map_with_index, so the block
# also receives the element's index.
def flat_map_with_index(&block)
  projected = map_with_index(&block)
  projected.merge_all
end
#group_join(right, left_duration_selector, right_duration_selector, result_selector) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/rx/linq/observable/group_join.rb', line 3
# Correlates the elements of this (left) sequence with those of `right`
# based on overlapping durations, grouping matches per left element.
#
# Every left element opens a window (a Subject). The result selector is
# called with the element and the window, and its result is emitted. The
# window receives every right value whose duration overlaps the window's.
# A duration ends when the observable returned by the matching duration
# selector emits its first element or completes.
def group_join(right, left_duration_selector, right_duration_selector, result_selector)
  AnonymousObservable.new do |observer|
    group = CompositeSubscription.new
    r = RefCountSubscription.new(group)
    # left_map holds open windows (subjects); right_map holds live right
    # values. Both are keyed by arrival id.
    left_map = {}
    right_map = {}
    left_id = 0
    right_id = 0

    # Fails every open window, then the downstream observer.
    fail_all = lambda {|err|
      left_map.values.each {|v| v.on_error(err) }
      observer.on_error(err)
    }

    left_obs = Observer.configure do |o|
      o.on_next {|value|
        s = Subject.new
        id = left_id
        left_id += 1
        left_map[id] = s

        begin
          result = result_selector.call(value, s.add_ref(r))
        rescue => err
          fail_all.call(err)
          next
        end
        observer.on_next(result)

        # Replay the right values that are live right now.
        right_map.values.each {|v| s.on_next(v) }

        md = SingleAssignmentSubscription.new
        group.push md

        expire = lambda {
          if left_map.delete(id)
            s.on_completed
          end
          group.delete(md)
        }

        begin
          duration = left_duration_selector.call(value)
        rescue => err
          fail_all.call(err)
          next
        end

        md.subscription = duration.take(1).subscribe(
          lambda {|_| },
          fail_all,
          expire)
      }

      o.on_error {|e| fail_all.call(e) }

      o.on_completed(&observer.method(:on_completed))
    end
    group.push self.subscribe(left_obs)

    right_obs = Observer.configure do |o|
      o.on_next {|value|
        id = right_id
        right_id += 1
        right_map[id] = value

        md = SingleAssignmentSubscription.new
        group.push md

        expire = lambda {
          right_map.delete(id)
          group.delete(md)
        }

        begin
          duration = right_duration_selector.call(value)
        rescue => err
          # Fail the open windows in left_map. right_map holds plain
          # values, so it is not the collection to notify.
          fail_all.call(err)
          next
        end

        md.subscription = duration.take(1).subscribe(
          lambda {|_| },
          fail_all,
          expire)

        # Deliver the new right value to every window that is open now.
        # Without this, windows only saw right values that were already
        # live when the window opened.
        left_map.values.each {|v| v.on_next(value) }
      }

      o.on_error {|e| fail_all.call(e) }
    end
    group.push right.subscribe(right_obs)

    r
  end
end
#ignore_elements ⇒ Object
Ignores all elements in an observable sequence leaving only the termination messages.
131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/rx/operators/single.rb', line 131
# Builds a sequence that drops every element of the source and forwards
# only its error or completion notification.
def ignore_elements
  AnonymousObservable.new do |observer|
    terminal_only = Rx::Observer.configure do |config|
      config.on_next {|_| }
      config.on_error(&observer.method(:on_error))
      config.on_completed(&observer.method(:on_completed))
    end

    subscribe(terminal_only)
  end
end
#last(&block) ⇒ Rx::Observable
Returns the last element of an observable sequence that satisfies the condition in the predicate if the block is given, else the last element in the observable sequence.
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 |
# File 'lib/rx/operators/aggregates.rb', line 251
# Emits the final element of the source once it completes. With a block,
# the source is first filtered through select. An empty source results in
# a RuntimeError being sent to on_error.
def last(&block)
  return select(&block).last if block_given?

  AnonymousObservable.new do |observer|
    most_recent = nil
    any_seen = false

    tracker = Observer.configure do |cfg|
      cfg.on_next do |item|
        most_recent = item
        any_seen = true
      end

      cfg.on_error(&observer.method(:on_error))

      cfg.on_completed do
        if any_seen
          observer.on_next most_recent
          observer.on_completed
        else
          observer.on_error(RuntimeError.new('Sequence contains no elements'))
        end
      end
    end

    subscribe(tracker)
  end
end
#last_or_default(default_value = nil, &block) ⇒ Rx::Observable
Returns the last element of an observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists.
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 |
# File 'lib/rx/operators/aggregates.rb', line 286
# Emits the final element of the source on completion, or default_value
# when the source produced nothing. With a block, the source is filtered
# through select first.
def last_or_default(default_value = nil, &block)
  return select(&block).last_or_default(default_value) if block_given?

  AnonymousObservable.new do |observer|
    most_recent = nil
    any_seen = false

    tracker = Observer.configure do |cfg|
      cfg.on_next do |item|
        most_recent = item
        any_seen = true
      end

      cfg.on_error(&observer.method(:on_error))

      cfg.on_completed do
        outcome = any_seen ? most_recent : default_value
        observer.on_next(outcome)
        observer.on_completed
      end
    end

    subscribe(tracker)
  end
end
#latest ⇒ Object
Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence. Each time a new inner observable sequence is received, unsubscribe from the previous inner observable sequence.
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 |
# File 'lib/rx/operators/multiple.rb', line 347
# Flattens a sequence of sequences by mirroring only the most recently
# received inner sequence.
#
# Each inner sequence gets an id; notifications from an inner sequence are
# forwarded only while its id is still the latest one. The result completes
# once the outer sequence has completed and no inner sequence is active.
def latest
  AnonymousObservable.new do |observer|
    gate = Monitor.new
    inner_subscription = SerialSubscription.new
    stopped = false      # outer sequence has completed
    latest_num = 0       # id of the most recent inner sequence
    has_latest = false   # an inner sequence is currently active

    source_obs = Observer.configure do |o|
      o.on_next do |inner_source|
        id = 0

        gate.synchronize do
          latest_num += 1
          id = latest_num
          has_latest = true
        end

        d = SingleAssignmentSubscription.new
        # Hand the new inner subscription to the serial slot so the previous
        # one is released (presumably SerialSubscription unsubscribes the
        # subscription it replaces -- confirm against its implementation).
        inner_subscription.subscription = d

        inner_obs = Observer.configure do |io|
          io.on_next {|x| gate.synchronize { observer.on_next x if latest_num == id } }
          io.on_error do |err|
            # Errors from superseded inner sequences are ignored.
            gate.synchronize { observer.on_error err if latest_num == id }
          end
          io.on_completed do
            gate.synchronize do
              if latest_num == id
                has_latest = false
                observer.on_completed if stopped
              end
            end
          end
        end

        d.subscription = inner_source.subscribe inner_obs
      end

      o.on_error {|err| gate.synchronize { observer.on_error err } }

      o.on_completed do
        gate.synchronize do
          stopped = true
          # Defer completion while an inner sequence is still running.
          observer.on_completed unless has_latest
        end
      end
    end

    subscription = subscribe source_obs
    CompositeSubscription.new [subscription, inner_subscription]
  end
end
#map(&block) ⇒ Object
Projects each element of an observable sequence into a new form.
103 104 105 |
# File 'lib/rx/operators/standard_query_operators.rb', line 103
# Projects every element through the block, ignoring the element index
# supplied by map_with_index.
def map(&block)
  map_with_index do |element, _index|
    block.call(element)
  end
end
#map_with_index(&block) ⇒ Object
Projects each element of an observable sequence into a new form by incorporating the element’s index.
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/rx/operators/standard_query_operators.rb', line 76
# Projects every element through block.call(element, index). The index
# starts at 0 and only advances after a successful projection; an exception
# raised by the block is routed to the observer's on_error.
def map_with_index(&block)
  AnonymousObservable.new do |observer|
    projector = Observer.configure do |cfg|
      position = 0

      cfg.on_next do |element|
        projected = nil
        begin
          projected = block.call(element, position)
          position += 1
        rescue => failure
          observer.on_error failure
          next
        end

        observer.on_next projected
      end

      cfg.on_error(&observer.method(:on_error))
      cfg.on_completed(&observer.method(:on_completed))
    end

    subscribe(projector)
  end
end
#materialize ⇒ Object
Materializes the implicit notifications of an observable sequence as explicit notification values.
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/rx/operators/single.rb', line 144
# Turns the implicit notifications of the source into explicit
# Notification values: each element becomes an on_next notification, an
# error becomes an on_error notification and completion becomes an
# on_completed notification. The resulting sequence itself always ends
# with on_completed after the terminal notification has been emitted.
def materialize
  AnonymousObservable.new do |observer|
    new_obs = Rx::Observer.configure do |o|
      o.on_next {|x| observer.on_next(Notification.create_on_next x) }

      o.on_error do |err|
        # The failure must be wrapped as an error notification, not as a value.
        observer.on_next(Notification.create_on_error err)
        observer.on_completed
      end

      o.on_completed do
        observer.on_next(Notification.create_on_completed)
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
#max(&block) ⇒ Rx::Observable
Returns the maximum element in an observable sequence.
314 315 316 317 |
# File 'lib/rx/operators/aggregates.rb', line 314
# Returns a sequence producing the maximum element of the source.
# With a block, elements are first projected through it via map.
# Without a block, max_by is used with an identity key and the first
# entry of the resulting collection is taken.
def max(&block)
  return map(&block).max if block_given?
  # Identity key selector (the block parameter needs its leading pipe).
  max_by {|x| x }.map {|x| x[0] }
end
#max_by(&block) ⇒ Rx::Observable
Returns the elements in an observable sequence with the maximum key value.
323 324 325 |
# File 'lib/rx/operators/aggregates.rb', line 323
# Delegates to extrema_by, passing the block through as the key selector.
def max_by(&block)
  key_selector = block
  extrema_by(&key_selector)
end
#merge(other, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Merges elements from two observable sequences into a single observable sequence, using the specified scheduler for enumeration of and subscription to the sources.
200 201 202 |
# File 'lib/rx/operators/multiple.rb', line 200
# Merges this sequence with +other+ by handing both, together with the
# scheduler, to Observable.merge_all.
def merge(other, scheduler = CurrentThreadScheduler.instance)
  Observable.merge_all(scheduler, self, other)
end
#merge_all ⇒ Object
Merges all inner observable sequences into a single observable sequence, forwarding elements from every inner sequence as they arrive.
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/rx/operators/multiple.rb', line 265
# Merges a sequence of sequences: every inner sequence is subscribed to as
# soon as it arrives and its elements are forwarded under a shared gate.
# The result completes when the outer sequence has completed and all inner
# subscriptions have been removed from the group.
#
# Subscribing returns the CompositeSubscription that tracks the outer
# subscription (m) and every active inner subscription.
def merge_all
  AnonymousObservable.new do |observer|
    gate = Monitor.new
    stopped = false
    m = SingleAssignmentSubscription.new
    group = CompositeSubscription.new [m]

    new_obs = Observer.configure do |o|
      o.on_next do |inner_source|
        inner_subscription = SingleAssignmentSubscription.new
        group << inner_subscription

        inner_obs = Observer.configure do |io|
          io.on_next {|x| gate.synchronize { observer.on_next x } }

          io.on_error {|err| gate.synchronize { observer.on_error err } }

          io.on_completed do
            group.delete inner_subscription
            # A length of 1 means only the outer subscription (m) is left.
            gate.synchronize { observer.on_completed } if stopped && group.length == 1
          end
        end

        inner_subscription.subscription = inner_source.subscribe inner_obs
      end

      o.on_error {|err| gate.synchronize { observer.on_error err } }

      o.on_completed do
        stopped = true
        gate.synchronize { observer.on_completed } if group.length == 1
      end
    end

    # Track the outer subscription and hand the whole group back so that
    # unsubscribing also releases the inner subscriptions.
    m.subscription = subscribe new_obs
    group
  end
end
#merge_concurrent(max_concurrent = 1) ⇒ Object
Merges elements from all inner observable sequences into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences.
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 |
# File 'lib/rx/operators/multiple.rb', line 205
# Merges a sequence of sequences while keeping at most +max_concurrent+
# inner subscriptions active. Inner sequences arriving while the limit is
# reached are queued and started when an active one completes.
#
# Subscribing returns the CompositeSubscription holding the outer and all
# active inner subscriptions.
def merge_concurrent(max_concurrent = 1)
  AnonymousObservable.new do |observer|
    gate = Monitor.new
    q = []            # inner sequences waiting for a free slot
    stopped = false   # outer sequence has completed
    group = CompositeSubscription.new
    active = 0        # number of running inner subscriptions

    subscriber = nil
    subscriber = lambda do |xs|
      subscription = SingleAssignmentSubscription.new
      group << subscription

      new_obs = Observer.configure do |o|
        o.on_next {|x| gate.synchronize { observer.on_next x } }

        o.on_error {|err| gate.synchronize { observer.on_error err } }

        o.on_completed do
          group.delete subscription
          gate.synchronize do
            if q.length > 0
              # Reuse the freed slot for the next queued sequence.
              s = q.shift
              subscriber.call s
            else
              active -= 1
              observer.on_completed if stopped && active == 0
            end
          end
        end
      end

      # Store the inner subscription so the group can actually release it.
      subscription.subscription = xs.subscribe new_obs
    end

    inner_obs = Observer.configure do |o|
      o.on_next do |inner_source|
        gate.synchronize do
          if active < max_concurrent
            active += 1
            subscriber.call inner_source
          else
            q << inner_source
          end
        end
      end

      o.on_error {|err| gate.synchronize { observer.on_error err } }

      o.on_completed do
        # Same gate as the inner handlers, which read stopped/active.
        gate.synchronize do
          stopped = true
          observer.on_completed if active == 0
        end
      end
    end

    group << subscribe(inner_obs)
    group
  end
end
#min(&block) ⇒ Rx::Observable
Returns the minimum element in an observable sequence.
330 331 332 333 |
# File 'lib/rx/operators/aggregates.rb', line 330
# Returns a sequence producing the minimum element of the source.
# With a block, elements are projected through it first; otherwise min_by
# is used with an identity key and the first entry of its result is taken.
def min(&block)
  if block_given?
    map(&block).min
  else
    min_by {|element| element }.map {|group| group[0] }
  end
end
#min_by(&block) ⇒ Rx::Observable
Returns the elements in an observable sequence with the minimum key value.
339 340 341 |
# File 'lib/rx/operators/aggregates.rb', line 339
# Delegates to extrema_by with a leading +true+ argument and the block as
# the key selector.
def min_by(&block)
  key_selector = block
  extrema_by(true, &key_selector)
end
#multicast(subject_or_subject_selector, selector = nil) ⇒ Object
3 4 5 6 7 8 9 10 11 12 |
# File 'lib/rx/linq/observable/multicast.rb', line 3
# Multicasts the source through a subject.
#
# subject_or_subject_selector - either a subject, or a Proc that creates
#                               a fresh subject per subscription.
# selector                    - when a Proc factory is given, a callable
#                               receiving the connectable sequence and
#                               returning the sequence to subscribe to.
#
# With a plain subject a ConnectableObservable is returned. With a factory
# an AnonymousObservable is returned; on each subscription it subscribes
# the observer to the selector's result and then connects the connectable,
# returning both subscriptions as one CompositeSubscription.
def multicast(subject_or_subject_selector, selector = nil)
  if Proc === subject_or_subject_selector
    AnonymousObservable.new do |observer|
      connectable = multicast(subject_or_subject_selector.call)
      subscription = selector.call(connectable).subscribe(observer)
      # The composite must own the connection, not the source observable.
      CompositeSubscription.new [subscription, connectable.connect]
    end
  else
    ConnectableObservable.new(self, subject_or_subject_selector)
  end
end
#none?(&block) ⇒ Rx::Observable
Determines whether no elements of an observable sequence satisfy a condition if block given, else if all are false
75 76 77 78 79 |
# File 'lib/rx/operators/aggregates.rb', line 75
# Determines whether no element of the source satisfies the block.
# Without a block the elements themselves are tested, so the result is
# true when every element is falsy (or the source is empty).
#
# NOTE(review): relies on any? (defined elsewhere) emitting a single
# boolean -- true as soon as an element arrives, false on empty completion.
def none?(&block)
  block ||= lambda {|x| x }
  # Keep only the matching elements; "none" is the negation of "any match".
  select {|v| block.call v }.any?.map {|found| !found }
end
#observe_on(scheduler) ⇒ Object
Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
32 33 34 35 36 37 38 |
# File 'lib/rx/operators/synchronization.rb', line 32
# Wraps each subscribing observer in an ObserveOnObserver built from the
# given scheduler. Raises ArgumentError when no scheduler is supplied.
def observe_on(scheduler)
  raise ArgumentError, 'Scheduler cannot be nil' unless scheduler

  AnonymousObservable.new do |observer|
    scheduled_observer = ObserveOnObserver.new(scheduler, observer)
    subscribe(scheduled_observer)
  end
end
#on_error_resume_next(other) ⇒ Object
Concatenates the second observable sequence to the first observable sequence upon successful or exceptional termination of the first.
304 305 306 307 308 |
# File 'lib/rx/operators/multiple.rb', line 304
# Continues with +other+ after this sequence terminates, by delegating to
# Observable.on_error_resume_next. Raises ArgumentError when +other+ is
# missing.
def on_error_resume_next(other)
  raise ArgumentError, 'Other cannot be nil' unless other

  Observable.on_error_resume_next(self, other)
end
#pluck(prop) ⇒ Object
3 4 5 |
# File 'lib/rx/linq/observable/pluck.rb', line 3
# Maps every element to element[prop].
def pluck(prop)
  map do |element|
    element[prop]
  end
end
#publish(&selector) ⇒ Object
3 4 5 6 7 8 9 |
# File 'lib/rx/linq/observable/publish.rb', line 3
# Multicasts the source through a Subject.
# With a block, a fresh Subject is created per subscription and the block
# is used as the multicast selector; without one, a single Subject is
# passed to multicast directly.
def publish(&selector)
  if block_given?
    # Pass the captured block explicitly; a bare Proc.new without a block
    # is rejected by Ruby 3.
    multicast(lambda { Subject.new }, selector)
  else
    multicast(Subject.new)
  end
end
#reduce(*args, &block) ⇒ Rx::Observable Also known as: aggregate
Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value. For aggregation behavior with incremental intermediate results, see Rx::Observable.scan
47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/rx/operators/aggregates.rb', line 47
# Folds the source into a single value by running scan and keeping only
# its final result.
#
# Accepted argument shapes:
#   1. (seed, Symbol) or (seed, &block) -- seeded; the seed is also
#      prepended via start_with before taking the final value
#   2. (Symbol) or (&block)             -- unseeded
# Anything else raises ArgumentError.
def reduce(*args, &block)
  seeded = (args.length == 2 && args[1].is_a?(Symbol)) ||
           (args.length == 1 && block_given?)
  unseeded = (args.length == 1 && args[0].is_a?(Symbol)) ||
             (args.length == 0 && block_given?)

  if seeded
    scan(*args, &block).start_with(args[0]).final
  elsif unseeded
    scan(*args, &block).final
  else
    raise ArgumentError, 'Invalid arguments'
  end
end
#repeat(repeat_count) ⇒ Object
Repeats the observable sequence a specified number of times.
171 172 173 |
# File 'lib/rx/operators/single.rb', line 171
# Concatenates the enumerator produced by enumerator_repeat_times for this
# sequence and the given count.
def repeat(repeat_count)
  repetitions = enumerator_repeat_times(repeat_count, self)
  Observable.concat(repetitions)
end
#repeat_infinitely ⇒ Object
Repeats the observable sequence indefinitely.
166 167 168 |
# File 'lib/rx/operators/single.rb', line 166
# Concatenates the enumerator produced by enumerator_repeat_infinitely for
# this sequence.
def repeat_infinitely
  repetitions = enumerator_repeat_infinitely(self)
  Observable.concat(repetitions)
end
#rescue_error(other = nil, &action) ⇒ Object
Continues an observable sequence that is terminated by an exception of the specified type with the observable sequence produced by the handler or continues an observable sequence that is terminated by an exception with the next observable sequence.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/rx/operators/multiple.rb', line 81
# Continues a failed sequence with another one.
#
# other  - fallback sequence used when no block is given; this sequence and
#          +other+ are handed to Observable.rescue_error in that order.
# action - block receiving the error and returning the sequence to
#          continue with. An exception raised by the block is forwarded to
#          the observer's on_error.
#
# Raises ArgumentError when neither +other+ nor a block is supplied.
def rescue_error(other = nil, &action)
  # Include self, otherwise the result would consist of +other+ alone.
  return Observable.rescue_error(self, other) if other && !block_given?
  raise ArgumentError.new 'Invalid arguments' if other.nil? && !block_given?

  AnonymousObservable.new do |observer|
    subscription = SerialSubscription.new

    d1 = SingleAssignmentSubscription.new
    subscription.subscription = d1

    new_obs = Observer.configure do |o|
      o.on_next(&observer.method(:on_next))

      o.on_error do |err|
        result = nil
        begin
          result = action.call(err)
        rescue => e
          observer.on_error(e)
          next
        end

        # Swap the serial slot over to the handler's sequence.
        d = SingleAssignmentSubscription.new
        subscription.subscription = d
        d.subscription = result.subscribe observer
      end

      o.on_completed(&observer.method(:on_completed))
    end

    d1.subscription = subscribe new_obs
    subscription
  end
end
#retry(retry_count) ⇒ Object
Repeats the source observable sequence the specified number of times or until it successfully terminates.
181 182 183 |
# File 'lib/rx/operators/single.rb', line 181
# Passes the enumerator produced by enumerator_repeat_times for this
# sequence and the given count to Observable.rescue_error.
def retry(retry_count)
  attempts = enumerator_repeat_times(retry_count, self)
  Observable.rescue_error(attempts)
end
#retry_infinitely ⇒ Object
Repeats the source observable sequence until it successfully terminates.
176 177 178 |
# File 'lib/rx/operators/single.rb', line 176
# Passes the enumerator produced by enumerator_repeat_infinitely for this
# sequence to Observable.rescue_error.
def retry_infinitely
  attempts = enumerator_repeat_infinitely(self)
  Observable.rescue_error(attempts)
end
#scan(*args, &block) ⇒ Object
Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value. For aggregation behavior with no intermediate results, see Observable.reduce.
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 |
# File 'lib/rx/operators/single.rb', line 188
# Applies an accumulator over the source and emits every intermediate
# result.
#
# Accepted argument shapes:
#   1. (seed, Symbol)
#   2. (seed, &block)
#   3. (Symbol)
#   4. (&block)
# Anything else raises ArgumentError.
#
# Without a seed the first element becomes the initial accumulation and is
# emitted as-is. With a seed, an empty source emits the seed on completion.
# An exception raised by the accumulator is sent to on_error.
def scan(*args, &block)
  has_seed = false
  seed = nil
  action = nil

  if args.length == 2 && args[1].is_a?(Symbol)
    seed = args[0]
    action = args[1].to_proc
    has_seed = true
  elsif args.length == 1 && block_given?
    seed = args[0]
    has_seed = true
    action = block
  elsif args.length == 1 && args[0].is_a?(Symbol)
    action = args[0].to_proc
  elsif args.length == 0 && block_given?
    action = block
  else
    raise ArgumentError.new 'Invalid arguments'
  end

  AnonymousObservable.new do |observer|
    has_accumulation = false
    accumulation = nil
    has_value = false

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        begin
          has_value = true unless has_value

          if has_accumulation
            accumulation = action.call(accumulation, x)
          else
            accumulation = has_seed ? action.call(seed, x) : x
            has_accumulation = true
          end
        rescue => err
          observer.on_error err
          # `next` leaves the handler; `break` would raise LocalJumpError
          # because this block runs after the configuring call has returned.
          next
        end

        observer.on_next accumulation
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next seed if !has_value && has_seed
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
#select(&block) ⇒ Object Also known as: find_all
Filters the elements of an observable sequence based on a predicate.
244 245 246 |
# File 'lib/rx/operators/standard_query_operators.rb', line 244
# Filters elements with the block, ignoring the element index supplied by
# select_with_index.
def select(&block)
  select_with_index do |element, _index|
    block.call(element)
  end
end
#select_with_index(&block) ⇒ Object Also known as: find_all_with_index
Filters the elements of an observable sequence based on a predicate by incorporating the element’s index.
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/rx/operators/standard_query_operators.rb', line 250
# Forwards only the elements for which block.call(element, index) is
# truthy. The index starts at 0 and advances after each successful
# predicate call; an exception raised by the block goes to on_error.
def select_with_index(&block)
  AnonymousObservable.new do |observer|
    position = 0

    filter = Observer.configure do |cfg|
      cfg.on_next do |element|
        keep = false
        begin
          keep = block.call(element, position)
          position += 1
        rescue => failure
          observer.on_error failure
          next
        end

        observer.on_next element if keep
      end

      cfg.on_error(&observer.method(:on_error))
      cfg.on_completed(&observer.method(:on_completed))
    end

    subscribe(filter)
  end
end
#sequence_eql?(other) ⇒ Rx::Observable
Determines whether two sequences are equal by comparing the elements pairwise; the sequences are equal when they are of equal length and their corresponding elements are equal.
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 |
# File 'lib/rx/operators/aggregates.rb', line 347
# Compares this sequence with +other+ element by element.
#
# Elements of each side are queued until the opposite side supplies a
# counterpart. `false` is emitted (followed by completion) on the first
# mismatch or when one side has more elements than the other; `true` is
# emitted once both sides have completed with nothing left over.
# All bookkeeping happens under a shared Mutex.
def sequence_eql?(other)
  AnonymousObservable.new do |observer|
    gate = Mutex.new
    left_done = false
    right_done = false
    left_queue = []
    right_queue = []

    # Emits the verdict and completes the result sequence.
    verdict = lambda do |answer|
      observer.on_next answer
      observer.on_completed
    end

    obs1 = Observer.configure do |o|
      o.on_next do |x|
        gate.synchronize do
          if !right_queue.empty?
            verdict.call(false) unless x == right_queue.shift
          elsif right_done
            verdict.call(false)
          else
            left_queue.push x
          end
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        gate.synchronize do
          left_done = true
          if left_queue.empty?
            if !right_queue.empty?
              verdict.call(false)
            elsif right_done
              verdict.call(true)
            end
          end
        end
      end
    end

    subscription1 = subscribe obs1

    obs2 = Observer.configure do |o|
      o.on_next do |x|
        gate.synchronize do
          if !left_queue.empty?
            verdict.call(false) unless x == left_queue.shift
          elsif left_done
            verdict.call(false)
          else
            right_queue.push x
          end
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        gate.synchronize do
          right_done = true
          if right_queue.empty?
            if !left_queue.empty?
              verdict.call(false)
            elsif left_done
              verdict.call(true)
            end
          end
        end
      end
    end

    subscription2 = other.subscribe obs2

    CompositeSubscription.new [subscription1, subscription2]
  end
end
#single(&block) ⇒ Rx::Observable
Returns the only element of an observable sequence, and reports an exception if there is not exactly one element in the observable sequence.
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 |
# File 'lib/rx/operators/aggregates.rb', line 443
# Emits the only element of the source on completion. A second element or
# an empty source results in a RuntimeError being sent to on_error. With a
# block, the source is filtered through select first.
def single(&block)
  return select(&block).single if block_given?

  AnonymousObservable.new do |observer|
    found = false
    only_value = nil

    checker = Observer.configure do |cfg|
      cfg.on_next do |item|
        if found
          observer.on_error(RuntimeError.new('More than one element produced'))
        else
          only_value = item
          found = true
        end
      end

      cfg.on_error(&observer.method(:on_error))

      cfg.on_completed do
        if found
          observer.on_next only_value
          observer.on_completed
        else
          observer.on_error(RuntimeError.new('Sequence contains no elements'))
        end
      end
    end

    subscribe(checker)
  end
end
#single_or_default(default_value = nil, &block) ⇒ Rx::Observable
Returns the only element of an observable sequence, or a default value if the observable sequence is empty; this method reports an exception if there is more than one element in the observable sequence.
481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 |
# File 'lib/rx/operators/aggregates.rb', line 481
# Emits the only element of the source on completion, or default_value
# when the source was empty. A second element results in a RuntimeError
# being sent to on_error. With a block, the source is filtered through
# select first.
def single_or_default(default_value = nil, &block)
  return select(&block).single_or_default(default_value) if block_given?

  AnonymousObservable.new do |observer|
    found = false
    only_value = nil

    checker = Observer.configure do |cfg|
      cfg.on_next do |item|
        if found
          observer.on_error(RuntimeError.new('More than one element produced'))
        else
          only_value = item
          found = true
        end
      end

      cfg.on_error(&observer.method(:on_error))

      cfg.on_completed do
        outcome = found ? only_value : default_value
        observer.on_next(outcome)
        observer.on_completed
      end
    end

    subscribe(checker)
  end
end
#skip(count) ⇒ Object
Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/rx/operators/standard_query_operators.rb', line 118
# Drops the first +count+ elements and forwards everything after them.
def skip(count)
  AnonymousObservable.new do |observer|
    to_drop = count

    skipper = Observer.configure do |cfg|
      cfg.on_next do |element|
        if to_drop > 0
          to_drop -= 1
        else
          observer.on_next element
        end
      end

      cfg.on_error(&observer.method(:on_error))
      cfg.on_completed(&observer.method(:on_completed))
    end

    subscribe(skipper)
  end
end
#skip_last(count) ⇒ Object
Bypasses a specified number of elements at the end of an observable sequence.
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/rx/operators/single.rb', line 253
# Withholds the last +count+ elements: each element is buffered and the
# oldest buffered one is released once the buffer holds more than +count+.
# Raises ArgumentError for a negative count.
def skip_last(count)
  raise ArgumentError, 'Count cannot be less than zero' if count < 0

  AnonymousObservable.new do |observer|
    held_back = []

    delayer = Observer.configure do |cfg|
      cfg.on_next do |element|
        held_back.push element
        observer.on_next(held_back.shift) if held_back.length > count
      end

      cfg.on_error(&observer.method(:on_error))
      cfg.on_completed(&observer.method(:on_completed))
    end

    subscribe(delayer)
  end
end
#skip_until(other) ⇒ Object
Returns the elements from the source observable sequence only after the other observable sequence produces an element.
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 |
# File 'lib/rx/operators/multiple.rb', line 311
# Suppresses elements (and completion) of the source until +other+ emits
# its first element; that element opens the gate and the subscription to
# +other+ is then released. Errors from either side are forwarded.
# Both sides are wrapped with synchronize using one shared Monitor.
# Raises ArgumentError when +other+ is missing.
def skip_until(other)
  raise ArgumentError, 'Other cannot be nil' unless other

  AnonymousObservable.new do |observer|
    source_subscription = SingleAssignmentSubscription.new
    other_subscription = SingleAssignmentSubscription.new

    passing = false
    gate = Monitor.new

    source_obs = Observer.configure do |cfg|
      cfg.on_next {|element| observer.on_next element if passing }
      cfg.on_error(&observer.method(:on_error))
      cfg.on_completed { observer.on_completed if passing }
    end

    other_obs = Observer.configure do |cfg|
      cfg.on_next do |_|
        passing = true
        other_subscription.unsubscribe
      end
      cfg.on_error(&observer.method(:on_error))
    end

    source_subscription.subscription = synchronize(gate).subscribe(source_obs)
    other_subscription.subscription = other.synchronize(gate).subscribe(other_obs)

    CompositeSubscription.new [source_subscription, other_subscription]
  end
end
#skip_while(&block) ⇒ Object
Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements.
142 143 144 |
# File 'lib/rx/operators/standard_query_operators.rb', line 142
# Skips leading elements while the block holds, ignoring the element index
# supplied by skip_while_with_index.
def skip_while(&block)
  skip_while_with_index do |element, _index|
    block.call(element)
  end
end
#skip_while_with_index(&block) ⇒ Object
Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements. The element’s index is used in the logic of the predicate function.
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/rx/operators/standard_query_operators.rb', line 148
# Skips leading elements while block.call(element, index) is truthy, then
# forwards the first failing element and every element after it.
# The predicate is no longer consulted once it has failed; an exception
# raised by the block is sent to on_error.
def skip_while_with_index(&block)
  AnonymousObservable.new do |observer|
    running = false
    i = 0

    new_observer = Observer.configure do |o|
      o.on_next do |x|
        unless running
          begin
            running = !block.call(x, i)
            i += 1
          rescue => e
            observer.on_error e
            next
          end
        end

        # Must sit outside the `unless`, otherwise only the first passing
        # element would ever be forwarded.
        observer.on_next x if running
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(new_observer)
  end
end
#start_with(*args) ⇒ Object
Prepends a sequence of values to an observable sequence.
273 274 275 276 277 278 279 |
# File 'lib/rx/operators/single.rb', line 273
# Prepends the given values to the source. When the first argument is a
# Scheduler it is removed from the values and used for emitting them;
# otherwise CurrentThreadScheduler.instance is used.
def start_with(*args)
  scheduler =
    if args.size > 0 && Scheduler === args[0]
      args.shift
    else
      CurrentThreadScheduler.instance
    end

  prefix = Observable.from_array(args, scheduler)
  prefix.concat(self)
end
#subscribe(*args) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/rx/core/observable.rb', line 13
# Subscribes to the sequence via _subscribe.
#
# Accepted forms:
#   subscribe                       - observer without handlers
#   subscribe { |value| ... }       - block used as the on_next handler
#   subscribe(observer)             - the given observer is used as-is
#   subscribe(on_next, on_error, on_completed)
#                                   - three callables, one per notification
#
# Raises ArgumentError for any other number of arguments.
# Returns whatever _subscribe returns.
def subscribe(*args, &block)
  case args.size
  when 0
    if block_given?
      # Forward the captured block; a bare Proc.new without a block is
      # rejected by Ruby 3.
      _subscribe(Observer.configure {|o| o.on_next(&block) })
    else
      _subscribe(Observer.configure)
    end
  when 1
    _subscribe args[0]
  when 3
    _subscribe(Observer.configure {|o|
      o.on_next(&args[0])
      o.on_error(&args[1])
      o.on_completed(&args[2])
    })
  else
    raise ArgumentError, "wrong number of arguments (#{args.size} for 0..1 or 3)"
  end
end
#subscribe_on(scheduler) ⇒ Object
Wraps the source sequence in order to run its subscription and unsubscribe logic on the specified scheduler.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/rx/operators/synchronization.rb', line 15
# Defers the actual subscription to the given scheduler. The returned
# SerialSubscription first holds the scheduled work and is then switched
# to a ScheduledSubscription wrapping the real subscription.
# Raises ArgumentError when no scheduler is supplied.
def subscribe_on(scheduler)
  raise ArgumentError, 'Scheduler cannot be nil' unless scheduler

  AnonymousObservable.new do |observer|
    pending = SingleAssignmentSubscription.new
    outer = SerialSubscription.new
    outer.subscription = pending

    deferred_subscribe = lambda {
      outer.subscription = ScheduledSubscription.new(scheduler, subscribe(observer))
    }
    pending.subscription = scheduler.schedule(deferred_subscribe)

    outer
  end
end
#subscribe_on_completed(&block) ⇒ Object
Subscribes the given block to the on_completed action of the observable sequence.
69 70 71 72 |
# File 'lib/rx/core/observable.rb', line 69
# Subscribes with an observer whose only handler is the given block,
# registered for on_completed. Raises ArgumentError without a block.
def subscribe_on_completed(&block)
  raise ArgumentError, 'Block is required' unless block_given?

  completion_observer = Observer.configure {|cfg| cfg.on_completed(&block) }
  subscribe(completion_observer)
end
#subscribe_on_error(&block) ⇒ Object
Subscribes the given block to the on_error action of the observable sequence.
63 64 65 66 |
# File 'lib/rx/core/observable.rb', line 63
# Subscribes with an observer whose only handler is the given block,
# registered for on_error. Raises ArgumentError without a block.
def subscribe_on_error(&block)
  raise ArgumentError, 'Block is required' unless block_given?

  error_observer = Observer.configure {|cfg| cfg.on_error(&block) }
  subscribe(error_observer)
end
#subscribe_on_next(&block) ⇒ Subscription
Subscribes the given block to the on_next action of the observable sequence.
57 58 59 60 |
# File 'lib/rx/core/observable.rb', line 57
# Subscribes with an observer whose only handler is the given block,
# registered for on_next. Raises ArgumentError without a block.
def subscribe_on_next(&block)
  raise ArgumentError, 'Block is required' unless block_given?

  value_observer = Observer.configure {|cfg| cfg.on_next(&block) }
  subscribe(value_observer)
end
#sum(&block) ⇒ Rx::Observable
Computes the sum of a sequence of values.
513 514 515 516 |
# File 'lib/rx/operators/aggregates.rb', line 513
# Adds up the elements via reduce with a seed of 0. With a block, elements
# are projected through it first.
def sum(&block)
  if block_given?
    map(&block).sum
  else
    reduce(0) {|total, element| total + element }
  end
end
#synchronize(gate = Monitor.new) ⇒ Object
Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
41 42 43 44 45 |
# File 'lib/rx/operators/synchronization.rb', line 41
# Subscribes each observer through Observer.allow_reentrancy using the
# given gate (a fresh Monitor by default).
def synchronize(gate = Monitor.new)
  AnonymousObservable.new do |observer|
    gated_observer = Observer.allow_reentrancy(observer, gate)
    subscribe(gated_observer)
  end
end
#take(count, scheduler = ImmediateScheduler.instance) ⇒ Object
Returns a specified number of contiguous elements from the start of an observable sequence.
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/rx/operators/standard_query_operators.rb', line 178
# Forwards the first +count+ elements and completes right after the last
# of them. A count of 0 yields Observable.empty on the given scheduler.
def take(count, scheduler = ImmediateScheduler.instance)
  return Observable.empty(scheduler) if count == 0

  AnonymousObservable.new do |observer|
    left = count

    limiter = Observer.configure do |cfg|
      cfg.on_next do |element|
        if left > 0
          left -= 1
          observer.on_next element
          observer.on_completed if left == 0
        end
      end

      cfg.on_error(&observer.method(:on_error))
      cfg.on_completed(&observer.method(:on_completed))
    end

    subscribe(limiter)
  end
end
#take_last(count, scheduler = CurrentThreadScheduler.instance) ⇒ Object
Returns a specified number of contiguous elements from the end of an observable sequence.
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/rx/operators/single.rb', line 282
# Emits the last +count+ elements of the source once it has completed.
#
# Elements are kept in a bounded buffer while the source is running; on
# completion the buffer is drained one element per recursive scheduler
# step, followed by on_completed.
#
# count     - number of trailing elements to keep (must not be negative).
# scheduler - scheduler used to drain the buffer.
#
# Raises ArgumentError for a negative count.
def take_last(count, scheduler = CurrentThreadScheduler.instance)
  raise ArgumentError.new 'Count cannot be less than zero' if count < 0

  AnonymousObservable.new do |observer|
    q = []
    g = CompositeSubscription.new

    # Emits one buffered element per step, then completes.
    drain = lambda {|this|
      if q.length > 0
        observer.on_next(q.shift)
        this.call
      else
        observer.on_completed
      end
    }

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        q.push x
        # Keep at most `count` elements; comparing against 0 would empty
        # the buffer on every element.
        q.shift if q.length > count
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        g.push(scheduler.schedule_recursive(drain))
      end
    end

    g.push(subscribe new_obs)
    g
  end
end
#take_last_buffer(count) ⇒ Object
Returns a list with the specified number of contiguous elements from the end of an observable sequence.
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 |
# File 'lib/rx/operators/single.rb', line 314
# Collects the last +count+ elements of the source and emits them as one
# array when the source completes, followed by on_completed.
def take_last_buffer(count)
  AnonymousObservable.new do |observer|
    q = []

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        q.push x
        # Drop the oldest element once the buffer exceeds `count`.
        q.shift if q.length > count
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next q
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
#take_until(other) ⇒ Object
Returns the elements from the source observable sequence until the other observable sequence produces an element.
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 |
# File 'lib/rx/operators/multiple.rb', line 396
# Mirrors the source until +other+ emits an element, at which point the
# result completes. Errors from +other+ are forwarded. Both sides are
# wrapped with synchronize using one shared Monitor, and the subscription
# to +other+ is released through `ensures` on the source side.
# Raises ArgumentError when +other+ is missing.
def take_until(other)
  raise ArgumentError, 'other cannot be nil' unless other

  AnonymousObservable.new do |observer|
    source_subscription = SingleAssignmentSubscription.new
    other_subscription = SingleAssignmentSubscription.new
    gate = Monitor.new

    stop_signal = Observer.configure do |cfg|
      cfg.on_next {|_| observer.on_completed }
      cfg.on_error(&observer.method(:on_error))
    end

    # +other+ is subscribed first, then the source.
    other_subscription.subscription = other.synchronize(gate).subscribe(stop_signal)

    release_other = other_subscription.method(:unsubscribe)
    source_subscription.subscription =
      synchronize(gate).ensures(&release_other).subscribe(observer)

    CompositeSubscription.new [source_subscription, other_subscription]
  end
end
#take_while(&block) ⇒ Object
Returns elements from an observable sequence as long as a specified condition is true.
204 205 206 |
# File 'lib/rx/operators/standard_query_operators.rb', line 204
# Takes leading elements while the block holds, ignoring the element index
# supplied by take_while_with_index.
def take_while(&block)
  take_while_with_index do |element, _index|
    block.call(element)
  end
end
#take_while_with_index(&block) ⇒ Object
Returns elements from an observable sequence as long as a specified condition is true. The element’s index is used in the logic of the predicate function.
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 |
# File 'lib/rx/operators/standard_query_operators.rb', line 210
# Forwards elements while block.call(element, index) is truthy and
# completes on the first element for which it is not. Later elements are
# ignored. An exception raised by the block is sent to on_error.
def take_while_with_index(&block)
  AnonymousObservable.new do |observer|
    accepting = true
    position = 0

    taker = Observer.configure do |cfg|
      cfg.on_next do |element|
        if accepting
          begin
            accepting = block.call(element, position)
            position += 1
          rescue => failure
            observer.on_error failure
            next
          end

          if accepting
            observer.on_next element
          else
            observer.on_completed
          end
        end
      end

      cfg.on_error(&observer.method(:on_error))
      cfg.on_completed(&observer.method(:on_completed))
    end

    subscribe(taker)
  end
end
#tap(observer) ⇒ Object
Invokes the observer’s methods for each message in the source sequence. This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/rx/operators/single.rb', line 75
# Invokes the given observer for every notification of the source before
# passing the notification on unchanged.
#
# If the tapping observer raises, the exception is sent downstream through
# on_error and the notification that triggered it is not forwarded.
#
# Raises ArgumentError when no observer is supplied.
def tap(observer)
  raise ArgumentError.new 'Observer cannot be nil' unless observer

  AnonymousObservable.new do |obs|
    new_obs = Rx::Observer.configure do |o|
      o.on_next do |value|
        begin
          observer.on_next value
        rescue => err
          obs.on_error err
          # The failure is terminal; do not forward the value as well.
          next
        end

        obs.on_next value
      end

      o.on_error do |err|
        begin
          observer.on_error err
        rescue => e
          obs.on_error e
          # Avoid reporting a second error for the same notification.
          next
        end

        obs.on_error err
      end

      o.on_completed do
        begin
          observer.on_completed
        rescue => err
          obs.on_error err
          # Do not complete after an error has been reported.
          next
        end

        obs.on_completed
      end
    end

    subscribe new_obs
  end
end
#time_interval(scheduler = DefaultScheduler.instance) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 |
# File 'lib/rx/linq/observable/time_interval.rb', line 3
# Maps each element to a TimeInterval built from the element and the
# difference between scheduler.now at this element and at the previous one.
# For the first element the baseline is read inside the deferred factory.
def time_interval(scheduler = DefaultScheduler.instance)
  Observable.defer do
    previous = scheduler.now

    map do |value|
      current = scheduler.now
      elapsed = current - previous
      previous = current
      TimeInterval.new(elapsed, value)
    end
  end
end
#timestamp(scheduler = DefaultScheduler.instance) ⇒ Object
3 4 5 6 7 |
# File 'lib/rx/linq/observable/timestamp.rb', line 3
# Maps each element of the sequence to a Hash that pairs the element with the
# scheduler's current time.
#
# @param scheduler object responding to #now; read once per element
# @return the mapped sequence, whose elements are
#   { value: <element>, timestamp: <scheduler.now> }
def timestamp(scheduler = DefaultScheduler.instance)
  map do |x|
    { value: x, timestamp: scheduler.now }
  end
end
#to_a ⇒ Rx::Observable
Creates an array from an observable sequence.
520 521 522 523 524 525 526 527 528 529 530 531 |
# File 'lib/rx/operators/aggregates.rb', line 520
# Collects every element of the source into a single Array, which is emitted
# as one element followed by completion when the source completes. Source
# errors are forwarded and no array is emitted.
def to_a
  AnonymousObservable.new do |observer|
    collected = []

    emit_and_finish = lambda do
      observer.on_next collected
      observer.on_completed
    end

    subscribe(collected.method(:push), observer.method(:on_error), emit_and_finish)
  end
end
#to_h {|h| ... } ⇒ Rx::Observable
Creates a Hash from the observable collection. Note that any duplicate keys will be overwritten.
554 555 556 557 558 559 560 561 |
# File 'lib/rx/operators/aggregates.rb', line 554
# Folds the sequence into a Hash. An optional block receives a
# HashConfiguration whose key and value selector blocks are applied to every
# element; a later element with the same key overwrites the earlier entry.
def to_h
  config = HashConfiguration.new
  yield config if block_given?

  reduce({}) do |result, item|
    key = config.key_selector_block.call(item)
    result[key] = config.value_selector_block.call(item)
    result
  end
end
#window_with_count(count, skip) ⇒ Object
Projects each element of an observable sequence into zero or more windows which are produced based on element count information.
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 |
# File 'lib/rx/operators/single.rb', line 336
# Splits the source into windows of `count` elements, opening a new window
# every `skip` elements. Each window is emitted as a Subject wrapped with the
# shared ref-counted subscription.
# Raises ArgumentError when count or skip is not greater than zero.
def window_with_count(count, skip)
  raise ArgumentError, 'Count must be greater than zero' if count <= 0
  raise ArgumentError, 'Skip must be greater than zero' if skip <= 0

  AnonymousObservable.new do |observer|
    windows = []
    seen = 0

    source_subscription = SingleAssignmentSubscription.new
    ref_counted = RefCountSubscription.new(source_subscription)

    open_window = lambda do
      window = Subject.new
      windows.push(window)
      observer.on_next(window.add_ref(ref_counted))
    end

    # The first window is opened before any element arrives.
    open_window.call

    new_obs = Observer.configure do |o|
      o.on_next do |value|
        windows.each { |window| window.on_next(value) }

        # The oldest window is full once it has received `count` elements.
        overflow = seen - count + 1
        windows.shift.on_completed if overflow >= 0 && (overflow % skip).zero?

        seen += 1
        open_window.call if (seen % skip).zero?
      end

      o.on_error do |err|
        windows.shift.on_error(err) until windows.empty?
        observer.on_error(err)
      end

      o.on_completed do
        windows.shift.on_completed until windows.empty?
        observer.on_completed
      end
    end

    source_subscription.subscription = subscribe(new_obs)
    ref_counted
  end
end
#window_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance) ⇒ Object
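Projects each element of an observable sequence into windows which are produced based on timing information. A window lasts time_span and a new window is opened every time_shift; with the default time_shift (equal to time_span) the windows are consecutive and non-overlapping.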
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/rx/operators/time.rb', line 30
# Projects each element of the source into windows driven by timers on the
# given scheduler. A window is opened every `time_shift` and closed
# `time_span` after it was opened, so windows overlap when
# time_shift < time_span.
#
# @param time_span   duration of each window; must be > 0
#   (units are whatever scheduler.schedule_relative expects — confirm)
# @param time_shift  interval between the openings of consecutive windows;
#   must be > 0; defaults to time_span
# @param scheduler   scheduler used for the timers
# @return [AnonymousObservable] a sequence of window observables
# @raise [ArgumentError] if time_span or time_shift is not greater than zero
def window_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance)
  raise ArgumentError, 'time_span must be greater than zero' if time_span <= 0
  raise ArgumentError, 'time_shift must be greater than zero' if time_shift <= 0

  AnonymousObservable.new do |observer|
    total_time = 0
    next_shift = time_shift
    next_span = time_span

    gate = Mutex.new
    q = []

    timer_d = SerialSubscription.new
    group_subscription = CompositeSubscription.new [timer_d]
    ref_count_subscription = RefCountSubscription.new(group_subscription)

    create_timer = lambda {
      m = SingleAssignmentSubscription.new
      timer_d.subscription = m

      # Decide whether the next timer tick closes a window (span), opens a
      # window (shift), or both, depending on which deadline comes first.
      is_span = false
      is_shift = false
      if next_span == next_shift
        is_span = true
        is_shift = true
      elsif next_span < next_shift
        is_span = true
      else
        is_shift = true
      end

      new_total_time = is_span ? next_span : next_shift
      ts = new_total_time - total_time
      total_time = new_total_time

      next_span += time_shift if is_span
      next_shift += time_shift if is_shift

      m.subscription = scheduler.schedule_relative(ts, lambda {
        gate.synchronize do
          if is_shift
            s = Subject.new
            q.push s
            observer.on_next(s.add_ref(ref_count_subscription))
          end
          if is_span
            s = q.shift
            s.on_completed
          end
          create_timer.call
        end
      })
    }

    # The first window is opened before any timer fires.
    q.push(Subject.new)
    observer.on_next(q[0].add_ref(ref_count_subscription))
    create_timer.call

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        gate.synchronize do
          q.each { |s| s.on_next x }
        end
      end

      o.on_error do |err|
        gate.synchronize do
          q.each { |s| s.on_error err }
          observer.on_error err
        end
      end

      o.on_completed do
        gate.synchronize do
          q.each { |s| s.on_completed }
          observer.on_completed
        end
      end
    end

    group_subscription.push subscribe(new_obs)

    ref_count_subscription
  end
end
#zip(*args, &result_selector) ⇒ Object
Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.
418 419 420 421 |
# File 'lib/rx/operators/multiple.rb', line 418
# Instance form of Observable.zip: zips this sequence, as the first source,
# with the given sequences, forwarding the optional result selector.
def zip(*args, &result_selector)
  Observable.zip(self, *args, &result_selector)
end