Class: Stapfen::Worker
- Inherits:
-
Object
- Object
- Stapfen::Worker
- Defined in:
- lib/stapfen/worker.rb
Constant Summary collapse
- KAFKA =
:kafka.freeze
- STOMP =
:stomp.freeze
- JMS =
:jms.freeze
Class Attribute Summary collapse
-
.consumers ⇒ Object
Returns the value of attribute consumers.
-
.destructor ⇒ Object
Returns the value of attribute destructor.
-
.instance_configuration ⇒ Object
Returns the value of attribute instance_configuration.
Instance Attribute Summary collapse
-
#client_options ⇒ Object
Returns the value of attribute client_options.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#protocol ⇒ Object
Returns the value of attribute protocol.
-
#stapfen_client ⇒ Object
Returns the value of attribute stapfen_client.
Class Method Summary collapse
- .configure(&configuration_block) ⇒ Object
-
.consume(config_overrides = {}, &consume_block) ⇒ Object
Main message consumption block.
-
.exit_cleanly ⇒ Boolean
Invoke
exit_cleanly
on each of the registered Worker instances that this class is keeping track of. -
.handle_signals ⇒ Object
Utility method to set up the proper worker signal handlers.
-
.run! ⇒ Object
Instantiate a new
Worker
instance and run it. -
.set_class_variable_defaults ⇒ Object
Class variables are put in this method to allow for “reset” style functionality if needed.
-
.shutdown(&block) ⇒ Object
Optional method, specifes a block to execute when the worker is shutting down.
-
.workers ⇒ Object
Return all the currently running Stapfen::Worker instances in this process.
Instance Method Summary collapse
- #configure(&configuration_block) ⇒ Object
-
#exit_cleanly ⇒ Object
Invokes the shutdown block if it has been created, and closes the {Stomp{Stomp::Client} connection unless it has already been shut down.
-
#initialize ⇒ Worker
constructor
Instance Methods.
- #jms? ⇒ Boolean
- #kafka? ⇒ Boolean
- #run ⇒ Object
- #stomp? ⇒ Boolean
-
#use_jms! ⇒ Boolean
Force the worker to use JMS as the messaging protocol.
-
#use_kafka! ⇒ Boolean
Force the worker to use Kafka as the messaging protocol.
-
#use_stomp! ⇒ Boolean
Force the worker to use STOMP as the messaging protocol (default).
Constructor Details
#initialize ⇒ Worker
Instance Methods
110 111 112 113 114 115 116 |
# File 'lib/stapfen/worker.rb', line 110 def initialize instance_configuration = self.class.instance_configuration if instance_configuration self.configure &instance_configuration end self. ||= {} end |
Class Attribute Details
.consumers ⇒ Object
Returns the value of attribute consumers.
13 14 15 |
# File 'lib/stapfen/worker.rb', line 13 def consumers @consumers end |
.destructor ⇒ Object
Returns the value of attribute destructor.
13 14 15 |
# File 'lib/stapfen/worker.rb', line 13 def destructor @destructor end |
.instance_configuration ⇒ Object
Returns the value of attribute instance_configuration.
13 14 15 |
# File 'lib/stapfen/worker.rb', line 13 def instance_configuration @instance_configuration end |
Instance Attribute Details
#client_options ⇒ Object
Returns the value of attribute client_options.
10 11 12 |
# File 'lib/stapfen/worker.rb', line 10 def @client_options end |
#logger ⇒ Object
Returns the value of attribute logger.
10 11 12 |
# File 'lib/stapfen/worker.rb', line 10 def logger @logger end |
#protocol ⇒ Object
Returns the value of attribute protocol.
10 11 12 |
# File 'lib/stapfen/worker.rb', line 10 def protocol @protocol end |
#stapfen_client ⇒ Object
Returns the value of attribute stapfen_client.
10 11 12 |
# File 'lib/stapfen/worker.rb', line 10 def stapfen_client @stapfen_client end |
Class Method Details
.configure(&configuration_block) ⇒ Object
15 16 17 18 19 20 |
# File 'lib/stapfen/worker.rb', line 15 def configure(&configuration_block) unless block_given? raise Stapfen::ConfigurationError, "Method `configure` requires a block" end self.instance_configuration = configuration_block end |
.consume(config_overrides = {}, &consume_block) ⇒ Object
Main message consumption block
34 35 36 37 38 39 40 |
# File 'lib/stapfen/worker.rb', line 34 def consume(config_overrides={}, &consume_block) unless block_given? raise Stapfen::ConsumeError, "Method `consume` requires a block" end @consumers ||= ThreadSafe::Array.new @consumers << [config_overrides, consume_block] end |
.exit_cleanly ⇒ Boolean
Invoke exit_cleanly
on each of the registered Worker instances that this class is keeping track of
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/stapfen/worker.rb', line 58 def exit_cleanly return false if workers.empty? cleanly = true workers.each do |w| begin w.exit_cleanly rescue StandardError => ex $stderr.write("Failure while exiting cleanly #{ex.inspect}\n#{ex.backtrace}") cleanly = false end end if RUBY_PLATFORM == 'java' Stapfen.logger.info 'Telling the JVM to exit cleanly' Java::JavaLang::System.exit(0) end return cleanly end |
.handle_signals ⇒ Object
Utility method to set up the proper worker signal handlers
80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/stapfen/worker.rb', line 80 def handle_signals return if @@signals_handled Signal.trap(:INT) do self.exit_cleanly exit! end Signal.trap(:TERM) do self.exit_cleanly end @@signals_handled = true end |
.run! ⇒ Object
Instantiate a new Worker
instance and run it
23 24 25 26 27 28 29 30 31 |
# File 'lib/stapfen/worker.rb', line 23 def run! worker = self.new @@workers << worker handle_signals worker.run end |
.set_class_variable_defaults ⇒ Object
Class variables are put in this method to allow for “reset” style functionality if needed. Useful for testing (see worker_spec.rb).
97 98 99 100 |
# File 'lib/stapfen/worker.rb', line 97 def set_class_variable_defaults @@signals_handled = false @@workers = ThreadSafe::Array.new end |
.shutdown(&block) ⇒ Object
Optional method, specifes a block to execute when the worker is shutting down.
44 45 46 |
# File 'lib/stapfen/worker.rb', line 44 def shutdown(&block) @destructor = block end |
.workers ⇒ Object
Return all the currently running Stapfen::Worker instances in this process
50 51 52 |
# File 'lib/stapfen/worker.rb', line 50 def workers @@workers end |
Instance Method Details
#configure(&configuration_block) ⇒ Object
118 119 120 |
# File 'lib/stapfen/worker.rb', line 118 def configure(&configuration_block) self.instance_eval &configuration_block end |
#exit_cleanly ⇒ Object
Invokes the shutdown block if it has been created, and closes the Stapfen::Worker.{Stomp{Stomp::Client} connection unless it has already been shut down
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 |
# File 'lib/stapfen/worker.rb', line 265 def exit_cleanly logger.info("#{self} exiting ") self.class.destructor.call if self.class.destructor logger.info 'Killing client' begin # Only close the client if we have one sitting around if stapfen_client unless stapfen_client.closed? stapfen_client.close end end rescue StandardError => exc logger.error "Exception received while trying to close client! #{exc.inspect}" end end |
#kafka? ⇒ Boolean
193 194 195 |
# File 'lib/stapfen/worker.rb', line 193 def kafka? @protocol == KAFKA end |
#run ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/stapfen/worker.rb', line 201 def run case protocol when STOMP require 'stapfen/client/stomp' stapfen_client = Stapfen::Client::Stomp.new() when JMS require 'stapfen/client/jms' stapfen_client = Stapfen::Client::JMS.new() when KAFKA require 'stapfen/client/kafka' stapfen_client = Stapfen::Client::Kafka.new() else raise 'No client specified' end logger.info("Running with #{stapfen_client} inside of Thread:#{Thread.current.inspect}") stapfen_client.connect self.class.consumers.each do |config_overrides, block| consumer_config = .merge(config_overrides) consumer_topic = consumer_config[:topic] consumer_can_unreceive = !(consumer_config.keys & [:max_redeliveries, :dead_letter_queue]).empty? # We're taking each block and turning it into a method so that we can # use the instance scope instead of the blocks originally bound scope # which would be at a class level methodized_topic = consumer_topic.gsub(/[.|\-]/, '_').to_sym self.class.send(:define_method, methodized_topic, &block) stapfen_client.subscribe(consumer_topic, consumer_config) do || = nil if stomp? = Stapfen::Message.from_stomp() end if jms? = Stapfen::Message.from_jms() end if kafka? = Stapfen::Message.from_kafka() end success = self.send(methodized_topic, ) unless success if stapfen_client.can_unreceive? && consumer_can_unreceive stapfen_client.unreceive(, consumer_config) end end end end begin stapfen_client.runloop logger.info("Exiting the runloop for #{self}") rescue Interrupt exit_cleanly end end |
#stomp? ⇒ Boolean
141 142 143 |
# File 'lib/stapfen/worker.rb', line 141 def stomp? @protocol == STOMP end |
#use_jms! ⇒ Boolean
Force the worker to use JMS as the messaging protocol.
Note: Only works under JRuby
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/stapfen/worker.rb', line 150 def use_jms! unless RUBY_PLATFORM == 'java' raise Stapfen::ConfigurationError, 'You cannot use JMS unless you are running under JRuby!' end begin require 'java' require 'jms' rescue LoadError Stapfen.logger.info 'You need the `jms` gem to be installed to use JMS!' raise end @protocol = JMS return true end |
#use_kafka! ⇒ Boolean
Force the worker to use Kafka as the messaging protocol.
Note: Only works under JRuby
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/stapfen/worker.rb', line 176 def use_kafka! unless RUBY_PLATFORM == 'java' raise Stapfen::ConfigurationError, 'You cannot use Kafka unless you are running under JRuby!' end begin require 'java' require 'hermann' rescue LoadError Stapfen.logger.info 'You need the `hermann` gem to be installed to use Kafka!' raise end @protocol = KAFKA return true end |
#use_stomp! ⇒ Boolean
Force the worker to use STOMP as the messaging protocol (default)
129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/stapfen/worker.rb', line 129 def use_stomp! begin require 'stomp' rescue LoadError Stapfen.logger.info 'You need the `stomp` gem to be installed to use stomp!' raise end @protocol = STOMP return true end |