Class: Deepstream::EventHandler
- Inherits: Object
  - Object
    - Deepstream::EventHandler
- Defined in:
- lib/deepstream/event_handler.rb
Instance Method Summary
- #emit(event, *args, timeout: @client.options[:emit_timeout], **kwargs) ⇒ Object
- #initialize(client) ⇒ EventHandler (constructor)
  A new instance of EventHandler.
- #listen(pattern, &block) ⇒ Object
- #on(event, &block) ⇒ Object (also: #subscribe)
- #on_message(message) ⇒ Object
- #resubscribe ⇒ Object
- #unlisten(pattern) ⇒ Object
- #unsubscribe(event) ⇒ Object
Constructor Details
#initialize(client) ⇒ EventHandler
Returns a new instance of EventHandler.
8 9 10 11 12 13 |
# File 'lib/deepstream/event_handler.rb', line 8

# Builds an event handler bound to +client+.
#
# Starts with no event callbacks and no pattern listeners, and creates an
# AckTimeoutRegistry for the same client.
#
# @param client [Object] the client kept in @client and handed to
#   AckTimeoutRegistry.new
def initialize(client)
  @client = client
  @ack_timeout_registry = AckTimeoutRegistry.new(client)
  @listeners = {}
  @callbacks = {}
end
Instance Method Details
#emit(event, *args, timeout: @client.options[:emit_timeout], **kwargs) ⇒ Object
53 54 55 56 57 58 |
# File 'lib/deepstream/event_handler.rb', line 53

# Emits +event+ on the EVENT topic, with the payload built from +args+ and
# +kwargs+.
#
# NOTE(review): the extracted listing had lost three identifiers
# (`@client.[:emit_timeout]`, `Helpers.(...)`, `@client.(...)`). They are
# restored here as `options`, `message_data` and `send_message` by inference;
# confirm against the client and Helpers definitions.
#
# @param event [Object] event name, passed through to the client unchanged
# @param args [Array] positional payload, forwarded to Helpers.message_data
# @param timeout [Object] forwarded to the client as `timeout:`; defaults to
#   the client's :emit_timeout option
# @param kwargs [Hash] keyword payload, forwarded to Helpers.message_data
# @return [Object] whatever the send returns, or the result of
#   @client.on_exception when a StandardError was rescued
def emit(event, *args, timeout: @client.options[:emit_timeout], **kwargs)
  data = Helpers.message_data(*args, **kwargs)
  payload = Helpers.to_deepstream_type(data)
  @client.send_message(TOPIC::EVENT, ACTION::EVENT, event, payload, timeout: timeout)
rescue => e
  # Errors are reported to the client instead of propagating to the caller.
  @client.on_exception(e)
end
#listen(pattern, &block) ⇒ Object
26 27 28 29 30 31 32 33 |
# File 'lib/deepstream/event_handler.rb', line 26

# Registers +block+ as the listener for +pattern+ and announces it to the
# server with a LISTEN message on the EVENT topic.
#
# NOTE(review): the extracted listing read `@client.(...)`; the dropped method
# name is restored as `send_message` by inference. Confirm against the client.
#
# @param pattern [Regexp, Object] a Regexp is reduced to its source string;
#   anything else is used as given
# @yield stored under the pattern in @listeners
# @return [Object] result of the ack-timeout registration, or of
#   @client.on_exception when a StandardError was rescued
def listen(pattern, &block)
  pattern = pattern.source if pattern.is_a?(Regexp)
  @listeners[pattern] = block
  @client.send_message(TOPIC::EVENT, ACTION::LISTEN, pattern)
  # Arm a timeout keyed by the pattern; on_message cancels it when an ACK arrives.
  @ack_timeout_registry.add(pattern, "No ACK message received in time for #{pattern}")
rescue => e
  @client.on_exception(e)
end
#on(event, &block) ⇒ Object Also known as: subscribe
15 16 17 18 19 20 21 22 23 |
# File 'lib/deepstream/event_handler.rb', line 15

