Class: EventQ::RabbitMq::EventQClient
- Inherits:
-
Object
- Object
- EventQ::RabbitMq::EventQClient
- Defined in:
- lib/eventq_rabbitmq/rabbitmq_eventq_client.rb
Overview
Implements a general interface to raise an event. EventQ::Amazon::EventQClient is the sister class, which does the same for AWS.
Instance Method Summary collapse
-
#initialize(options = {}) ⇒ EventQClient
constructor
A new instance of EventQClient.
- #new_message ⇒ Object
- #publish(topic:, event:, context: {}) ⇒ Object
- #raise_event(event_type, event, context = {}) ⇒ Object
- #raise_event_in_queue(event_type, event, queue, delay, context = {}) ⇒ Object
- #register_event(event_type) ⇒ Object
- #registered?(event_type) ⇒ Boolean
Constructor Details
#initialize(options = {}) ⇒ EventQClient
Returns a new instance of EventQClient.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 7
#
# Stores the injected queue client and creates the collaborators this
# instance keeps in instance variables.
#
# @param options [Hash] construction options; +:client+ must be present
# @raise [RuntimeError] when +options[:client]+ is nil
def initialize(options = {})
  raise ':client (QueueClient) must be specified.'.freeze if options[:client].nil?

  @client = options[:client]
  @queue_manager = QueueManager.new
  @event_raised_exchange = EventRaisedExchange.new
  @serialization_manager = EventQ::SerializationProviders::Manager.new
  @signature_manager = EventQ::SignatureProviders::Manager.new

  # this array is used to record known event types
  @known_event_types = []
end
Instance Method Details
#new_message ⇒ Object
86 87 88 |
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 86
#
# Builds a fresh queue message object.
#
# @return [EventQ::QueueMessage] a newly constructed message
def new_message
  EventQ::QueueMessage.new
end
#publish(topic:, event:, context: {}) ⇒ Object
36 37 38 |
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 36
#
# Keyword-argument entry point that forwards to #raise_event.
#
# @param topic [Object] passed to #raise_event as the event type
# @param event [Object] event payload
# @param context [Hash] forwarded unchanged; defaults to an empty hash
# @return [Object] whatever #raise_event returns
def publish(topic:, event:, context: {})
  raise_event(topic, event, context)
end
#raise_event(event_type, event, context = {}) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 40
#
# Registers the event type, then publishes the event to the exchange the
# queue manager returns for @event_raised_exchange. The value produced by
# EventQ.create_event_type is used as the routing key.
#
# @param event_type [Object] event type identifier
# @param event [Object] event payload
# @param context [Hash] extra data handed to the message builder; defaults to {}
def raise_event(event_type, event, context = {})
  register_event(event_type)

  _event_type = EventQ.create_event_type(event_type)

  with_connection do |channel|
    exchange = @queue_manager.get_exchange(channel, @event_raised_exchange)

    # NOTE(review): the extracted listing lost both the local variable name and
    # the helper invoked here; `message` / `serialized_message` are
    # reconstructions — confirm against the real source file.
    message = serialized_message(event_type, event, context)

    exchange.publish(message, routing_key: _event_type)

    # Block form so the string is only built when the logger evaluates it.
    EventQ.logger.debug do
      "[#{self.class}] - Raised event to Exchange. Routing Key: #{_event_type} | Message: #{message}."
    end
  end
end
#raise_event_in_queue(event_type, event, queue, delay, context = {}) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 58
#
# Registers the event type, wires up the delay exchange/queue pair and the
# target queue for +queue+, then publishes the event to the delay exchange.
#
# @param event_type [Object] event type identifier
# @param event [Object] event payload
# @param queue [Object] queue descriptor; must respond to +name+
# @param delay [Object] delay value handed to the queue manager — unit not
#   visible here, TODO confirm
# @param context [Hash] extra data handed to the message builder; defaults to {}
def raise_event_in_queue(event_type, event, queue, delay, context = {})
  register_event(event_type)

  _event_type = EventQ.create_event_type(event_type)

  with_connection do |channel|
    exchange = @queue_manager.get_queue_exchange(channel, queue)

    delay_exchange = @queue_manager.get_delay_exchange(channel, queue, delay)

    # The delay queue is created with the target exchange's name; presumably it
    # forwards there once the delay elapses — confirm in QueueManager.
    delay_queue = @queue_manager.create_delay_queue(channel, queue, exchange.name, delay)
    delay_queue.bind(delay_exchange, routing_key: _event_type)

    _queue_name = EventQ.create_queue_name(queue.name)

    q = channel.queue(_queue_name, durable: @queue_manager.durable)
    q.bind(exchange, routing_key: _event_type)

    # NOTE(review): the extracted listing lost both the local variable name and
    # the helper invoked here; `message` / `serialized_message` are
    # reconstructions — confirm against the real source file.
    message = serialized_message(event_type, event, context)

    delay_exchange.publish(message, routing_key: _event_type)

    # Block form so the string is only built when the logger evaluates it.
    EventQ.logger.debug do
      "[#{self.class}] - Raised event to Queue: #{_queue_name} | Message: #{message} | Delay: #{delay}."
    end
  end
end
#register_event(event_type) ⇒ Object
27 28 29 30 31 32 33 34 |
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 27
#
# Records +event_type+ in @known_event_types unless it is already there.
#
# @param event_type [Object] event type identifier to remember
# @return [true] always, whether or not the type was newly added
def register_event(event_type)
  @known_event_types << event_type unless registered?(event_type)
  true
end
#registered?(event_type) ⇒ Boolean
23 24 25 |
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 23
#
# Tells whether +event_type+ has already been recorded in @known_event_types.
#
# @param event_type [Object] event type identifier to look up
# @return [Boolean] true when the type is in the recorded list
def registered?(event_type)
  @known_event_types.include?(event_type)
end