Class: Stapfen::Worker
Constant Summary
Constants included from Logger
Class Attribute Summary collapse
-
.configuration ⇒ Object
Returns the value of attribute configuration.
-
.consumers ⇒ Object
Returns the value of attribute consumers.
-
.destructor ⇒ Object
Returns the value of attribute destructor.
-
.logger ⇒ Object
Returns the value of attribute logger.
-
.workers ⇒ Object
Returns the value of attribute workers.
Instance Attribute Summary collapse
-
#client ⇒ Object
Instance Methods.
Class Method Summary collapse
-
.configure ⇒ Object
Expects a block to be passed which will yield the appropriate configuration for the Stomp gem.
-
.consume(queue_name, headers = {}, &block) ⇒ Object
Main message consumption block.
-
.handle_signals ⇒ Object
Utility method to set up the proper worker signal handlers.
-
.log ⇒ Object
Optional method, should be passed a block which will yield a {Logger} instance for the Stapfen worker to use.
-
.run! ⇒ Object
Instantiate a new
Workerinstance and run it. -
.shutdown(&block) ⇒ Object
Optional method, specifes a block to execute when the worker is shutting down.
Instance Method Summary collapse
-
#exit_cleanly ⇒ Object
Invokes the shutdown block if it has been created, and closes the {Stomp{Stomp::Client} connection unless it has already been shut down.
- #run ⇒ Object
Methods included from Logger
Class Attribute Details
.configuration ⇒ Object
Returns the value of attribute configuration.
9 10 11 |
# File 'lib/stapfen/worker.rb', line 9 def configuration @configuration end |
.consumers ⇒ Object
Returns the value of attribute consumers.
9 10 11 |
# File 'lib/stapfen/worker.rb', line 9 def consumers @consumers end |
.destructor ⇒ Object
Returns the value of attribute destructor.
9 10 11 |
# File 'lib/stapfen/worker.rb', line 9 def destructor @destructor end |
.logger ⇒ Object
Returns the value of attribute logger.
9 10 11 |
# File 'lib/stapfen/worker.rb', line 9 def logger @logger end |
.workers ⇒ Object
Returns the value of attribute workers.
10 11 12 |
# File 'lib/stapfen/worker.rb', line 10 def workers @workers end |
Instance Attribute Details
#client ⇒ Object
Instance Methods
81 82 83 |
# File 'lib/stapfen/worker.rb', line 81 def client @client end |
Class Method Details
.configure ⇒ Object
Expects a block to be passed which will yield the appropriate configuration for the Stomp gem. Whatever the block yields will be passed directly into the {Stomp{Stomp::Client{Stomp::Client#new} method
28 29 30 31 32 33 |
# File 'lib/stapfen/worker.rb', line 28 def self.configure unless block_given? raise Stapfen::ConfigurationError end @configuration = yield end |
.consume(queue_name, headers = {}, &block) ⇒ Object
Main message consumption block
42 43 44 45 46 47 48 |
# File 'lib/stapfen/worker.rb', line 42 def self.consume(queue_name, headers={}, &block) unless block_given? raise Stapfen::ConsumeError, "Cannot consume #{queue_name} without a block!" end @consumers ||= [] @consumers << [queue_name, headers, block] end |
.handle_signals ⇒ Object
Utility method to set up the proper worker signal handlers
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/stapfen/worker.rb', line 57 def self.handle_signals return if @signals_handled Signal.trap(:INT) do workers.each do |w| w.exit_cleanly end exit! end Signal.trap(:TERM) do workers.each do |w| w.exit_cleanly end end @signals_handled = true end |
.log ⇒ Object
Optional method, should be passed a block which will yield a {Logger} instance for the Stapfen worker to use
37 38 39 |
# File 'lib/stapfen/worker.rb', line 37 def self.log @logger = yield end |
.run! ⇒ Object
Instantiate a new Worker instance and run it
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/stapfen/worker.rb', line 14 def self.run! worker = self.new @workers ||= [] @workers << worker handle_signals worker.run end |
.shutdown(&block) ⇒ Object
Optional method, specifes a block to execute when the worker is shutting down.
52 53 54 |
# File 'lib/stapfen/worker.rb', line 52 def self.shutdown(&block) @destructor = block end |
Instance Method Details
#exit_cleanly ⇒ Object
Invokes the shutdown block if it has been created, and closes the Stapfen::Worker.{Stomp{Stomp::Client} connection unless it has already been shut down
122 123 124 125 126 127 128 |
# File 'lib/stapfen/worker.rb', line 122 def exit_cleanly self.class.destructor.call if self.class.destructor unless client.closed? client.close end end |
#run ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/stapfen/worker.rb', line 83 def run @client = Stomp::Client.new(self.class.configuration) debug("Running with #{@client} inside of Thread:#{Thread.current.object_id}") self.class.consumers.each do |name, headers, block| unreceive_headers = {} [:max_redeliveries, :dead_letter_queue].each do |sym| unreceive_headers[sym] = headers.delete(sym) if headers.has_key? sym end # We're taking each block and turning it into a method so that we can # use the instance scope instead of the blocks originally bound scope # which would be at a class level method_name = name.gsub(/[.|\-]/, '_').to_sym self.class.send(:define_method, method_name, &block) client.subscribe(name, headers) do || success = self.send(method_name, ) if !success && !unreceive_headers.empty? client.unreceive(, unreceive_headers) end end end begin # Performing this join/open loop to make sure that we don't # experience potential deadlocks between signal handlers who might # close the connection, and an infinite Client#join call while client.open? do client.join(1) end rescue Interrupt exit_cleanly end end |