# Subscribes +block+ to +event+. Documented as also available as #subscribe;
# the alias statement itself is not part of this listing.
#
# On the first subscription for an event, a SUBSCRIBE message is sent if the
# client's connection state is OPEN, and an ack timeout is registered whether
# or not the message was sent. Later calls for the same event only replace
# the stored callback.
#
# NOTE(review): the extracted listing read `@client.(...)`; the dropped method
# name is restored as `send_message` by inference. Confirm against the client.
#
# @param event [Object] event name used as the key in @callbacks
# @yield stored as the callback for the event
# @return [Proc, Object] the stored block, or the result of
#   @client.on_exception when a StandardError was rescued
def on(event, &block)
  unless @callbacks[event]
    @client.send_message(TOPIC::EVENT, ACTION::SUBSCRIBE, event) if @client.state == CONNECTION_STATE::OPEN
    @ack_timeout_registry.add(event, "No ACK message received in time for #{event}")
  end
  @callbacks[event] = block
rescue => e
  @client.on_exception(e)
end
#on_message(message) ⇒ Object
43 44 45 46 47 48 49 50 51 |
# File 'lib/deepstream/event_handler.rb', line 43

# Dispatches an incoming EVENT-topic message according to its action.
#
# The extracted listing had lost the method name and every occurrence of the
# +message+ identifier (`def ()`, `case .action`, ...). Both are restored
# from the documented signature `#on_message(message)`.
#
# @param message [Object] must respond to #action; for ACK messages #data is
#   also read and its last element identifies the pending ack timeout
# @return [Object] result of the selected handler
# @raise [UnknownAction] when the action is none of the handled ones
def on_message(message)
  case message.action
  when ACTION::ACK
    @ack_timeout_registry.cancel(message.data.last)
  when ACTION::EVENT
    fire_event_callback(message)
  when ACTION::SUBSCRIPTION_FOR_PATTERN_FOUND, ACTION::SUBSCRIPTION_FOR_PATTERN_REMOVED
    # Both pattern notifications go to the same listener callback.
    fire_listen_callback(message)
  else
    raise UnknownAction, message
  end
end
#resubscribe ⇒ Object
67 68 69 70 71 72 |
# File 'lib/deepstream/event_handler.rb', line 67

# Re-sends a SUBSCRIBE message for every event in @callbacks, then a LISTEN
# message for every pattern in @listeners.
#
# NOTE(review): the extracted listing read `@client.(...)`; the dropped method
# name is restored as `send_message` by inference. Confirm against the client.
#
# @return [Object] the listener key array, or the result of
#   @client.on_exception when a StandardError was rescued
def resubscribe
  # `keys` returns a fresh array, so iteration runs over a snapshot of each hash.
  @callbacks.keys.each do |event|
    @client.send_message(TOPIC::EVENT, ACTION::SUBSCRIBE, event)
  end
  @listeners.keys.each do |pattern|
    @client.send_message(TOPIC::EVENT, ACTION::LISTEN, pattern)
  end
rescue => e
  @client.on_exception(e)
end
#unlisten(pattern) ⇒ Object
35 36 37 38 39 40 41 |
# File 'lib/deepstream/event_handler.rb', line 35

# Removes the listener stored for +pattern+ and sends an UNLISTEN message on
# the EVENT topic.
#
# NOTE(review): the extracted listing read `@client.(...)`; the dropped method
# name is restored as `send_message` by inference. Confirm against the client.
#
# @param pattern [Regexp, Object] a Regexp is reduced to its source string;
#   anything else is used as given
# @return [Object] whatever the send returns, or the result of
#   @client.on_exception when a StandardError was rescued
def unlisten(pattern)
  pattern = pattern.source if pattern.is_a?(Regexp)
  @listeners.delete(pattern)
  @client.send_message(TOPIC::EVENT, ACTION::UNLISTEN, pattern)
rescue => e
  @client.on_exception(e)
end
#unsubscribe(event) ⇒ Object
60 61 62 63 64 65 |
# File 'lib/deepstream/event_handler.rb', line 60

# Drops the callback stored for +event+ and sends an UNSUBSCRIBE message on
# the EVENT topic.
#
# NOTE(review): the extracted listing read `@client.(...)`; the dropped method
# name is restored as `send_message` by inference. Confirm against the client.
#
# @param event [Object] event name used as the key in @callbacks
# @return [Object] whatever the send returns, or the result of
#   @client.on_exception when a StandardError was rescued
def unsubscribe(event)
  @callbacks.delete(event)
  @client.send_message(TOPIC::EVENT, ACTION::UNSUBSCRIBE, event)
rescue => e
  @client.on_exception(e)
end