Class: Reacto::Trackable

Inherits:
Object
  • Object
show all
Defined in:
lib/reacto/trackable.rb

Constant Summary collapse

TOPICS =
[:open, :value, :error, :close]

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(behaviour = NO_ACTION, executor = nil, &block) ⇒ Trackable

Returns a new instance of Trackable.



168
169
170
171
# File 'lib/reacto/trackable.rb', line 168

# Stores the behaviour and the optional executor of this Trackable.
#
# @param behaviour [Object] kept as the behaviour when no block is given
# @param executor [Object, nil] kept as-is; nil means no executor was set
# @yield when a block is passed it is stored instead of +behaviour+
def initialize(behaviour = NO_ACTION, executor = nil, &block)
  @executor = executor
  @behaviour = block || behaviour
end

Class Method Details

.close(executor = nil) ⇒ Object



36
37
38
39
40
# File 'lib/reacto/trackable.rb', line 36

# Makes a Trackable whose behaviour only calls +on_close+ on what it is given.
#
# @param executor [Object, nil] forwarded to +make+ as the executor
# @return [Object] the result of +make+
def close(executor = nil)
  make(nil, executor) { |tracker| tracker.on_close }
end

.combine(*trackables, &block) ⇒ Object



20
21
22
23
24
# File 'lib/reacto/trackable.rb', line 20

# Delegates to +combine_create+ using Subscriptions::CombiningSubscription.
#
# @param trackables [Array] forwarded unchanged to +combine_create+
# @yield forwarded unchanged to +combine_create+
# @return [Object] the result of +combine_create+
def combine(*trackables, &block)
  subscription_type = Subscriptions::CombiningSubscription
  combine_create(subscription_type, *trackables, &block)
end

.combine_last(*trackables, &block) ⇒ Object



26
27
28
29
30
# File 'lib/reacto/trackable.rb', line 26

# Delegates to +combine_create+ using Subscriptions::CombiningLastSubscription.
#
# @param trackables [Array] forwarded unchanged to +combine_create+
# @yield forwarded unchanged to +combine_create+
# @return [Object] the result of +combine_create+
def combine_last(*trackables, &block)
  subscription_type = Subscriptions::CombiningLastSubscription
  combine_create(subscription_type, *trackables, &block)
end

.combine_with(function, *trackables) ⇒ Object



153
154
# File 'lib/reacto/trackable.rb', line 153

# NOTE(review): the body is empty, so this always returns nil and ignores both
# +function+ and +trackables+. It looks like an unimplemented placeholder —
# confirm before relying on it.
def combine_with(function, *trackables)
end

.enumerable(enumerable, executor = nil) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/reacto/trackable.rb', line 138

# Makes a Trackable that pushes every element of +enumerable+ to its tracker.
#
# Iteration stops as soon as the tracker reports it is no longer subscribed.
# After iterating, +on_close+ is called if the tracker is still subscribed.
# A StandardError raised along the way is passed to +on_error+, again only
# while the tracker is subscribed.
#
# @param enumerable [#each] source of the values
# @param executor [Object, nil] forwarded to +make+ as the executor
# @return [Object] the result of +make+
def enumerable(enumerable, executor = nil)
  make(nil, executor) do |tracker|
    begin
      enumerable.each do |item|
        if tracker.subscribed?
          tracker.on_value(item)
        else
          break
        end
      end

      tracker.on_close if tracker.subscribed?
    rescue => failure
      tracker.on_error(failure) if tracker.subscribed?
    end
  end
end

.error(err, executor = nil) ⇒ Object



42
43
44
45
46
# File 'lib/reacto/trackable.rb', line 42

# Makes a Trackable whose behaviour only calls +on_error+ with +err+.
#
# @param err [Object] the value handed to +on_error+
# @param executor [Object, nil] forwarded to +make+ as the executor
# @return [Object] the result of +make+
def error(err, executor = nil)
  make(nil, executor) { |tracker| tracker.on_error(err) }
end

.interval(interval, enumerator = Behaviours.integers_enumerator, executor: nil) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/reacto/trackable.rb', line 68

