Class: Deepstream::EventHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/deepstream/event_handler.rb

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ EventHandler

Returns a new instance of EventHandler.



8
9
10
11
12
13
# File 'lib/deepstream/event_handler.rb', line 8

# Builds the handler with empty callback and listener tables and an
# ACK-timeout registry bound to the same client.
#
# @param client [Object] the owning client; stored as-is and handed to
#   AckTimeoutRegistry.new
def initialize(client)
  # Per-event callbacks and per-pattern listeners both start out empty.
  @callbacks = {}
  @listeners = {}

  @client = client
  @ack_timeout_registry = AckTimeoutRegistry.new(client)
end

Instance Method Details

#emit(event, *args, timeout: @client.options[:emit_timeout], **kwargs) ⇒ Object



53
54
55
56
57
58
# File 'lib/deepstream/event_handler.rb', line 53

# Sends an EVENT action on the EVENT topic through the client.
#
# The positional and keyword arguments are combined by Helpers.message_data
# and the result is converted with Helpers.to_deepstream_type before sending.
#
# @param event [Object] event name forwarded to the client unchanged
# @param args [Array] positional payload passed to Helpers.message_data
# @param timeout [Object] forwarded as the `timeout:` keyword of
#   send_message; defaults to @client.options[:emit_timeout]
# @param kwargs [Hash] keyword payload passed to Helpers.message_data
# @return [Object] whatever send_message returns, or the result of
#   @client.on_exception when a StandardError was raised
def emit(event, *args, timeout: @client.options[:emit_timeout], **kwargs)
  payload = Helpers.message_data(*args, **kwargs)
  @client.send_message(
    TOPIC::EVENT,
    ACTION::EVENT,
    event,
    Helpers.to_deepstream_type(payload),
    timeout: timeout
  )
rescue StandardError => e
  # Errors are reported to the client instead of propagating to the caller.
  @client.on_exception(e)
end

#listen(pattern, &block) ⇒ Object



26
27
28
29
30
31
32
33
# File 'lib/deepstream/event_handler.rb', line 26

# Registers a listener block for a pattern and sends a LISTEN action on the
# EVENT topic, then registers an ACK timeout under the same pattern.
#
# @param pattern [Regexp, Object] a Regexp is replaced by its source string;
#   any other value is used unchanged
# @param block [Proc] stored in @listeners under the pattern (replaces any
#   block previously stored for it)
# @return [Object] the result of the ACK-timeout registration, or the result
#   of @client.on_exception when a StandardError was raised
def listen(pattern, &block)
  # Only the textual form of a Regexp is stored and sent.
  pattern = pattern.source if pattern.is_a?(Regexp)

  @listeners[pattern] = block
  @client.send_message(TOPIC::EVENT, ACTION::LISTEN, pattern)

  timeout_notice = "No ACK message received in time for #{pattern}"
  @ack_timeout_registry.add(pattern, timeout_notice)
rescue StandardError => e
  @client.on_exception(e)
end

#on(event, &block) ⇒ Object Also known as: subscribe



15
16
17
18
19
20
21
22
23
# File 'lib/deepstream/event_handler.rb', line 15

# Stores a callback for an event, subscribing first when no callback is
# stored for that event yet.
#
# On the first registration a SUBSCRIBE action is sent on the EVENT topic,
# but only while the client state equals CONNECTION_STATE::OPEN. The ACK
# timeout is registered on the first registration regardless of the
# connection state.
#
# @param event [Object] event name used as the key in @callbacks
# @param block [Proc] callback stored for the event (replaces any previous one)
# @return [Object] the stored block, or the result of @client.on_exception
#   when a StandardError was raised
def on(event, &block)
  already_subscribed = @callbacks[event]

  unless already_subscribed
    if @client.state == CONNECTION_STATE::OPEN
      @client.send_message(TOPIC::EVENT, ACTION::SUBSCRIBE, event)
    end
    @ack_timeout_registry.add(event, "No ACK message received in time for #{event}")
  end

  @callbacks[event] = block
rescue StandardError => e
  @client.on_exception(e)
end

#on_message(message) ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/deepstream/event_handler.rb', line 43

# Dispatches an incoming message according to its action.
#
# - ACK cancels the pending ACK timeout named by the last data element.
# - EVENT is handed to fire_event_callback.
# - SUBSCRIPTION_FOR_PATTERN_FOUND and SUBSCRIPTION_FOR_PATTERN_REMOVED are
#   both handed to fire_listen_callback.
#
# Unlike the other public methods of this handler, errors raised here are
# not rescued in this method.
#
# @param message [Object] must respond to #action and, for ACK, #data
# @return [Object] the result of the selected handler
# @raise [UnknownAction] when the action matches none of the cases above
def on_message(message)
  case message.action
  when ACTION::ACK
    @ack_timeout_registry.cancel(message.data.last)
  when ACTION::EVENT
    fire_event_callback(message)
  when ACTION::SUBSCRIPTION_FOR_PATTERN_FOUND, ACTION::SUBSCRIPTION_FOR_PATTERN_REMOVED
    fire_listen_callback(message)
  else
    raise UnknownAction, message
  end
end

#resubscribe ⇒ Object



67
68
69
70
71
72
# File 'lib/deepstream/event_handler.rb', line 67

# Re-sends a SUBSCRIBE action for every key in @callbacks and then a LISTEN
# action for every key in @listeners, all on the EVENT topic.
#
# No ACK timeouts are registered by this method.
#
# @return [Array] the listener keys that were iterated, or the result of
#   @client.on_exception when a StandardError was raised
def resubscribe
  subscribed_events = @callbacks.keys
  subscribed_events.each do |name|
    @client.send_message(TOPIC::EVENT, ACTION::SUBSCRIBE, name)
  end

  # Listener keys are read only after all subscriptions have been sent.
  listened_patterns = @listeners.keys
  listened_patterns.each do |source|
    @client.send_message(TOPIC::EVENT, ACTION::LISTEN, source)
  end
rescue StandardError => e
  @client.on_exception(e)
end

#unlisten(pattern) ⇒ Object



35
36
37
38
39
40
41
# File 'lib/deepstream/event_handler.rb', line 35

# Removes the listener stored for a pattern and sends an UNLISTEN action on
# the EVENT topic. The message is sent even if no listener was stored.
#
# @param pattern [Regexp, Object] a Regexp is replaced by its source string;
#   any other value is used unchanged
# @return [Object] whatever send_message returns, or the result of
#   @client.on_exception when a StandardError was raised
def unlisten(pattern)
  # Listeners are keyed by the textual form of the pattern.
  pattern = pattern.source if pattern.is_a?(Regexp)

  @listeners.delete(pattern)
  @client.send_message(TOPIC::EVENT, ACTION::UNLISTEN, pattern)
rescue StandardError => e
  @client.on_exception(e)
end

#unsubscribe(event) ⇒ Object



60
61
62
63
64
65
# File 'lib/deepstream/event_handler.rb', line 60

# Drops the callback stored for an event and sends an UNSUBSCRIBE action on
# the EVENT topic. The message is sent even if no callback was stored.
#
# @param event [Object] event name used as the key in @callbacks
# @return [Object] whatever send_message returns, or the result of
#   @client.on_exception when a StandardError was raised
def unsubscribe(event)
  @callbacks.delete(event)

  @client.send_message(
    TOPIC::EVENT,
    ACTION::UNSUBSCRIBE,
    event
  )
rescue StandardError => e
  @client.on_exception(e)
end