Class: VSM::AsyncChannel

Inherits:
Object
  • Object
show all
Defined in:
lib/vsm/async_channel.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(context: {}) ⇒ AsyncChannel



7
8
9
10
11
# File 'lib/vsm/async_channel.rb', line 7

# Builds a channel with an empty subscriber list and a fresh Async::Queue.
#
# @param context [Object] caller-supplied value stored as-is and exposed
#   through the +context+ reader (defaults to an empty Hash)
def initialize(context: {})
  @context = context
  @subs    = []
  @queue   = Async::Queue.new
end

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context.



5
6
7
# File 'lib/vsm/async_channel.rb', line 5

# Reader for the context value supplied to the constructor.
#
# @return [Object] the stored context, unchanged
def context = @context

Instance Method Details

#emit(message) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/vsm/async_channel.rb', line 13

# Publishes +message+: pushes it onto the internal queue, then hands it to
# every registered subscriber.
#
# Delivery is best-effort. A StandardError raised while enqueueing is
# swallowed (the message is then simply missing from the queue; nothing
# re-enqueues it later), and errors raised by a subscriber that is invoked
# directly are ignored.
#
# @param message [Object] the value to publish
# @return [Array] the subscriber list (the same object the original
#   +@subs.each+ call returned)
def emit(message)
  begin
    @queue.enqueue(message)
  rescue StandardError
    # Deliberately best-effort: presumably this fails when no async scheduler
    # is available in this thread. Subscribers are still notified below.
  end

  # Iterate over a snapshot so a subscriber that subscribes/unsubscribes
  # while being notified cannot shift the array under the iteration and
  # cause a later subscriber to be skipped.
  @subs.dup.each do |subscriber|
    Async { subscriber.call(message) }
  rescue StandardError
    # Could not start the Async block (e.g. no Async task is active in this
    # thread): fall back to calling the subscriber synchronously.
    begin
      subscriber.call(message)
    rescue StandardError
      # ignore subscriber errors
    end
  end

  @subs
end

#pop ⇒ Object



33
# File 'lib/vsm/async_channel.rb', line 33

# Takes the next item off the channel's internal queue.
#
# @return [Object] whatever the queue's +dequeue+ returns
def pop
  @queue.dequeue
end

#subscribe(&blk) ⇒ Object



35
36
37
38
# File 'lib/vsm/async_channel.rb', line 35

# Registers the given block by appending it to the subscriber list; #emit
# later calls each registered block with the emitted message.
#
# @yieldparam message [Object] the value passed to #emit
# @return [Proc, nil] the block itself, usable as a handle for #unsubscribe
#   (nil when no block is given)
def subscribe(&blk)
  blk.tap { |handler| @subs << handler }
end

#unsubscribe(subscriber) ⇒ Object



40
41
42
# File 'lib/vsm/async_channel.rb', line 40

# Removes +subscriber+ from the subscriber list.
#
# @param subscriber [Object] the handle to remove (e.g. the value returned
#   by #subscribe)
# @return [Object, nil] the removed entry, or nil when it was not registered
#   (Array#delete semantics)
def unsubscribe(subscriber) = @subs.delete(subscriber)