# Makes a Trackable that emits successive values of +enumerator+, one per
# +interval+ tick.
#
# Two strategies are used, chosen by the class of +executor+:
# * Concurrent::ImmediateExecutor — the behaviour itself sleeps between values.
# * anything else — a Concurrent::TimerTask signals a worker thread through a
#   Queue, and the worker emits one value per signal.
#
# @param interval [Object] handed to +sleep+ and to TimerTask's
#   +execution_interval:+ (presumably seconds — confirm)
# @param enumerator [#next] source of values; StopIteration ends the stream
# @param executor [Object, nil] only inspected with +is_a?+ here
# @return [Object] the result of +make+
#
# NOTE(review): neither branch passes +executor+ on to +make+ — confirm this
# is intended.
def interval(
  interval,
  enumerator = Behaviours.integers_enumerator,
  executor: nil
)
  if executor.is_a?(Concurrent::ImmediateExecutor)
    make do |tracker|
      # with_close_and_error is defined elsewhere; presumably it closes the
      # tracker after the block and reports raised errors — verify.
      Behaviours.with_close_and_error(tracker) do |subscriber|
        while subscriber.subscribed?
          # Subscription is re-checked before and after sleeping because it
          # can change while this code is blocked in sleep.
          sleep interval if subscriber.subscribed?
          if subscriber.subscribed?
            begin
              subscriber.on_value(enumerator.next)
            rescue StopIteration
              # Enumerator exhausted: leave the loop.
              break
            end
          else
            break
          end
        end
      end
    end
  else
    make do |tracker|
      # The timer task pushes a token every tick; the worker thread below
      # blocks on the queue and emits one value per token.
      queue = Queue.new
      task = Concurrent::TimerTask.new(execution_interval: interval) do
        queue.push('ready')
      end

      # NOTE(review): this sets the flag for the whole process, not only for
      # the worker thread created below — confirm the global effect is wanted.
      Thread::abort_on_exception = true
      thread = Thread.new do
        begin
          loop do
            queue.pop
            break unless tracker.subscribed?

            begin
              value = enumerator.next
              tracker.on_value(value)
            rescue StopIteration
              # Enumerator exhausted: close the tracker if still subscribed.
              tracker.on_close if tracker.subscribed?
              break
            rescue StandardError => error
              tracker.on_error(error) if tracker.subscribed?
              break
            end
          end
        ensure
          # Whatever ends the worker loop, stop the timer task.
          task.shutdown
        end
      end
      # The task is started only after the worker thread has been created.
      task.execute

      # Registers the task and the worker thread as a resource of the tracker
      # (presumably so they are released on unsubscribe — confirm).
      tracker.add_resource(Reacto::Resources::ExecutorResource.new(
        task, threads: [thread]
      ))
    end
  end
end

.later(secs, value, executor: Reacto::Executors.tasks) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/reacto/trackable.rb', line 53

# Makes a Trackable that hands +value+ to Behaviours.single_tracker_value
# after +secs+.
#
# With a Concurrent::ImmediateExecutor the behaviour sleeps for +secs+ and
# then emits. Otherwise the emission is scheduled through
# Concurrent::ScheduledTask on +executor+.
#
# @param secs [Object] delay handed to +sleep+ or to ScheduledTask.execute
# @param value [Object] the value passed to Behaviours.single_tracker_value
# @param executor [Object] decides the strategy; used by ScheduledTask
# @return [Object] the result of +make+
def later(secs, value, executor: Reacto::Executors.tasks)
  immediate = executor.is_a?(Concurrent::ImmediateExecutor)

  make do |tracker|
    if immediate
      sleep secs
      Behaviours.single_tracker_value(tracker, value)
    else
      Concurrent::ScheduledTask.execute(secs, executor: executor) do
        Behaviours.single_tracker_value(tracker, value)
      end
    end
  end
end

.make(behaviour = NO_ACTION, executor = nil, &block) ⇒ Object



48
49
50
51
# File 'lib/reacto/trackable.rb', line 48

