Class: VSM::Lens::EventHub

Inherits:
Object
  • Object
show all
Defined in:
lib/vsm/lens/event_hub.rb

Constant Summary collapse

DEFAULT_BUFFER =
500

Instance Method Summary collapse

Constructor Details

#initialize(buffer_size: DEFAULT_BUFFER) ⇒ EventHub

Returns a new instance of EventHub.



11
12
13
14
15
16
# File 'lib/vsm/lens/event_hub.rb', line 11

# Builds a hub with no subscribers and an empty replay buffer.
#
# @param buffer_size [Integer] stored as @buffer_size; #publish compares the
#   replay buffer's length against it when trimming old events
def initialize(buffer_size: DEFAULT_BUFFER)
  @buffer_size = buffer_size
  @buffer = []        # events kept for replay, oldest first
  @subs = []          # subscriber queues (SizedQueue instances)
  @mutex = Mutex.new  # held whenever @subs or @buffer is read or changed
end

Instance Method Details

#publish(message) ⇒ Object



18
19
20
21
22
23
24
25
# File 'lib/vsm/lens/event_hub.rb', line 18

# Converts +message+ into an event via format_event, appends it to the
# replay buffer, and hands it to try_push once per subscriber queue.
#
# Formatting happens before the lock is taken; the buffer update and the
# fan-out to subscribers happen together while holding @mutex.
#
# @param message [Object] whatever format_event accepts (defined elsewhere)
# @return [Array] the subscriber list (the value of @subs.each)
def publish(message)
  event = format_event(message)
  @mutex.synchronize do
    @buffer.push(event)
    # Drop the oldest entries once the buffer exceeds its configured size.
    if @buffer.length > @buffer_size
      @buffer.shift(@buffer.length - @buffer_size)
    end
    @subs.each do |queue|
      try_push(queue, event)
    end
  end
end

#subscribe ⇒ Object



27
28
29
30
31
32
33
34
35
# File 'lib/vsm/lens/event_hub.rb', line 27

# Registers a new subscriber and returns its queue together with a copy of
# the events currently held in the replay buffer.
#
# The queue is added to @subs and the buffer is copied while holding @mutex,
# the same lock #publish holds while buffering and fanning out. An event is
# therefore either in the returned snapshot or offered to the returned queue,
# never both.
#
# @param queue_size [Integer] capacity of the subscriber's SizedQueue;
#   defaults to 100, the previously hard-coded value
# @return [Array(SizedQueue, Array)] the new queue and the buffer snapshot
def subscribe(queue_size: 100)
  queue = SizedQueue.new(queue_size)
  snapshot = @mutex.synchronize do
    @subs << queue
    @buffer.dup # copy, so later publishes do not mutate the caller's view
  end
  [queue, snapshot]
end

#unsubscribe(queue) ⇒ Object



37
38
39
# File 'lib/vsm/lens/event_hub.rb', line 37

# Removes +queue+ from the subscriber list while holding @mutex.
#
# @param queue [Object] a queue previously returned by #subscribe
# @return [Object, nil] the result of Array#delete: the removed queue, or
#   nil when it was not in the list
def unsubscribe(queue)
  @mutex.synchronize do
    @subs.delete(queue)
  end
end