Class: EventQ::RabbitMq::EventQClient

Inherits:
Object
  • Object
show all
Defined in:
lib/eventq_rabbitmq/rabbitmq_eventq_client.rb

Overview

Implements a general interface to raise an event. EventQ::Amazon::EventQClient is the sister class, which does the same for AWS.

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ EventQClient

Returns a new instance of EventQClient.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 7

# Builds a new EventQClient.
#
# @param options [Hash] construction options
# @option options [Object] :client the queue client (the error message calls
#   it a "QueueClient"); required and stored in @client
# @raise [RuntimeError] when options[:client] is nil or absent
def initialize(options = {})
  raise ':client (QueueClient) must be specified.'.freeze if options[:client].nil?

  @client = options[:client]
  @queue_manager = QueueManager.new
  @event_raised_exchange = EventRaisedExchange.new
  @serialization_manager = EventQ::SerializationProviders::Manager.new
  @signature_manager = EventQ::SignatureProviders::Manager.new

  # Event types already seen by #register_event; starts out empty.
  @known_event_types = []
end

Instance Method Details

#new_message ⇒ Object



86
87
88
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 86

# Creates a message object for callers to populate.
#
# @return [EventQ::QueueMessage] a freshly constructed instance, built with
#   no arguments
def new_message
  EventQ::QueueMessage.new
end

#publish(topic:, event:, context: {}) ⇒ Object



36
37
38
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 36

# Keyword-argument entry point for raising an event. Delegates straight to
# #raise_event, passing +topic+ as the event type.
#
# @param topic [Object] forwarded to #raise_event as its +event_type+
# @param event [Object] forwarded unchanged to #raise_event
# @param context [Hash] forwarded unchanged to #raise_event; defaults to {}
# @return [Object] whatever #raise_event returns
def publish(topic:, event:, context: {})
  raise_event(topic, event, context)
end

#raise_event(event_type, event, context = {}) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 40

# Publishes an event to the event-raised exchange.
#
# The event type is first recorded via #register_event, then converted with
# EventQ.create_event_type into the routing key used for the publish.
#
# @param event_type [Object] the event type to register and route on
# @param event [Object] event content handed to +serialized_message+
# @param context [Hash] extra data handed to +serialized_message+
# @return [Object] the result of +with_connection+ (defined outside this
#   listing)
def raise_event(event_type, event, context = {})
  register_event(event_type)
  routing_key = EventQ.create_event_type(event_type)

  with_connection do |channel|
    # Look up the exchange before serializing, so collaborators are called in
    # a fixed order: exchange lookup, serialization, publish.
    target_exchange = @queue_manager.get_exchange(channel, @event_raised_exchange)
    payload = serialized_message(event_type, event, context)

    target_exchange.publish(payload, routing_key: routing_key)

    EventQ.logger.debug do
      "[#{self.class}] - Raised event to Exchange. Routing Key: #{routing_key} | Message: #{payload}."
    end
  end
end

#raise_event_in_queue(event_type, event, queue, delay, context = {}) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 58

# Publishes an event for a single queue via that queue's delay exchange.
#
# Steps, in order: register the event type; obtain the queue's exchange and
# its delay exchange from the queue manager; create the delay queue and bind
# it to the delay exchange; declare the target queue and bind it to the queue
# exchange; serialize the message; publish it to the delay exchange.
#
# @param event_type [Object] the event type to register and route on
# @param event [Object] event content handed to +serialized_message+
# @param queue [#name] the destination queue definition
# @param delay [Object] delay value handed to the queue manager and logged
#   (presumably seconds — confirm against QueueManager)
# @param context [Hash] extra data handed to +serialized_message+
# @return [Object] the result of +with_connection+ (defined outside this
#   listing)
def raise_event_in_queue(event_type, event, queue, delay, context = {})
  register_event(event_type)
  routing_key = EventQ.create_event_type(event_type)

  with_connection do |channel|
    queue_exchange = @queue_manager.get_queue_exchange(channel, queue)
    delay_exchange = @queue_manager.get_delay_exchange(channel, queue, delay)

    # The delay queue is created against the queue exchange's name and
    # listens on the delay exchange.
    @queue_manager
      .create_delay_queue(channel, queue, queue_exchange.name, delay)
      .bind(delay_exchange, routing_key: routing_key)

    queue_name = EventQ.create_queue_name(queue.name)
    channel
      .queue(queue_name, durable: @queue_manager.durable)
      .bind(queue_exchange, routing_key: routing_key)

    payload = serialized_message(event_type, event, context)
    delay_exchange.publish(payload, routing_key: routing_key)

    EventQ.logger.debug do
      "[#{self.class}] - Raised event to Queue: #{queue_name} | Message: #{payload} | Delay: #{delay}."
    end
  end
end

#register_event(event_type) ⇒ Object



27
28
29
30
31
32
33
34
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 27

# Records +event_type+ in the list of known event types unless it is already
# there, so repeated registrations do not add duplicates.
#
# @param event_type [Object] the event type to remember
# @return [true] always, whether or not the type was newly added
def register_event(event_type)
  @known_event_types << event_type unless registered?(event_type)
  true
end

#registered?(event_type) ⇒ Boolean

Returns:

  • (Boolean)


23
24
25
# File 'lib/eventq_rabbitmq/rabbitmq_eventq_client.rb', line 23

# Reports whether +event_type+ has already been recorded in
# @known_event_types.
#
# @param event_type [Object] the event type to look up
# @return [Boolean] the result of +include?+ on the known-types list
def registered?(event_type)
  @known_event_types.include?(event_type)
end