Class: QueueWorker
- Inherits:
-
Object
- Object
- QueueWorker
- Extended by:
- Forwardable
- Defined in:
- lib/queue_worker.rb,
lib/queue_worker/version.rb
Constant Summary collapse
- VERSION =
'3.0.0'
Class Attribute Summary collapse
-
.stomp ⇒ Object
Returns the value of attribute stomp.
Instance Attribute Summary collapse
- #client ⇒ Object
-
#handler ⇒ Object
Returns the value of attribute handler.
- #log ⇒ Object
-
#queue ⇒ Object
Returns the value of attribute queue.
Class Method Summary collapse
- .configure {|_self| ... } ⇒ Object
-
.peek(queue_name, size = 1) ⇒ Object
Peek at any number messages in the queue.
-
.publish(queue, *messages) ⇒ Object
Publish one or more messages to a queue.
-
.subscribe(*args, &block) ⇒ Object
Start a subscription worker with the given args.
Instance Method Summary collapse
-
#call(message) ⇒ Object
Handles
subscribecallback. -
#initialize(queue_name = nil, log = nil, &block) ⇒ QueueWorker
constructor
A new instance of QueueWorker.
-
#publish(message, headers = {}) ⇒ Object
(also: #push)
Publish a message to a queue.
-
#quit(queue_name = nil) ⇒ Object
Unsubscribe from the current queue and close the connection.
-
#subscribe(queue_name = nil, size = 1, &block) ⇒ Object
Subscribe (listen) to a queue.
-
#subscribe_with_timeout(duration, size = 1, &block) ⇒ Object
Subscribe to a queue for a limited time.
-
#unsubscribe(queue_name = nil) ⇒ Object
Unsubscribe from the current queue.
Constructor Details
#initialize(queue_name = nil, log = nil, &block) ⇒ QueueWorker
Returns a new instance of QueueWorker.
27 28 29 30 31 |
# File 'lib/queue_worker.rb', line 27 def initialize(queue_name = nil, log = nil, &block) @queue = queue_name @log = log @handler = block || proc { |body| Kernel.const_get(body[:class]).call(body[:args]) } end |
Class Attribute Details
.stomp ⇒ Object
Returns the value of attribute stomp.
14 15 16 |
# File 'lib/queue_worker.rb', line 14 def stomp @stomp end |
Instance Attribute Details
#client ⇒ Object
131 132 133 |
# File 'lib/queue_worker.rb', line 131 def client @client ||= Stomp::Client.new(self.class.stomp) end |
#handler ⇒ Object
Returns the value of attribute handler.
22 23 24 |
# File 'lib/queue_worker.rb', line 22 def handler @handler end |
#log ⇒ Object
135 136 137 |
# File 'lib/queue_worker.rb', line 135 def log @log ||= Logger.new(STDOUT) end |
#queue ⇒ Object
Returns the value of attribute queue.
22 23 24 |
# File 'lib/queue_worker.rb', line 22 def queue @queue end |
Class Method Details
.configure {|_self| ... } ⇒ Object
16 17 18 |
# File 'lib/queue_worker.rb', line 16 def configure yield self end |
.peek(queue_name, size = 1) ⇒ Object
Peek at any number messages in the queue
47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/queue_worker.rb', line 47 def self.peek(queue_name, size = 1) counter = 0 = [] worker = new(queue_name) worker.subscribe_with_timeout(2, size) do || counter += 1 << JSON.parse(.body).merge('message-id' => .headers['message-id']) worker.quit if counter == size end end |
.publish(queue, *messages) ⇒ Object
Publish one or more messages to a queue
37 38 39 40 41 |
# File 'lib/queue_worker.rb', line 37 def self.publish(queue, *) worker = new(queue) .each { |msg| worker.publish(msg) } worker.close end |
.subscribe(*args, &block) ⇒ Object
Start a subscription worker with the given args
60 61 62 63 64 |
# File 'lib/queue_worker.rb', line 60 def self.subscribe(*args, &block) worker = new(*args, &block) worker.subscribe worker.join end |
Instance Method Details
#call(message) ⇒ Object
Handles subscribe callback
Tries to delegate processing of message to a class based on the name of the queue. For example:
If the queue is named "scheduled/default" it will look for a class called Scheduled::Default to
initialize with the message body and then call it's +call+ method
120 121 122 123 124 125 126 127 128 129 |
# File 'lib/queue_worker.rb', line 120 def call() if .command == 'MESSAGE' handler.call(JSON.parse(.body, symbolize_names: true)) end rescue => e log.error(e.) { "\n#{e.backtrace.inspect}" } ensure ack() log.info('Processed') { %(#{.headers['message-id']} from "#{.headers['destination']}") } end |
#publish(message, headers = {}) ⇒ Object Also known as: push
Publish a message to a queue
70 71 72 73 |
# File 'lib/queue_worker.rb', line 70 def publish(, headers = {}) = .to_json unless .is_a?(String) client.publish("/queue/#{queue}", , { priority: 4, persistent: true }.merge(headers)) end |
#quit(queue_name = nil) ⇒ Object
Unsubscribe from the current queue and close the connection
107 108 109 110 |
# File 'lib/queue_worker.rb', line 107 def quit(queue_name = nil) unsubscribe(queue_name) close end |
#subscribe(queue_name = nil, size = 1, &block) ⇒ Object
Subscribe (listen) to a queue
82 83 84 85 |
# File 'lib/queue_worker.rb', line 82 def subscribe(queue_name = nil, size = 1, &block) callback = block || method(:call) client.subscribe("/queue/#{queue_name || queue}", { :ack => 'client', 'activemq.prefetchSize' => size }, &callback) end |
#subscribe_with_timeout(duration, size = 1, &block) ⇒ Object
Subscribe to a queue for a limited time
92 93 94 95 96 97 98 99 |
# File 'lib/queue_worker.rb', line 92 def subscribe_with_timeout(duration, size = 1, &block) Timeout::timeout(duration) do subscribe(nil, size, &block) join end rescue Timeout::Error quit end |
#unsubscribe(queue_name = nil) ⇒ Object
Unsubscribe from the current queue
102 103 104 |
# File 'lib/queue_worker.rb', line 102 def unsubscribe(queue_name = nil) client.unsubscribe("/queue/#{queue_name || queue}") end |