Class: Deepstream::EventHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/deepstream/event_handler.rb

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ EventHandler

Returns a new instance of EventHandler.



7
8
9
10
11
12
# File 'lib/deepstream/event_handler.rb', line 7

def initialize(client)
  @client = client
  @callbacks = {}
  @listeners = {}
  @ack_timeout_registry = AckTimeoutRegistry.new(@client)
end

Instance Method Details

#emit(event, data = nil, timeout: ) ⇒ Object



52
53
54
55
56
# File 'lib/deepstream/event_handler.rb', line 52

def emit(event, data = nil, timeout: @client.options[:emit_timeout])
  @client.send_message(TOPIC::EVENT, ACTION::EVENT, event, Helpers.to_deepstream_type(data), timeout: timeout)
rescue => e
  @client.on_exception(e)
end

#listen(pattern, &block) ⇒ Object



25
26
27
28
29
30
31
32
# File 'lib/deepstream/event_handler.rb', line 25

def listen(pattern, &block)
  pattern = pattern.is_a?(Regexp) ? pattern.source : pattern
  @listeners[pattern] = block
  @client.send_message(TOPIC::EVENT, ACTION::LISTEN, pattern)
  @ack_timeout_registry.add(pattern, "No ACK message received in time for #{pattern}")
rescue => e
  @client.on_exception(e)
end

#on(event, &block) ⇒ Object Also known as: subscribe



14
15
16
17
18
19
20
21
22
# File 'lib/deepstream/event_handler.rb', line 14

def on(event, &block)
  unless @callbacks[event]
    @client.send_message(TOPIC::EVENT, ACTION::SUBSCRIBE, event)
    @ack_timeout_registry.add(event, "No ACK message received in time for #{event}")
  end
  @callbacks[event] = block
rescue => e
  @client.on_exception(e)
end

#on_message(message) ⇒ Object



42
43
44
45
46
47
48
49
50
# File 'lib/deepstream/event_handler.rb', line 42

def on_message(message)
  case message.action
  when ACTION::ACK then @ack_timeout_registry.cancel(message.data.last)
  when ACTION::EVENT then fire_event_callback(message)
  when ACTION::SUBSCRIPTION_FOR_PATTERN_FOUND then fire_listen_callback(message)
  when ACTION::SUBSCRIPTION_FOR_PATTERN_REMOVED then fire_listen_callback(message)
  else @client.on_error(message)
  end
end

#resubscribeObject



65
66
67
68
69
70
# File 'lib/deepstream/event_handler.rb', line 65

def resubscribe
  @callbacks.keys.each { |event| @client.send_message(TOPIC::EVENT, ACTION::SUBSCRIBE, event) }
  @listeners.keys.each { |pattern| @client.send_message(TOPIC::EVENT, ACTION::LISTEN, pattern) }
rescue => e
  @client.on_exception(e)
end

#unlisten(pattern) ⇒ Object



34
35
36
37
38
39
40
# File 'lib/deepstream/event_handler.rb', line 34

def unlisten(pattern)
  pattern = pattern.is_a?(Regexp) ? pattern.source : pattern
  @listeners.delete(pattern)
  @client.send_message(TOPIC::EVENT, ACTION::UNLISTEN, pattern)
rescue => e
  @client.on_exception(e)
end

#unsubscribe(event) ⇒ Object



58
59
60
61
62
63
# File 'lib/deepstream/event_handler.rb', line 58

def unsubscribe(event)
  @callbacks.delete(event)
  @client.send_message(TOPIC::EVENT, ACTION::UNSUBSCRIBE, event)
rescue => e
  @client.on_exception(e)
end