Class: RServiceBus2::Host
- Inherits:
-
Object
- Object
- RServiceBus2::Host
- Defined in:
- lib/rservicebus2/host.rb
Overview
Host process for rservicebus
Instance Attribute Summary collapse
-
#saga_data ⇒ Object
Returns the value of attribute saga_data.
Instance Method Summary collapse
-
#_send_already_wrapped_and_serialised(serialized_object, queue_name) ⇒ Object
Sends a msg across the bus.
-
#_send_needs_wrapping(msg, queue_name, correlation_id) ⇒ Object
Sends a msg across the bus.
-
#configure_app_resource ⇒ Object
Thin veneer for Configuring external resources.
-
#configure_circuit_breaker ⇒ Object
Thin veneer for Configuring Cron.
-
#configure_cron_manager ⇒ Object
Thin veneer for Configuring Cron.
-
#configure_monitors ⇒ Object
Thin veneer for Configuring external resources.
-
#configure_saga_storage ⇒ Object
Thin veneer for Configuring state.
-
#configure_send_at_manager ⇒ Object
Thin veneer for Configuring SendAt.
-
#configure_state_manager ⇒ Object
Thin veneer for Configuring state.
-
#configure_statistics ⇒ Object
Initialise statistics monitor.
-
#configure_subscriptions ⇒ Object
Load, configure and initialise Subscriptions.
-
#connect_to_mq ⇒ Object
Thin veneer for Configuring the Message Queue.
- #get_endpoint_for_msg(msg_name) ⇒ Object
-
#handle_message ⇒ Object
Send the current msg to the appropriate handlers.
-
#initialize ⇒ Host
constructor
A new instance of Host.
-
#load_contracts ⇒ Object
Load Contracts.
-
#load_handlers ⇒ Object
Load and configure Message Handlers.
-
#load_libs ⇒ Object
For each directory given, find and load all librarys.
-
#load_sagas ⇒ Object
Load and configure Sagas.
-
#log(string, ver = false) ⇒ Object
Provides a thin logging veneer.
-
#publish(msg) ⇒ Object
Sends an event to all subscribers across the bus.
- #queue_msg_for_send_on_complete(msg, queue_name, timestamp = nil) ⇒ Object
-
#reply(msg) ⇒ Object
Sends a msg back across the bus Reply queues are specified in each msg.
-
#run ⇒ Object
Ignition.
-
#send(msg, timestamp = nil) ⇒ Object
Send a msg across the bus msg destination is specified at the infrastructure level.
- #send_queued_msgs ⇒ Object
-
#send_subscriptions ⇒ Object
Subscriptions are specified by adding events to the msg endpoint mapping.
-
#start_listening_to_endpoints ⇒ Object
Receive a msg, prep it, and handle any errors that may occur - Most of this should be queue independant.
-
#subscribe(event_name) ⇒ Object
Sends a subscription request across the Bus.
Constructor Details
#initialize ⇒ Host
Returns a new instance of Host.
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/rservicebus2/host.rb', line 151 def initialize RServiceBus2.rlog "Current directory: #{Dir.pwd}" @config = ConfigFromEnv.new.load_host_section .load_contracts .load_handler_path_list .load_saga_path_list .load_libs .load_working_dir_list connect_to_mq @endpoint_mapping = EndpointMapping.new.configure(@mq.local_queue_name) self.configure_statistics .load_contracts .load_libs .configure_send_at_manager .configure_state_manager .configure_saga_storage .configure_app_resource .configure_circuit_breaker .configure_monitors .load_handlers .load_sagas .configure_cron_manager .configure_subscriptions .send_subscriptions self end |
Instance Attribute Details
#saga_data ⇒ Object
Returns the value of attribute saga_data.
13 14 15 |
# File 'lib/rservicebus2/host.rb', line 13 def saga_data @saga_data end |
Instance Method Details
#_send_already_wrapped_and_serialised(serialized_object, queue_name) ⇒ Object
Sends a msg across the bus
381 382 383 384 385 386 387 388 389 |
# File 'lib/rservicebus2/host.rb', line 381 def _send_already_wrapped_and_serialised(serialized_object, queue_name) RServiceBus2.rlog 'Bus._send_already_wrapped_and_serialised' unless @config..nil? @mq.send(@config., serialized_object) end @mq.send(queue_name, serialized_object) end |
#_send_needs_wrapping(msg, queue_name, correlation_id) ⇒ Object
Sends a msg across the bus
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 |
# File 'lib/rservicebus2/host.rb', line 394 def _send_needs_wrapping(msg, queue_name, correlation_id) RServiceBus2.rlog 'Bus._send_needs_wrapping' r_msg = RServiceBus2::Message.new(msg, @mq.local_queue_name, correlation_id) if queue_name.index('@').nil? q = queue_name RServiceBus2.rlog "Sending, #{msg.class.name} to, #{queue_name}" else parts = queue_name.split('@') r_msg.set_remote_queue_name(parts[0]) r_msg.set_remote_host_name(parts[1]) q = 'transport-out' RServiceBus2.rlog "Sending, #{msg.class.name} to, #{queue_name}, via #{q}" end serialized_object = YAML.dump(r_msg) _send_already_wrapped_and_serialised(serialized_object, q) end |
#configure_app_resource ⇒ Object
Thin veneer for Configuring external resources
23 24 25 26 27 28 29 30 |
# File 'lib/rservicebus2/host.rb', line 23 def configure_app_resource @resource_manager = ConfigureAppResource.new .get_resources(ENV, self, @state_manager, @saga_storage) self end |
#configure_circuit_breaker ⇒ Object
Thin veneer for Configuring Cron
55 56 57 58 |
# File 'lib/rservicebus2/host.rb', line 55 def configure_circuit_breaker @circuit_breaker = CircuitBreaker.new(self) self end |
#configure_cron_manager ⇒ Object
Thin veneer for Configuring Cron
111 112 113 114 |
# File 'lib/rservicebus2/host.rb', line 111 def configure_cron_manager @cron_manager = CronManager.new(self, @handler_manager.get_list_of_msg_names) self end |
#configure_monitors ⇒ Object
Thin veneer for Configuring external resources
61 62 63 64 |
# File 'lib/rservicebus2/host.rb', line 61 def configure_monitors @monitors = ConfigureMonitor.new(self, @resource_manager).get_monitors(ENV) self end |
#configure_saga_storage ⇒ Object
Thin veneer for Configuring state
45 46 47 48 49 50 51 52 |
# File 'lib/rservicebus2/host.rb', line 45 def configure_saga_storage string = RServiceBus2.get_value('SAGA_URI') string = 'dir:///tmp' if string.nil? uri = URI.parse(string) @saga_storage = SagaStorage.get(uri) self end |
#configure_send_at_manager ⇒ Object
Thin veneer for Configuring SendAt
33 34 35 36 |
# File 'lib/rservicebus2/host.rb', line 33 def configure_send_at_manager @send_at_manager = SendAtManager.new(self) self end |
#configure_state_manager ⇒ Object
Thin veneer for Configuring state
39 40 41 42 |
# File 'lib/rservicebus2/host.rb', line 39 def configure_state_manager @state_manager = StateManager.new self end |
#configure_statistics ⇒ Object
Initialise statistics monitor
146 147 148 149 |
# File 'lib/rservicebus2/host.rb', line 146 def configure_statistics @stats = StatisticManager.new( self ) self end |
#configure_subscriptions ⇒ Object
Load, configure and initialise Subscriptions
139 140 141 142 143 |
# File 'lib/rservicebus2/host.rb', line 139 def configure_subscriptions subscription_storage = ConfigureSubscriptionStorage.new.get(@config.app_name, @config.subscription_uri) @subscription_manager = SubscriptionManager.new(subscription_storage) self end |
#connect_to_mq ⇒ Object
Thin veneer for Configuring the Message Queue
67 68 69 70 |
# File 'lib/rservicebus2/host.rb', line 67 def connect_to_mq @mq = MQ.get self end |
#get_endpoint_for_msg(msg_name) ⇒ Object
440 441 442 443 444 445 446 447 448 449 |
# File 'lib/rservicebus2/host.rb', line 440 def get_endpoint_for_msg(msg_name) queue_name = @endpoint_mapping.get(msg_name) return queue_name unless queue_name.nil? return @mq.local_queue_name if @handler_manager.can_msg_be_handled_locally(msg_name) log 'No end point mapping found for: ' + msg_name log '**** Check environment variable MessageEndpointMappings contains an entry named : ' + msg_name raise 'No end point mapping found for: ' + msg_name end |
#handle_message ⇒ Object
Send the current msg to the appropriate handlers
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 |
# File 'lib/rservicebus2/host.rb', line 337 def @resource_manager.begin msg_name = @msg.msg.class.name handler_list = @handler_manager.get_handler_list_for_msg(msg_name) RServiceBus2.rlog 'Handler found for: ' + msg_name begin @queue_for_msgs_to_be_sent_on_complete = [] log "Started processing msg, #{msg_name}" handler_list.each do |handler| begin log "Handler, #{handler.class.name}, Started" handler.handle(@msg.msg) log "Handler, #{handler.class.name}, Finished" rescue PropertyNotSet => e raise PropertyNotSet.new( "Property, #{e.}, not set for, #{handler.class.name}" ) rescue StandardError => e puts "E #{e.}" log 'An error occurred in Handler: ' + handler.class.name raise e end end if @saga_manager.handle(@msg) == false && handler_list.length == 0 fail NoHandlerFound, msg_name end @resource_manager.commit(msg_name) send_queued_msgs log "Finished processing msg, #{msg_name}" rescue StandardError => e @resource_manager.rollback(msg_name) @queue_for_msgs_to_be_sent_on_complete = nil raise e end end |
#load_contracts ⇒ Object
Load Contracts
117 118 119 120 121 122 123 124 125 126 |
# File 'lib/rservicebus2/host.rb', line 117 def load_contracts log 'Load Contracts' @config.contract_list.each do |path| require path RServiceBus2.rlog "Loaded Contract: #{path}" end self end |
#load_handlers ⇒ Object
Load and configure Message Handlers
85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/rservicebus2/host.rb', line 85 def load_handlers log 'Load Message Handlers' @handler_manager = HandlerManager.new(self, @resource_manager, @state_manager) @handler_loader = HandlerLoader.new(self, @handler_manager) @config.handler_path_list.each do |path| @handler_loader.load_handlers_from_path(path) end self end |
#load_libs ⇒ Object
For each directory given, find and load all librarys
129 130 131 132 133 134 135 136 |
# File 'lib/rservicebus2/host.rb', line 129 def load_libs log 'Load Libs' @config.lib_list.each do |path| $:.unshift path end self end |
#load_sagas ⇒ Object
Load and configure Sagas
98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/rservicebus2/host.rb', line 98 def load_sagas log 'Load Sagas' @saga_manager = SagaManager.new(self, @resource_manager, @saga_storage) @saga_loader = SagaLoader.new(self, @saga_manager) @config.saga_path_list.each do |path| @saga_loader.load_sagas_from_path(path) end self end |
#log(string, ver = false) ⇒ Object
Provides a thin logging veneer
18 19 20 |
# File 'lib/rservicebus2/host.rb', line 18 def log(string, ver = false) RServiceBus2.log(string, ver) end |
#publish(msg) ⇒ Object
Sends an event to all subscribers across the bus
466 467 468 469 470 471 472 473 474 |
# File 'lib/rservicebus2/host.rb', line 466 def publish(msg) RServiceBus2.rlog 'Bus.Publish' @stats.inc_total_published subscriptions = @subscription_manager.get(msg.class.name) subscriptions.each do |subscriber| queue_msg_for_send_on_complete(msg, subscriber) end end |
#queue_msg_for_send_on_complete(msg, queue_name, timestamp = nil) ⇒ Object
423 424 425 426 427 |
# File 'lib/rservicebus2/host.rb', line 423 def queue_msg_for_send_on_complete(msg, queue_name, = nil) correlation_id = @saga_data.nil? ? nil : @saga_data.correlation_id correlation_id = (!@msg.nil? && !@msg.correlation_id.nil?) ? @msg.correlation_id : correlation_id @queue_for_msgs_to_be_sent_on_complete << Hash['msg', msg, 'queue_name', queue_name, 'correlation_id', correlation_id, 'timestamp', ] end |
#reply(msg) ⇒ Object
Sends a msg back across the bus Reply queues are specified in each msg. It works like email, where the reply address can actually be anywhere
433 434 435 436 437 438 |
# File 'lib/rservicebus2/host.rb', line 433 def reply(msg) RServiceBus2.rlog 'Reply with: ' + msg.class.name + ' To: ' + @msg.return_address @stats.inc_total_reply queue_msg_for_send_on_complete(msg, @msg.return_address) end |
#run ⇒ Object
Ignition
183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/rservicebus2/host.rb', line 183 def run log 'Starting the Host' log "Watching, #{@mq.local_queue_name}" $0 = "rservicebus - #{@mq.local_queue_name}" unless @config..nil? log 'Forwarding all received messages to: ' + @config..to_s end unless @config..nil? log 'Forwarding all sent messages to: ' + @config..to_s end start_listening_to_endpoints end |
#send(msg, timestamp = nil) ⇒ Object
Send a msg across the bus msg destination is specified at the infrastructure level
455 456 457 458 459 460 461 462 |
# File 'lib/rservicebus2/host.rb', line 455 def send( msg, =nil ) RServiceBus2.rlog 'Bus.Send' @stats.inc_total_sent msg_name = msg.class.name queue_name = self.get_endpoint_for_msg(msg_name) queue_msg_for_send_on_complete(msg, queue_name, ) end |
#send_queued_msgs ⇒ Object
413 414 415 416 417 418 419 420 421 |
# File 'lib/rservicebus2/host.rb', line 413 def send_queued_msgs @queue_for_msgs_to_be_sent_on_complete.each do |row| if row['timestamp'].nil? _send_needs_wrapping(row['msg'], row['queue_name'], row['correlation_id']) else @send_at_manager.add(row) end end end |
#send_subscriptions ⇒ Object
Subscriptions are specified by adding events to the msg endpoint mapping
74 75 76 77 78 79 80 81 82 |
# File 'lib/rservicebus2/host.rb', line 74 def send_subscriptions log 'Send Subscriptions' @endpoint_mapping.get_subscription_endpoints.each do |event_name| subscribe(event_name) end self end |
#start_listening_to_endpoints ⇒ Object
Receive a msg, prep it, and handle any errors that may occur
-
Most of this should be queue independant
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/rservicebus2/host.rb', line 199 def start_listening_to_endpoints log 'Waiting for messages. To exit press CTRL+C' = true retries = @config.max_retries while # Popping a msg off the queue should not be in the message handler, # as it affects retry begin @stats.tick if @circuit_breaker.broken sleep 0.5 next end body = @mq.pop begin @stats.inc_total_processed @msg = YAML.load(body) case @msg.msg.class.name when 'RServiceBus2::MessageSubscription' @subscription_manager.add(@msg.msg.event_name, @msg.return_address) when 'RServiceBus2::MessageStatisticOutputOn' @stats.output = true log 'Turn on Stats logging' when 'RServiceBus2::MessageStatisticOutputOff' @stats.output = false log 'Turn off Stats logging' when 'RServiceBus2::MessageVerboseOutputOn' ENV['VERBOSE'] = 'true' log 'Turn on Verbose logging' when 'RServiceBus2::MessageVerboseOutputOff' ENV.delete('VERBOSE') log 'Turn off Verbose logging' else unless @config..nil? _send_already_wrapped_and_serialised(body, @config.) end end @mq.ack rescue ClassNotFoundForMsg => e puts "*** Class not found for msg, #{e.}" puts "*** Ensure, #{e.}, is defined in contract.rb, most likely as 'Class #{e.} end" @msg.add_error_msg(@mq.local_queue_name, e.) serialized_object = YAML.dump(@msg) _send_already_wrapped_and_serialised(serialized_object, @config.error_queue_name) @mq.ack rescue NoHandlerFound => e puts "*** Handler not found for msg, #{e.}" puts "*** Ensure a handler named, #{e.}, is present in the messagehandler directory." @msg.add_error_msg(@mq.local_queue_name, e.) serialized_object = YAML.dump(@msg) _send_already_wrapped_and_serialised(serialized_object, @config.error_queue_name) @mq.ack rescue PropertyNotSet => e # This has been re-rasied from a rescue in the handler puts "*** #{e.}" # "Property, #{e.message}, not set for, #{handler.class.name}" property_name = e.[10, e..index(',', 10) - 10] puts "*** Ensure the environment variable, RSB_#{property_name}, has been set at startup." rescue StandardError => e sleep 0.5 puts '*** Exception occurred' puts e. puts e.backtrace puts '***' if retries > 0 retries -= 1 @mq.return_to_queue else @circuit_breaker.failure @stats.inc_total_errored if e.class.name == 'Beanstalk::NotConnected' puts 'Lost connection to beanstalkd.' puts '*** Start or Restart beanstalkd and try again.' abort end if e.class.name == 'Redis::CannotConnectError' puts 'Lost connection to redis.' puts '*** Start or Restart redis and try again.' abort end error_string = e. + '. ' + e.backtrace.join('. ') @msg.add_error_msg(@mq.local_queue_name, error_string) serialized_object = YAML.dump(@msg) _send_already_wrapped_and_serialised(serialized_object, @config.error_queue_name) @mq.ack retries = @config.max_retries end end rescue SystemExit, Interrupt puts 'Exiting on request ...' = false rescue NoMsgToProcess => e # This exception is just saying there are no messages to process @queue_for_msgs_to_be_sent_on_complete = [] @monitors.each(&:look) send_queued_msgs @queue_for_msgs_to_be_sent_on_complete = nil @queue_for_msgs_to_be_sent_on_complete = [] @cron_manager.run send_queued_msgs @queue_for_msgs_to_be_sent_on_complete = nil @send_at_manager.process @circuit_breaker.success rescue StandardError => e if e. == 'SIGTERM' || e. == 'SIGINT' puts 'Exiting on request ...' = false else puts '*** This is really unexpected.' = false puts 'Message: ' + e. puts e.backtrace end end end end |
#subscribe(event_name) ⇒ Object
Sends a subscription request across the Bus
478 479 480 481 482 483 484 485 |
# File 'lib/rservicebus2/host.rb', line 478 def subscribe(event_name) RServiceBus2.rlog 'Bus.Subscribe: ' + event_name queue_name = get_endpoint_for_msg(event_name) subscription = MessageSubscription.new(event_name) _send_needs_wrapping(subscription, queue_name, nil) end |