Class: QueueWorker
- Inherits:
-
Object
- Object
- QueueWorker
- Extended by:
- Forwardable
- Defined in:
- lib/queue_worker.rb,
lib/queue_worker/version.rb
Constant Summary collapse
- VERSION =
'1.0.1'
Class Attribute Summary collapse
-
.stomp ⇒ Object
Returns the value of attribute stomp.
Instance Attribute Summary collapse
- #client ⇒ Object
-
#handler ⇒ Object
Returns the value of attribute handler.
-
#queue ⇒ Object
Returns the value of attribute queue.
Class Method Summary collapse
- .configure {|_self| ... } ⇒ Object
-
.peek(queue_name, size = 1) ⇒ Object
Peek at any number of messages in the queue.
-
.publish(queue, *messages) ⇒ Object
Publish one or more messages to a queue.
Instance Method Summary collapse
-
#call(message) ⇒ Object
Handles subscribe callback.
-
#default_subscribe_callback(message) ⇒ Object
Private.
-
#initialize(queue_name = nil, client = nil) ⇒ QueueWorker
constructor
A new instance of QueueWorker.
-
#load_handler_class(destination_queue) ⇒ Object
Converts the queue name to a constant.
- #log ⇒ Object
-
#publish(message, headers = {}) ⇒ Object
(also: #push)
Publish a message to a queue.
-
#quit ⇒ Object
Unsubscribe from the current queue and close the connection.
-
#subscribe(size = 1, &block) ⇒ Object
Subscribe (listen) to a queue.
-
#subscribe_with_timeout(duration, size = 1, &block) ⇒ Object
Subscribe to a queue for a limited time.
-
#unsubscribe ⇒ Object
Unsubscribe from the current queue.
Constructor Details
#initialize(queue_name = nil, client = nil) ⇒ QueueWorker
26 27 28 29 30 |
# File 'lib/queue_worker.rb', line 26
#
# Builds a worker bound to a queue.
#
# @param queue_name [String, nil] stored as-is in @queue
# @param client [Object, nil] stored as-is in @client; when nil, the
#   +client+ reader builds one on first use
def initialize(queue_name = nil, client = nil)
  @queue = queue_name
  @client = client
  # Default handler: resolve a class from the message's 'destination' header,
  # instantiate it with the already-parsed args, and invoke its +call+.
  @handler = proc do |args, message|
    load_handler_class(message.headers['destination']).new(args).call
  end
end
Class Attribute Details
.stomp ⇒ Object
Returns the value of attribute stomp.
14 15 16 |
# File 'lib/queue_worker.rb', line 14
#
# Returns the value of attribute stomp.
# NOTE(review): listed as a class attribute and read via +self.class.stomp+
# when building the client, so @stomp presumably lives on the class itself.
#
# @return [Object] whatever was assigned to @stomp (nil if never set)
def stomp
  @stomp
end
Instance Attribute Details
#client ⇒ Object
142 143 144 |
# File 'lib/queue_worker.rb', line 142
#
# Returns the client for this worker. If none was assigned, a
# Stomp::Client is created from the class-level +stomp+ setting and
# memoized in @client.
#
# @return [Object] the assigned or newly built client
def client
  @client ||= Stomp::Client.new(self.class.stomp)
end
#handler ⇒ Object
Returns the value of attribute handler.
22 23 24 |
# File 'lib/queue_worker.rb', line 22
#
# Returns the value of attribute handler.
#
# @return [Object] the value stored in @handler
def handler
  @handler
end
#queue ⇒ Object
Returns the value of attribute queue.
22 23 24 |
# File 'lib/queue_worker.rb', line 22
#
# Returns the value of attribute queue.
#
# @return [Object] the value stored in @queue
def queue
  @queue
end
Class Method Details
.configure {|_self| ... } ⇒ Object
16 17 18 |
# File 'lib/queue_worker.rb', line 16
#
# Yields the receiver to the given block so settings can be assigned on it.
#
# @yieldparam _self [Object] the receiver of +configure+
# @return [Object] the block's return value
def configure
  yield self
end
.peek(queue_name, size = 1) ⇒ Object
Peek at any number of messages in the queue
46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/queue_worker.rb', line 46
#
# Peek at any number of messages in the queue.
#
# Subscribes to the queue for at most 2 seconds, collects each delivered
# message's JSON-parsed body merged with its 'message-id' header, and quits
# the worker once +size+ messages have been seen.
#
# @param queue_name [String] queue to read from
# @param size [Integer] number of messages to collect before quitting
# @return [Array<Hash>] the collected messages (possibly fewer than +size+)
def self.peek(queue_name, size = 1)
  counter = 0
  messages = []
  worker = new(queue_name)

  worker.subscribe_with_timeout(2, size) do |message|
    counter += 1
    messages << JSON.parse(message.body).merge('message-id' => message.headers['message-id'])
    worker.quit if counter == size
  end

  messages
end
.publish(queue, *messages) ⇒ Object
Publish one or more messages to a queue
36 37 38 39 40 |
# File 'lib/queue_worker.rb', line 36
#
# Publish one or more messages to a queue.
#
# Builds a worker for +queue+, publishes every message through it in order,
# then closes the worker.
#
# @param queue [String] queue to publish to
# @param messages [Array<Object>] messages handed one by one to the
#   worker's +publish+
# @return [Object] the result of the worker's +close+
def self.publish(queue, *messages)
  worker = new(queue)
  messages.each { |msg| worker.publish(msg) }
  worker.close
end
Instance Method Details
#call(message) ⇒ Object
Handles subscribe callback
Tries to delegate processing of the message to a class based on the name of the queue. For example,
if the queue is named "scheduled/default", it will look for a class called Scheduled::Default,
initialize it with the parsed message body, and then call its +call+ method.
111 112 113 114 115 116 117 118 |
# File 'lib/queue_worker.rb', line 111
#
# Handles the subscribe callback for a single message.
#
# Parses the message body as JSON (symbolized keys) and passes it, together
# with the message itself, to +handler+. Any StandardError is logged instead
# of being re-raised.
#
# @param message [#body] the delivered message
# @return [Object] the handler's result, or the logger's result on failure
def call(message)
  handler.call(JSON.parse(message.body, symbolize_names: true), message)
rescue => e
  log.error(e.message) { "\n#{e.backtrace.inspect}" }
ensure
  # Runs on success and failure alike: the message is always acknowledged.
  ack(message)
  log.info('Processed') { message }
end
#default_subscribe_callback(message) ⇒ Object
Private
124 125 126 127 128 129 130 131 132 |
# File 'lib/queue_worker.rb', line 124
#
# Private. Callback used by +subscribe+ when no block is given.
#
# Frames whose command is not 'MESSAGE' are ignored. A body of exactly
# 'UNSUBSCRIBE' unsubscribes the worker; any other body is passed to +call+.
#
# @param message [#command, #body] the delivered frame
# @return [Object, nil] result of +unsubscribe+ or +call+; nil when ignored
def default_subscribe_callback(message)
  return unless message.command == 'MESSAGE'

  if message.body == 'UNSUBSCRIBE'
    unsubscribe
  else
    call(message)
  end
end
#load_handler_class(destination_queue) ⇒ Object
Converts the queue name to a constant
138 139 140 |
# File 'lib/queue_worker.rb', line 138
#
# Converts the queue name to a constant.
#
# Strips one leading "queue/" or "/queue/" prefix, then camelizes and
# constantizes the remainder. +camelize+ and +constantize+ are not Ruby core
# String methods — presumably supplied by ActiveSupport; confirm against the
# file's requires.
#
# @param destination_queue [String] e.g. "/queue/scheduled/default"
# @return [Object] whatever +constantize+ returns for the camelized name
def load_handler_class(destination_queue)
  # \A (not ^) anchors at the start of the whole string, so a "queue/" that
  # merely begins a later line of the input is left untouched.
  destination_queue.sub(%r{\A/?queue/}, '').camelize.constantize
end
#log ⇒ Object
146 147 148 |
# File 'lib/queue_worker.rb', line 146
#
# Returns the worker's logger, creating a Logger that writes to STDOUT on
# first use and memoizing it in @log.
#
# @return [Logger]
def log
  @log ||= Logger.new(STDOUT)
end
#publish(message, headers = {}) ⇒ Object Also known as: push
Publish a message to a queue
62 63 64 65 |
# File 'lib/queue_worker.rb', line 62
#
# Publish a message to a queue.
#
# Strings are sent unchanged; anything else is serialized with +to_json+.
# The message goes to "/queue/<queue>" with default headers
# +priority: 4, persistent: true+, which +headers+ may override or extend.
#
# @param message [String, #to_json] payload to send
# @param headers [Hash] extra headers merged over the defaults
# @return [Object] the result of the client's +publish+
def publish(message, headers = {})
  payload = message.is_a?(String) ? message : message.to_json
  client.publish("/queue/#{queue}", payload, { priority: 4, persistent: true }.merge(headers))
end
#quit ⇒ Object
Unsubscribe from the current queue and close the connection
98 99 100 101 |
# File 'lib/queue_worker.rb', line 98
#
# Unsubscribe from the current queue and close the connection.
#
# @return [Object] the result of +close+
def quit
  unsubscribe
  close
end
#subscribe(size = 1, &block) ⇒ Object
Subscribe (listen) to a queue
73 74 75 76 |
# File 'lib/queue_worker.rb', line 73
#
# Subscribe (listen) to a queue.
#
# Subscribes the client to "/queue/<queue>" with client acknowledgement and
# +size+ as the 'activemq.prefetchSize' header. Each delivered message goes
# to the given block, or to +default_subscribe_callback+ when no block is
# passed.
#
# @param size [Integer] value sent as 'activemq.prefetchSize'
# @yieldparam message [Object] a delivered message
# @return [Object] the result of the client's +subscribe+
def subscribe(size = 1, &block)
  callback = block || method(:default_subscribe_callback)
  headers = { :ack => 'client', 'activemq.prefetchSize' => size }
  client.subscribe("/queue/#{queue}", headers, &callback)
end
#subscribe_with_timeout(duration, size = 1, &block) ⇒ Object
Subscribe to a queue for a limited time
83 84 85 86 87 88 89 90 |
# File 'lib/queue_worker.rb', line 83
#
# Subscribe to a queue for a limited time.
#
# Subscribes and then joins inside a Timeout of +duration+ seconds. If the
# timeout fires, the worker quits instead of raising.
#
# @param duration [Numeric] seconds before the subscription is abandoned
# @param size [Integer] forwarded to +subscribe+
# @yieldparam message [Object] forwarded to +subscribe+
# @return [Object] the result of +join+, or of +quit+ after a timeout
def subscribe_with_timeout(duration, size = 1, &block)
  Timeout.timeout(duration) do
    subscribe(size, &block)
    join
  end
rescue Timeout::Error
  quit
end
#unsubscribe ⇒ Object
Unsubscribe from the current queue
93 94 95 |
# File 'lib/queue_worker.rb', line 93
#
# Unsubscribe from the current queue ("/queue/<queue>").
#
# @return [Object] the result of the client's +unsubscribe+
def unsubscribe
  client.unsubscribe("/queue/#{queue}")
end