Class: EventQ::RabbitMq::QueueWorker

Inherits:
Object
  • Object
show all
Includes:
WorkerId
Defined in:
lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from WorkerId

#tag_processing_thread, #untag_processing_thread

Constructor Details

#initializeQueueWorker

Returns a new instance of QueueWorker.



8
9
10
11
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 8

def initialize
  @serialization_provider_manager = EventQ::SerializationProviders::Manager.new
  @signature_provider_manager = EventQ::SignatureProviders::Manager.new
end

Instance Attribute Details

#contextObject

Returns the value of attribute context.



6
7
8
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 6

def context
  @context
end

#is_runningObject

Returns the value of attribute is_running.



6
7
8
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 6

def is_running
  @is_running
end

Instance Method Details

#acknowledge_message(channel, delivery_tag) ⇒ Object

Logic for the RabbitMq adapter when a message is accepted



97
98
99
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 97

def acknowledge_message(channel, delivery_tag)
  channel.acknowledge(delivery_tag, false)
end

#configure(options = {}) ⇒ Object



92
93
94
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 92

def configure(options = {})
  options[:durable] ||= true
end

#deserialize_message(payload) ⇒ Object



56
57
58
59
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 56

def deserialize_message(payload)
  provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider)
  return provider.deserialize(payload)
end

#pre_process(context, options) ⇒ Object



13
14
15
16
17
18
19
20
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 13

def pre_process(context, options)
  manager = EventQ::RabbitMq::QueueManager.new
  manager.durable = options[:durable]
  options[:manager] = manager

  connection = options[:client].dup.get_connection
  options[:connection] = connection
end

#reject_message(channel, message, delivery_tag, retry_exchange, queue, abort) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 66

def reject_message(channel, message, delivery_tag, retry_exchange, queue, abort)
  EventQ.logger.info("[#{self.class}] - Message rejected removing from queue.")
  # reject the message to remove from queue
  channel.reject(delivery_tag, false)

  # check if the message retry limit has been exceeded
  if message.retry_attempts >= queue.max_retry_attempts
    EventQ.logger.info("[#{self.class}] - Message retry attempt limit exceeded. Msg: #{serialize_message(message)}")

    context.call_on_retry_exceeded_block(message)
  # check if the message is allowed to be retried
  elsif queue.allow_retry
    message.retry_attempts += 1
    retry_attempts = message.retry_attempts - queue.retry_back_off_grace
    retry_attempts = 1 if retry_attempts < 1
    message_ttl = retry_delay(queue, retry_attempts)

    EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{message_ttl}" }
    retry_exchange.publish(serialize_message(message), :expiration => message_ttl)

    context.call_on_retry_block(message)
  end

  return true
end

#serialize_message(msg) ⇒ Object



61
62
63
64
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 61

def serialize_message(msg)
  provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider)
  return provider.serialize(msg)
end

#thread_process_iteration(queue, options, block) ⇒ Object

This method should not be called iteratively and will sit in a loop The reason is because this uses a push notification from the subscribe mechanism to trigger the block and will exit if you do not block.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb', line 25

def thread_process_iteration(queue, options, block)
  manager = options[:manager]
  channel = options[:connection].create_channel
  channel.prefetch(1)

  q = manager.get_queue(channel, queue)
  retry_exchange = manager.get_retry_exchange(channel, queue)

  q.subscribe(:manual_ack => true, :block => false, :exclusive => false) do |delivery_info, properties, payload|
    begin
      tag_processing_thread
      process_message(payload, queue, channel, retry_exchange, delivery_info.delivery_tag, block)
    rescue => e
      EventQ.logger.error(
          "[#{self.class}] - An error occurred attempting to process a message. Error: #{e} | "\
"Backtrace: #{e.backtrace}"
      )
      context.call_on_error_block(error: e)
    ensure
      untag_processing_thread
    end
  end

  # we don't want to stop the subscribe process as it will not block.
  sleep 5 while context.running?

  if channel != nil && channel.open?
    channel.close
  end
end