# Instantiates the receiver with a behaviour and an optional executor.
#
# @param behaviour [Object] used when no block is given
# @param executor [Object, nil] passed as the second argument to +new+
# @yield when given, the block is used as the behaviour instead
# @return [Object] the new instance
def make(behaviour = NO_ACTION, executor = nil, &block)
  new(block || behaviour, executor)
end

.never ⇒ Object



16
17
18
# File 'lib/reacto/trackable.rb', line 16

# Instantiates the receiver with no arguments, so the constructor defaults
# apply.
#
# @return [Object] the new instance
def never
  new
end

.repeat(array, int: 0.1, executor: nil) ⇒ Object



128
129
130
131
132
# File 'lib/reacto/trackable.rb', line 128

# Delegates to +interval+ with the enumerator returned by
# Behaviours.array_repeat_enumerator for +array+.
#
# @param array [Object] handed to Behaviours.array_repeat_enumerator
# @param int [Object] forwarded to +interval+ as its first argument
# @param executor [Object, nil] forwarded to +interval+ as +executor:+
# @return [Object] the result of +interval+
def repeat(array, int: 0.1, executor: nil)
  repeating = Behaviours.array_repeat_enumerator(array)
  interval(int, repeating, executor: executor)
end

.value(value, executor = nil) ⇒ Object



134
135
136
# File 'lib/reacto/trackable.rb', line 134

# Makes a Trackable from the behaviour that Behaviours.single_value returns
# for +value+.
#
# @param value [Object] handed to Behaviours.single_value
# @param executor [Object, nil] forwarded to +make+ as the executor
# @return [Object] the result of +make+
def value(value, executor = nil)
  behaviour = Behaviours.single_value(value)
  make(behaviour, executor)
end

.zip(*trackables, &block) ⇒ Object



32
33
34
# File 'lib/reacto/trackable.rb', line 32

# Delegates to +combine_create+ using Subscriptions::ZippingSubscription.
#
# @param trackables [Array] forwarded unchanged to +combine_create+
# @yield forwarded unchanged to +combine_create+
# @return [Object] the result of +combine_create+
def zip(*trackables, &block)
  subscription_type = Subscriptions::ZippingSubscription
  combine_create(subscription_type, *trackables, &block)
end

Instance Method Details

#[](x) ⇒ Object



257
258
259
# File 'lib/reacto/trackable.rb', line 257

# Lifts an Operations::Drop built with +x+ and 1.
# Presumably this yields only the element at position +x+ — confirm against
# Operations::Drop.
#
# @param x [Object] first argument of Operations::Drop.new
# @return [Object] the result of +lift+
def [](x)
  operation = Operations::Drop.new(x, 1)
  lift(operation)
end

#await(subscription, timeout = nil) ⇒ Object



302
303
304
305
306
# File 'lib/reacto/trackable.rb', line 302

# Blocks until +subscription+ signals close or error, or until +timeout+.
#
# A one-count latch is counted down by a callback built with
# Subscriptions.on_close_and_error and added to +subscription+.
#
# @param subscription [#add] receives the releasing callback
# @param timeout [Object, nil] forwarded to the latch's +wait+
# @return [Object] whatever the latch's +wait+ returns
def await(subscription, timeout = nil)
  done = Concurrent::CountDownLatch.new(1)
  release = Subscriptions.on_close_and_error { done.count_down }
  subscription.add(release)
  done.wait(timeout)
end

#buffer(count: nil, delay: nil) ⇒ Object



277
278
279
# File 'lib/reacto/trackable.rb', line 277

# Lifts an Operations::Buffer configured with +count+ and +delay+.
#
# @param count [Object, nil] forwarded as +count:+
# @param delay [Object, nil] forwarded as +delay:+
# @return [Object] the result of +lift+
def buffer(count: nil, delay: nil)
  operation = Operations::Buffer.new(count: count, delay: delay)
  lift(operation)
end

#cache(type: :memory, **settings) ⇒ Object



289
290
291
292
# File 'lib/reacto/trackable.rb', line 289

