Class: QueueWorker
- Inherits: Object
  - Object
    - QueueWorker
- Extended by: Forwardable
- Defined in: lib/queue_worker.rb, lib/queue_worker/version.rb
Constant Summary
- VERSION = '2.0.0'
Class Attribute Summary
- .stomp ⇒ Object
  Returns the value of attribute stomp.
Instance Attribute Summary
- #client ⇒ Object
- #handler ⇒ Object
  Returns the value of attribute handler.
- #log ⇒ Object
- #queue ⇒ Object
  Returns the value of attribute queue.
Class Method Summary
- .configure {|_self| ... } ⇒ Object
- .peek(queue_name, size = 1) ⇒ Object
  Peek at any number of messages in the queue.
- .publish(queue, *messages) ⇒ Object
  Publish one or more messages to a queue.
Instance Method Summary
- #call(message) ⇒ Object
  Handles the subscribe callback.
- #initialize(queue_name = nil, log = nil, &block) ⇒ QueueWorker (constructor)
  A new instance of QueueWorker.
- #publish(message, headers = {}) ⇒ Object (also: #push)
  Publish a message to a queue.
- #quit(queue_name = nil) ⇒ Object
  Unsubscribe from the current queue and close the connection.
- #subscribe(queue_name = nil, size = 1, &block) ⇒ Object
  Subscribe (listen) to a queue.
- #subscribe_with_timeout(duration, size = 1, &block) ⇒ Object
  Subscribe to a queue for a limited time.
- #unsubscribe(queue_name = nil) ⇒ Object
  Unsubscribe from the current queue.
Constructor Details
#initialize(queue_name = nil, log = nil, &block) ⇒ QueueWorker
    # File 'lib/queue_worker.rb', line 27
    def initialize(queue_name = nil, log = nil, &block)
      @queue = queue_name
      @log = log
      @handler = block || proc { |body| Kernel.const_get(body[:class]).call(body[:args]) }
    end
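When no block is given, the constructor falls back to a handler that looks up the class named in the message body and calls it with the body's arguments. The following is a minimal sketch of that dispatch in isolation; `EchoJob` and the sample body are hypothetical and exist only for illustration.

```ruby
# Hypothetical job class, used only for this illustration.
module EchoJob
  def self.call(args)
    "echo: #{args.inspect}"
  end
end

# Same shape as the default handler built in the constructor.
default_handler = proc { |body| Kernel.const_get(body[:class]).call(body[:args]) }

# A parsed message body with symbolized keys, as #call would pass it.
body = { class: 'EchoJob', args: [1, 2] }

# #call invokes the handler with two arguments (body, message). A proc
# with a single parameter ignores the extra argument.
result = default_handler.call(body, :raw_message_placeholder)
puts result
```

Because the handler is a proc rather than a lambda, a custom block may accept either one parameter (the body) or two (the body and the raw message).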
Class Attribute Details
.stomp ⇒ Object
Returns the value of attribute stomp.
    # File 'lib/queue_worker.rb', line 14
    def stomp
      @stomp
    end
Instance Attribute Details
#client ⇒ Object
    # File 'lib/queue_worker.rb', line 124
    def client
      @client ||= Stomp::Client.new(self.class.stomp)
    end
#handler ⇒ Object
Returns the value of attribute handler.
    # File 'lib/queue_worker.rb', line 22
    def handler
      @handler
    end
#log ⇒ Object
    # File 'lib/queue_worker.rb', line 128
    def log
      @log ||= Logger.new(STDOUT)
    end
#queue ⇒ Object
Returns the value of attribute queue.
    # File 'lib/queue_worker.rb', line 22
    def queue
      @queue
    end
Class Method Details
.configure {|_self| ... } ⇒ Object
    # File 'lib/queue_worker.rb', line 16
    def configure
      yield self
    end
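Since `.configure` simply yields the class itself, configuration amounts to assigning class-level attributes inside the block. The sketch below reproduces that pattern with a hypothetical `MiniWorker` stand-in so that it runs without a broker. It assumes the `stomp` attribute is writable and that the hash follows the stomp gem's `hosts:` convention; the credentials, host, and port are placeholders.

```ruby
# MiniWorker is a hypothetical stand-in that mirrors only the class-level
# configuration API (.stomp and .configure).
class MiniWorker
  class << self
    attr_accessor :stomp

    def configure
      yield self
    end
  end
end

MiniWorker.configure do |config|
  # Assumed connection settings. In QueueWorker the hash is handed to
  # Stomp::Client.new unchanged, so its keys must suit that client.
  config.stomp = {
    hosts: [{ login: 'guest', passcode: 'guest', host: 'localhost', port: 61613 }]
  }
end

puts MiniWorker.stomp[:hosts].first[:host]
```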
.peek(queue_name, size = 1) ⇒ Object
Peek at any number of messages in the queue
    # File 'lib/queue_worker.rb', line 47
    def self.peek(queue_name, size = 1)
      counter = 0
      messages = []
      worker = new(queue_name)
      worker.subscribe_with_timeout(2, size) do |message|
        counter += 1
        messages << JSON.parse(message.body).merge('message-id' => message.headers['message-id'])
        worker.quit if counter == size
      end
      messages
    end
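Each peeked message is the parsed JSON body with the broker's `message-id` header merged in. A minimal sketch of that transformation follows; `FakeMessage` and its contents are hypothetical stand-ins for a received frame.

```ruby
require 'json'

# FakeMessage is a hypothetical stand-in for a received STOMP frame.
FakeMessage = Struct.new(:body, :headers)

message = FakeMessage.new(
  '{"class":"EchoJob","args":[1,2]}',
  { 'message-id' => 'ID:example-1' }
)

# Same expression .peek applies to each message: string keys from the
# JSON body, plus the message id taken from the headers.
peeked = JSON.parse(message.body).merge('message-id' => message.headers['message-id'])
p peeked
```

Note that the keys are strings here, whereas `#call` parses with `symbolize_names: true` and hands symbol keys to the handler.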
.publish(queue, *messages) ⇒ Object
Publish one or more messages to a queue
    # File 'lib/queue_worker.rb', line 37
    def self.publish(queue, *messages)
      worker = new(queue)
      messages.each { |msg| worker.publish(msg) }
      worker.close
    end
Instance Method Details
#call(message) ⇒ Object
Handles subscribe callback
Tries to delegate processing of the message to a class based on the name of the queue. For example,
if the queue is named "scheduled/default", it looks for a class called Scheduled::Default,
initializes it with the body, and then calls its +call+ method.
    # File 'lib/queue_worker.rb', line 113
    def call(message)
      if message.command == 'MESSAGE'
        handler.call(JSON.parse(message.body, symbolize_names: true), message)
      end
    rescue => e
      log.error(e.message) { "\n#{e.backtrace.inspect}" }
    ensure
      ack(message)
      log.info('Processed') { "#{message.headers['message-id']} for #{message.headers['destination']}" }
    end
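The control flow of `#call` can be exercised without a broker. In the sketch below, `SketchWorker` and `FakeFrame` are hypothetical: the worker records acknowledgements and error messages in arrays instead of calling `ack` and the logger, but otherwise follows the same `if` / `rescue` / `ensure` structure.

```ruby
require 'json'

# FakeFrame is a hypothetical stand-in for a received STOMP frame.
FakeFrame = Struct.new(:command, :body, :headers)

# SketchWorker mirrors the control flow of #call, recording
# acknowledgements in an array instead of talking to a broker.
class SketchWorker
  attr_reader :acked, :errors

  def initialize(&handler)
    @handler = handler
    @acked = []
    @errors = []
  end

  def call(message)
    if message.command == 'MESSAGE'
      @handler.call(JSON.parse(message.body, symbolize_names: true), message)
    end
  rescue => e
    @errors << e.message
  ensure
    # Runs for every frame, whether or not the handler raised.
    @acked << message.headers['message-id']
  end
end

worker = SketchWorker.new { |body| raise 'boom' if body[:fail] }
worker.call(FakeFrame.new('MESSAGE', '{"fail":false}', { 'message-id' => 'a' }))
worker.call(FakeFrame.new('MESSAGE', '{"fail":true}', { 'message-id' => 'b' }))
worker.call(FakeFrame.new('ERROR', '', { 'message-id' => 'c' }))

p worker.acked
p worker.errors
```

One consequence of this structure, as far as the sketch shows, is that a message whose handler raises is still acknowledged, so the error is logged but the message is not left unacknowledged for redelivery.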
#publish(message, headers = {}) ⇒ Object Also known as: push
Publish a message to a queue
    # File 'lib/queue_worker.rb', line 63
    def publish(message, headers = {})
      message = message.to_json unless message.is_a?(String)
      client.publish("/queue/#{queue}", message, { priority: 4, persistent: true }.merge(headers))
    end
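Two rules in `#publish` are easy to miss: non-String messages are serialized with `to_json`, and caller-supplied headers override the defaults `priority: 4` and `persistent: true`. The sketch below isolates both in a hypothetical `prepare` helper that returns what would be handed to the client.

```ruby
require 'json'

DEFAULT_HEADERS = { priority: 4, persistent: true }.freeze

# Hypothetical helper: returns the payload and headers that #publish
# would pass on, without sending anything.
def prepare(message, headers = {})
  message = message.to_json unless message.is_a?(String)
  [message, DEFAULT_HEADERS.merge(headers)]
end

payload, headers = prepare({ class: 'EchoJob', args: [1] }, { priority: 9 })
raw_payload, raw_headers = prepare('already serialized')

puts payload
p headers
```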
#quit(queue_name = nil) ⇒ Object
Unsubscribe from the current queue and close the connection
    # File 'lib/queue_worker.rb', line 100
    def quit(queue_name = nil)
      unsubscribe(queue_name)
      close
    end
#subscribe(queue_name = nil, size = 1, &block) ⇒ Object
Subscribe (listen) to a queue
    # File 'lib/queue_worker.rb', line 75
    def subscribe(queue_name = nil, size = 1, &block)
      callback = block || method(:call)
      client.subscribe("/queue/#{queue_name || queue}", { :ack => 'client', 'activemq.prefetchSize' => size }, &callback)
    end
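`#subscribe` chooses the callback (the given block, or `#call` by default) and the destination (the given queue name, or the worker's own queue). The following sketch shows both fallbacks using hypothetical stand-ins: `RecordingClient` only records what it is asked to subscribe to, and `SketchSubscriber` copies the method body.

```ruby
# RecordingClient is a hypothetical stand-in for the STOMP client.
class RecordingClient
  attr_reader :subscriptions

  def initialize
    @subscriptions = []
  end

  def subscribe(destination, headers = {}, &callback)
    @subscriptions << [destination, headers, callback]
  end
end

# SketchSubscriber is hypothetical and mirrors only #subscribe.
class SketchSubscriber
  attr_reader :client, :queue

  def initialize(queue, client)
    @queue = queue
    @client = client
  end

  def call(message)
    "default: #{message}"
  end

  def subscribe(queue_name = nil, size = 1, &block)
    callback = block || method(:call)
    client.subscribe("/queue/#{queue_name || queue}", { :ack => 'client', 'activemq.prefetchSize' => size }, &callback)
  end
end

client = RecordingClient.new
worker = SketchSubscriber.new('scheduled/default', client)
worker.subscribe                                    # falls back to #call and the worker's queue
worker.subscribe('other', 5) { |m| "custom: #{m}" } # explicit queue, prefetch size, and block

first, second = client.subscriptions
puts first[0]
puts second[0]
```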
#subscribe_with_timeout(duration, size = 1, &block) ⇒ Object
Subscribe to a queue for a limited time
    # File 'lib/queue_worker.rb', line 85
    def subscribe_with_timeout(duration, size = 1, &block)
      Timeout::timeout(duration) do
        subscribe(nil, size, &block)
        join
      end
    rescue Timeout::Error
      quit
    end
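The timeout pattern used here wraps a blocking call in `Timeout.timeout` and treats expiry as the normal way out. A minimal sketch, assuming a hypothetical `listen_with_timeout` helper in which `sleep` stands in for the blocking `join`:

```ruby
require 'timeout'

# Hypothetical helper: `sleep` blocks the way a joined client would.
def listen_with_timeout(duration)
  Timeout.timeout(duration) do
    sleep # blocks indefinitely until the timeout interrupts it
    :finished
  end
rescue Timeout::Error
  :quit # the documented method calls #quit at this point
end

outcome = listen_with_timeout(0.05)
p outcome
```

In `.peek`, the block can also end the wait early by calling `quit` once enough messages have arrived, so the duration acts as an upper bound rather than a fixed delay.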
#unsubscribe(queue_name = nil) ⇒ Object
Unsubscribe from the current queue
    # File 'lib/queue_worker.rb', line 95
    def unsubscribe(queue_name = nil)
      client.unsubscribe("/queue/#{queue_name || queue}")
    end