Class: Deepstream::EventHandler
- Inherits:
-
Object
- Object
- Deepstream::EventHandler
- Defined in:
- lib/deepstream/event_handler.rb
Instance Method Summary collapse
- #emit(event, data = nil, timeout: ) ⇒ Object
-
#initialize(client) ⇒ EventHandler
constructor
A new instance of EventHandler.
- #listen(pattern, &block) ⇒ Object
- #on(event, &block) ⇒ Object (also: #subscribe)
- #on_message(message) ⇒ Object
- #resubscribe ⇒ Object
- #unlisten(pattern) ⇒ Object
- #unsubscribe(event) ⇒ Object
Constructor Details
#initialize(client) ⇒ EventHandler
Returns a new instance of EventHandler.
7 8 9 10 11 12 |
# File 'lib/deepstream/event_handler.rb', line 7 def initialize(client) @client = client @callbacks = {} @listeners = {} @ack_timeout_registry = AckTimeoutRegistry.new(@client) end |
Instance Method Details
#emit(event, data = nil, timeout: ) ⇒ Object
52 53 54 55 56 |
# File 'lib/deepstream/event_handler.rb', line 52 def emit(event, data = nil, timeout: @client.[:emit_timeout]) @client.(TOPIC::EVENT, ACTION::EVENT, event, Helpers.to_deepstream_type(data), timeout: timeout) rescue => e @client.on_exception(e) end |
#listen(pattern, &block) ⇒ Object
25 26 27 28 29 30 31 32 |
# File 'lib/deepstream/event_handler.rb', line 25 def listen(pattern, &block) pattern = pattern.is_a?(Regexp) ? pattern.source : pattern @listeners[pattern] = block @client.(TOPIC::EVENT, ACTION::LISTEN, pattern) @ack_timeout_registry.add(pattern, "No ACK message received in time for #{pattern}") rescue => e @client.on_exception(e) end |
#on(event, &block) ⇒ Object Also known as: subscribe
14 15 16 17 18 19 20 21 22 |
# File 'lib/deepstream/event_handler.rb', line 14 def on(event, &block) unless @callbacks[event] @client.(TOPIC::EVENT, ACTION::SUBSCRIBE, event) @ack_timeout_registry.add(event, "No ACK message received in time for #{event}") end @callbacks[event] = block rescue => e @client.on_exception(e) end |
#on_message(message) ⇒ Object
42 43 44 45 46 47 48 49 50 |
# File 'lib/deepstream/event_handler.rb', line 42 def () case .action when ACTION::ACK then @ack_timeout_registry.cancel(.data.last) when ACTION::EVENT then fire_event_callback() when ACTION::SUBSCRIPTION_FOR_PATTERN_FOUND then fire_listen_callback() when ACTION::SUBSCRIPTION_FOR_PATTERN_REMOVED then fire_listen_callback() else @client.on_error() end end |
#resubscribe ⇒ Object
65 66 67 68 69 70 |
# File 'lib/deepstream/event_handler.rb', line 65 def resubscribe @callbacks.keys.each { |event| @client.(TOPIC::EVENT, ACTION::SUBSCRIBE, event) } @listeners.keys.each { |pattern| @client.(TOPIC::EVENT, ACTION::LISTEN, pattern) } rescue => e @client.on_exception(e) end |
#unlisten(pattern) ⇒ Object
34 35 36 37 38 39 40 |
# File 'lib/deepstream/event_handler.rb', line 34 def unlisten(pattern) pattern = pattern.is_a?(Regexp) ? pattern.source : pattern @listeners.delete(pattern) @client.(TOPIC::EVENT, ACTION::UNLISTEN, pattern) rescue => e @client.on_exception(e) end |
#unsubscribe(event) ⇒ Object
58 59 60 61 62 63 |
# File 'lib/deepstream/event_handler.rb', line 58 def unsubscribe(event) @callbacks.delete(event) @client.(TOPIC::EVENT, ACTION::UNSUBSCRIBE, event) rescue => e @client.on_exception(e) end |