# Lifts an Operations::Cache configured with +type+ and any extra settings.
#
# @param type [Object] forwarded as +type:+ (defaults to :memory)
# @param settings [Hash] remaining keyword arguments, forwarded unchanged.
#   A double-splat parameter is always a Hash, so no nil fallback is needed.
# @return [Object] the result of +lift+
def cache(type: :memory, **settings)
  lift(Operations::Cache.new(type: type, **settings))
end

#concat(trackable) ⇒ Object



269
270
271
# File 'lib/reacto/trackable.rb', line 269

# Lifts an Operations::Concat built with +trackable+.
#
# @param trackable [Object] the argument of Operations::Concat.new
# @return [Object] the result of +lift+
def concat(trackable)
  operation = Operations::Concat.new(trackable)
  lift(operation)
end

#delay(delay) ⇒ Object



281
282
283
# File 'lib/reacto/trackable.rb', line 281

# Shorthand for +buffer+ with only the +delay:+ option set.
#
# @param delay [Object] forwarded to +buffer+ as +delay:+
# @return [Object] the result of +buffer+
def delay(delay)
  buffering_options = { delay: delay }
  buffer(**buffering_options)
end

#diff(initial = NO_VALUE, fn = Operations::Diff::DEFAULT_FN, &block) ⇒ Object



229
230
231
# File 'lib/reacto/trackable.rb', line 229

# Lifts an Operations::Diff built from a function and an initial value.
#
# @param initial [Object] second argument of Operations::Diff.new
# @param fn [Object] used as the function when no block is given
# @yield when given, the block is used instead of +fn+
# @return [Object] the result of +lift+
def diff(initial = NO_VALUE, fn = Operations::Diff::DEFAULT_FN, &block)
  differ = block || fn
  lift(Operations::Diff.new(differ, initial))
end

#do_track(subscription) ⇒ Object



311
312
313
314
315
316
317
# File 'lib/reacto/trackable.rb', line 311

# Runs the stored behaviour for +subscription+.
#
# Without an executor the behaviour is called directly. With one, the
# subscription is posted to the executor with the behaviour as the block.
#
# @param subscription [Object] the argument given to the behaviour
# @return [Object] the behaviour's result, or whatever +post+ returns
def do_track(subscription)
  return @behaviour.call(subscription) unless @executor

  @executor.post(subscription, &@behaviour)
end

#drop(how_many_to_drop) ⇒ Object Also known as: skip



233
234
235
# File 'lib/reacto/trackable.rb', line 233

# Lifts an Operations::Drop built with +how_many_to_drop+.
#
# @param how_many_to_drop [Object] the argument of Operations::Drop.new
# @return [Object] the result of +lift+
def drop(how_many_to_drop)
  operation = Operations::Drop.new(how_many_to_drop)
  lift(operation)
end

#drop_errors ⇒ Object Also known as: skip_errors



237
238
239
# File 'lib/reacto/trackable.rb', line 237

# Lifts a new Operations::DropErrors.
#
# @return [Object] the result of +lift+
def drop_errors
  operation = Operations::DropErrors.new
  lift(operation)
end

#execute_on(executor) ⇒ Object



298
299
300
# File 'lib/reacto/trackable.rb', line 298

# Builds a new Trackable with this instance's behaviour and +executor+.
#
# @param executor [Object] second argument of Trackable.new
# @return [Trackable] the new instance
def execute_on(executor)
  behaviour = @behaviour
  Trackable.new(behaviour, executor)
end

#first ⇒ Object



253
254
255
# File 'lib/reacto/trackable.rb', line 253

# Shorthand for +take+ with a count of one.
#
# @return [Object] the result of +take+
def first
  how_many = 1
  take(how_many)
end

#flat_map(transform = nil, &block) ⇒ Object



207
208
209
# File 'lib/reacto/trackable.rb', line 207

