Class: QueueWorker

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/queue_worker.rb,
lib/queue_worker/version.rb

Constant Summary collapse

VERSION =
'2.0.0'

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name = nil, log = nil, &block) ⇒ QueueWorker



27
28
29
30
31
# File 'lib/queue_worker.rb', line 27

def initialize(queue_name = nil, log = nil, &block)
  @queue = queue_name
  @log = log
  @handler = block || proc { |body| Kernel.const_get(body[:class]).call(body[:args]) }
end

Class Attribute Details

.stompObject

Returns the value of attribute stomp.



14
15
16
# File 'lib/queue_worker.rb', line 14

def stomp
  @stomp
end

Instance Attribute Details

#clientObject



124
125
126
# File 'lib/queue_worker.rb', line 124

def client
  @client ||= Stomp::Client.new(self.class.stomp)
end

#handlerObject

Returns the value of attribute handler.



22
23
24
# File 'lib/queue_worker.rb', line 22

def handler
  @handler
end

#logObject



128
129
130
# File 'lib/queue_worker.rb', line 128

def log
  @log ||= Logger.new(STDOUT)
end

#queueObject

Returns the value of attribute queue.



22
23
24
# File 'lib/queue_worker.rb', line 22

def queue
  @queue
end

Class Method Details

.configure {|_self| ... } ⇒ Object

Yields:

  • (_self)

Yield Parameters:

  • _self (QueueWorker)

    the object that the method was called on



16
17
18
# File 'lib/queue_worker.rb', line 16

def configure
  yield self
end

.peek(queue_name, size = 1) ⇒ Object

Peek at any number messages in the queue



47
48
49
50
51
52
53
54
55
56
57
# File 'lib/queue_worker.rb', line 47

def self.peek(queue_name, size = 1)
  counter = 0
  messages = []
  worker = new(queue_name)
  worker.subscribe_with_timeout(2, size) do |message|
    counter += 1
    messages << JSON.parse(message.body).merge('message-id' => message.headers['message-id'])
    worker.quit if counter == size
  end
  messages
end

.publish(queue, *messages) ⇒ Object

Publish one or more messages to a queue



37
38
39
40
41
# File 'lib/queue_worker.rb', line 37

def self.publish(queue, *messages)
  worker = new(queue)
  messages.each { |msg| worker.publish(msg) }
  worker.close
end

Instance Method Details

#call(message) ⇒ Object

Handles subscribe callback

Tries to delegate processing of message to a class based on the name of the queue. For example:

If the queue is named "scheduled/default" it will look for a class called Scheduled::Default to
initialize with the message body and then call it's +call+ method


113
114
115
116
117
118
119
120
121
122
# File 'lib/queue_worker.rb', line 113

def call(message)
  if message.command == 'MESSAGE'
    handler.call(JSON.parse(message.body, symbolize_names: true), message)
  end
rescue => e
  log.error(e.message) { "\n#{e.backtrace.inspect}" }
ensure
  ack(message)
  log.info('Processed') { "#{message.headers['message-id']} for #{message.headers['destination']}" }
end

#publish(message, headers = {}) ⇒ Object Also known as: push

Publish a message to a queue



63
64
65
66
# File 'lib/queue_worker.rb', line 63

def publish(message, headers = {})
  message = message.to_json unless message.is_a?(String)
  client.publish("/queue/#{queue}", message, { priority: 4, persistent: true }.merge(headers))
end

#quit(queue_name = nil) ⇒ Object

Unsubscribe from the current queue and close the connection



100
101
102
103
# File 'lib/queue_worker.rb', line 100

def quit(queue_name = nil)
  unsubscribe(queue_name)
  close
end

#subscribe(queue_name = nil, size = 1, &block) ⇒ Object

Subscribe (listen) to a queue



75
76
77
78
# File 'lib/queue_worker.rb', line 75

def subscribe(queue_name = nil, size = 1, &block)
  callback = block || method(:call)
  client.subscribe("/queue/#{queue_name || queue}", { :ack => 'client', 'activemq.prefetchSize' => size }, &callback)
end

#subscribe_with_timeout(duration, size = 1, &block) ⇒ Object

Subscribe to a queue for a limited time



85
86
87
88
89
90
91
92
# File 'lib/queue_worker.rb', line 85

def subscribe_with_timeout(duration, size = 1, &block)
  Timeout::timeout(duration) do
    subscribe(nil, size, &block)
    join
  end
rescue Timeout::Error
  quit
end

#unsubscribe(queue_name = nil) ⇒ Object

Unsubscribe from the current queue



95
96
97
# File 'lib/queue_worker.rb', line 95

def unsubscribe(queue_name = nil)
  client.unsubscribe("/queue/#{queue_name || queue}")
end