Method: RxRuby::Observable#scan
- Defined in:
- lib/rx_ruby/operators/single.rb
#scan(*args, &block) ⇒ Object
Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value. For aggregation behavior with no intermediate results, see Observable.reduce.
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 |
# File 'lib/rx_ruby/operators/single.rb', line 188 def scan(*args, &block) has_seed = false seed = nil action = nil # Argument parsing to support: # 1. (seed, Symbol) # 2. (seed, &block) # 3. (Symbol) # 4. (&block) if args.length == 2 && args[1].is_a?(Symbol) seed = args[0] action = args[1].to_proc has_seed = true elsif args.length == 1 && block_given? seed = args[0] has_seed = true action = block elsif args.length == 1 && args[0].is_a?(Symbol) action = args[0].to_proc elsif args.length == 0 && block_given? action = block else raise ArgumentError.new 'Invalid arguments' end AnonymousObservable.new do |observer| has_accumulation = false accumulation = nil has_value = false new_obs = Observer.configure do |o| o.on_next do |x| begin has_value = true unless has_value if has_accumulation accumulation = action.call(accumulation, x) else accumulation = has_seed ? action.call(seed, x) : x has_accumulation = true end rescue => err observer.on_error err return end observer.on_next accumulation end o.on_error(&observer.method(:on_error)) o.on_completed do observer.on_next seed if !has_value && has_seed observer.on_completed end end subscribe new_obs end end |