# Lifts an Operations::FlatMap built with the block or with +transform+.
#
# @param transform [Object, nil] used when no block is given
# @yield when given, the block is used instead of +transform+
# @return [Object] the result of +lift+
def flat_map(transform = nil, &block)
  mapper = block || transform
  lift(Operations::FlatMap.new(mapper))
end

#flat_map_latest(transform = nil, &block) ⇒ Object



211
212
213
# File 'lib/reacto/trackable.rb', line 211

# Lifts an Operations::FlatMapLatest built with the block or +transform+.
#
# @param transform [Object, nil] used when no block is given
# @yield when given, the block is used instead of +transform+
# @return [Object] the result of +lift+
def flat_map_latest(transform = nil, &block)
  mapper = block || transform
  lift(Operations::FlatMapLatest.new(mapper))
end

#flatten ⇒ Object



249
250
251
# File 'lib/reacto/trackable.rb', line 249

# Lifts a new Operations::Flatten.
#
# @return [Object] the result of +lift+
def flatten
  operation = Operations::Flatten.new
  lift(operation)
end

#inject(initial = NO_VALUE, injector = nil, &block) ⇒ Object



225
226
227
# File 'lib/reacto/trackable.rb', line 225

# Lifts an Operations::Inject built from a function and an initial value.
#
# @param initial [Object] second argument of Operations::Inject.new
# @param injector [Object, nil] used as the function when no block is given
# @yield when given, the block is used instead of +injector+
# @return [Object] the result of +lift+
def inject(initial = NO_VALUE, injector = nil, &block)
  reducer = block || injector
  lift(Operations::Inject.new(reducer, initial))
end

#last ⇒ Object



261
262
263
# File 'lib/reacto/trackable.rb', line 261

# Lifts a new Operations::Last.
#
# @return [Object] the result of +lift+
def last
  operation = Operations::Last.new
  lift(operation)
end

#lift(operation = nil, &block) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
# File 'lib/reacto/trackable.rb', line 195

# Builds a new Trackable whose behaviour runs +operation+ on the incoming
# subscription and, unless the result is NOTHING, hands that result to
# +lift_behaviour+.
#
# Errors raised while doing so are delivered to the subscription's
# +on_error+. Only StandardError and ScriptError (which covers
# NotImplementedError) are intercepted. Process-level exceptions such as
# SystemExit, Interrupt/SignalException and NoMemoryError propagate instead
# of being turned into stream errors.
#
# @param operation [#call, nil] used when no block is given
# @yield when given, the block is used as the operation instead
# @return [Trackable] a new Trackable sharing this instance's executor
def lift(operation = nil, &block)
  operation = block_given? ? block : operation
  Trackable.new(nil, @executor) do |tracker_subscription|
    begin
      modified = operation.call(tracker_subscription)
      lift_behaviour(modified) unless modified == NOTHING
    rescue StandardError, ScriptError => e
      tracker_subscription.on_error(e)
    end
  end
end

#map(mapping = nil, error: nil, close: nil, &block) ⇒ Object



215
216
217
218
219
# File 'lib/reacto/trackable.rb', line 215

# Lifts an Operations::Map built with a mapping and optional handlers.
#
# @param mapping [Object, nil] used when no block is given
# @param error [Object, nil] forwarded as +error:+
# @param close [Object, nil] forwarded as +close:+
# @yield when given, the block is used instead of +mapping+
# @return [Object] the result of +lift+
def map(mapping = nil, error: nil, close: nil, &block)
  mapper = block || mapping
  operation = Operations::Map.new(mapper, error: error, close: close)
  lift(operation)
end

#merge(trackable, delay_error: false) ⇒ Object



273
274
275
# File 'lib/reacto/trackable.rb', line 273

# Lifts an Operations::Merge built with +trackable+ and +delay_error+.
#
# @param trackable [Object] first argument of Operations::Merge.new
# @param delay_error [Object] forwarded as +delay_error:+ (defaults to false)
# @return [Object] the result of +lift+
def merge(trackable, delay_error: false)
  operation = Operations::Merge.new(trackable, delay_error: delay_error)
  lift(operation)
end

#off(notification_tracker) ⇒ Object



182
183
184
# File 'lib/reacto/trackable.rb', line 182

# NOTE(review): currently a no-op — the body holds only a placeholder comment,
# so +notification_tracker+ is ignored and nil is returned. Presumably meant
# to undo what #on / #track set up; confirm before relying on it.
def off(notification_tracker)
  # Clean-up logic
end

#on(trackers = {}) ⇒ Object



173
174
175
176
177
178
179
180
# File 'lib/reacto/trackable.rb', line 173

# Tracks this Trackable with a Tracker built from the +trackers+ hash.
#
# @param trackers [Hash] handlers keyed by topic; every key must be in TOPICS
# @return [Object] the result of +track+
# @raise [RuntimeError] when +trackers+ contains a key that is not in TOPICS
def on(trackers = {})
  given_topics = trackers.keys
  unknown_topics = given_topics - TOPICS

  unless unknown_topics.empty?
    raise "This Trackable supports only #{TOPICS}, " \
      "but #{given_topics} were passed."
  end

  tracker = Tracker.new(trackers)
  track(tracker)
end

#prepend(enumerable) ⇒ Object



265
266
267
# File 'lib/reacto/trackable.rb', line 265

# Lifts an Operations::Prepend built with +enumerable+.
#
# @param enumerable [Object] the argument of Operations::Prepend.new
# @return [Object] the result of +lift+
def prepend(enumerable)
  operation = Operations::Prepend.new(enumerable)
  lift(operation)
end

#select(filter = nil, &block) ⇒ Object



221
222
223
# File 'lib/reacto/trackable.rb', line 221

# Lifts an Operations::Select built with the block or with +filter+.
#
# @param filter [Object, nil] used when no block is given
# @yield when given, the block is used instead of +filter+
# @return [Object] the result of +lift+
def select(filter = nil, &block)
  predicate = block || filter
  lift(Operations::Select.new(predicate))
end

#take(how_many_to_take) ⇒ Object



241
242
243
# File 'lib/reacto/trackable.rb', line 241

# Lifts an Operations::Take built with +how_many_to_take+.
#
# @param how_many_to_take [Object] the argument of Operations::Take.new
# @return [Object] the result of +lift+
def take(how_many_to_take)
  operation = Operations::Take.new(how_many_to_take)
  lift(operation)
end

#throttle(delay) ⇒ Object



285
286
287
# File 'lib/reacto/trackable.rb', line 285

# Lifts an Operations::Throttle built with +delay+.
#
# @param delay [Object] the argument of Operations::Throttle.new
# @return [Object] the result of +lift+
def throttle(delay)
  operation = Operations::Throttle.new(delay)
  lift(operation)
end

#track(notification_tracker) ⇒ Object



186
187
188
189
190
191
192
193
# File 'lib/reacto/trackable.rb', line 186

# Subscribes +notification_tracker+ to this Trackable.
#
# A Subscriptions::TrackerSubscription is created for the tracker and this
# instance, passed to +do_track+, and then returned wrapped in a
# Subscriptions::SubscriptionWrapper.
#
# @param notification_tracker [Object] first argument of the subscription
# @return [Subscriptions::SubscriptionWrapper] wrapper around the subscription
def track(notification_tracker)
  subscription = Subscriptions::TrackerSubscription
    .new(notification_tracker, self)
    .tap { |created| do_track(created) }

  Subscriptions::SubscriptionWrapper.new(subscription)
end

#track_on(executor) ⇒ Object



294
295
296
# File 'lib/reacto/trackable.rb', line 294

# Lifts an Operations::TrackOn built with +executor+.
#
# @param executor [Object] the argument of Operations::TrackOn.new
# @return [Object] the result of +lift+
def track_on(executor)
  operation = Operations::TrackOn.new(executor)
  lift(operation)
end

#uniq ⇒ Object



245
246
247
# File 'lib/reacto/trackable.rb', line 245

# Lifts a new Operations::Uniq.
#
# @return [Object] the result of +lift+
def uniq
  operation = Operations::Uniq.new
  lift(operation)
end