Class: RServiceBus2::Host
- Inherits:
-
Object
- Object
- RServiceBus2::Host
- Defined in:
- lib/rservicebus2/host.rb
Overview
Host process for rservicebus
Instance Attribute Summary collapse
-
#saga_data ⇒ Object
Returns the value of attribute saga_data.
Instance Method Summary collapse
-
#_send_already_wrapped_and_serialised(serialized_object, queue_name) ⇒ Object
Sends a msg across the bus.
-
#_send_needs_wrapping(msg, queue_name, correlation_id) ⇒ Object
Sends a msg across the bus.
-
#configure_app_resource ⇒ Object
Thin veneer for Configuring external resources.
-
#configure_saga_storage ⇒ Object
Thin veneer for Configuring state.
-
#configure_subscriptions ⇒ Object
Load, configure and initialise Subscriptions.
- #get_endpoint_for_msg(msg_name) ⇒ Object
-
#handle_message ⇒ Object
Send the current msg to the appropriate handlers.
-
#initialize ⇒ Host
constructor
A new instance of Host.
-
#load_contracts ⇒ Object
Load Contracts.
-
#load_handlers ⇒ Object
Load and configure Message Handlers.
-
#load_libs ⇒ Object
For each directory given, find and load all libraries.
-
#load_sagas ⇒ Object
Load and configure Sagas.
-
#log(string) ⇒ Object
Provides a thin logging veneer.
-
#publish(msg) ⇒ Object
Sends an event to all subscribers across the bus.
- #queue_msg_for_send_on_complete(msg, queue_name, timestamp = nil) ⇒ Object
-
#reply(msg, timestamp = nil) ⇒ Object
Sends a msg back across the bus. Reply queues are specified in each msg.
-
#run ⇒ Object
Ignition.
-
#send(msg, timestamp = nil) ⇒ Object
Send a msg across the bus. The msg destination is specified at the infrastructure level.
- #send_queued_msgs ⇒ Object
-
#send_subscriptions ⇒ Object
Subscriptions are specified by adding events to the msg endpoint mapping.
-
#start_listening_to_endpoints ⇒ Object
Receive a msg, prep it, and handle any errors that may occur. Most of this should be queue independent.
-
#subscribe(event_name) ⇒ Object
Sends a subscription request across the Bus.
Constructor Details
#initialize ⇒ Host
Returns a new instance of Host.
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/rservicebus2/host.rb', line 100
# Builds the host. The order of the steps matters: several of them read
# instance variables assigned by earlier steps.
def initialize
  RServiceBus2.rlog "Current directory: #{Dir.pwd}"
  YamlSafeLoader.instance

  # The value returned by the end of the loader chain becomes @config.
  env_config = ConfigFromEnv.new
  @config = env_config.load_host_section
                      .load_contracts
                      .load_handler_path_list
                      .load_saga_path_list
                      .load_libs
                      .load_working_dir_list

  @mq = MQ.get
  @endpoint_mapping = EndpointMapping.new.configure(@mq.local_queue_name)
  @stats = StatisticManager.new(self)

  load_contracts
  load_libs

  @send_at_manager = SendAtManager.new(self)
  @state_manager = StateManager.new
  configure_saga_storage
  # Reads @state_manager and @saga_storage; assigns @resource_manager.
  configure_app_resource
  @circuit_breaker = CircuitBreaker.new(self)
  @monitors = ConfigureMonitor.new(self, @resource_manager).monitors(ENV)

  load_handlers
  load_sagas
  # The cron manager is given the msg names known to the handler manager.
  @cron_manager = CronManager.new(self, @handler_manager.msg_names)

  configure_subscriptions
  send_subscriptions
end
Instance Attribute Details
#saga_data ⇒ Object
Returns the value of attribute saga_data.
18 19 20 |
# File 'lib/rservicebus2/host.rb', line 18
# Reader for the saga_data attribute.
#
# @return [Object] the current value of @saga_data
def saga_data
  @saga_data
end
Instance Method Details
#_send_already_wrapped_and_serialised(serialized_object, queue_name) ⇒ Object
Sends a msg across the bus
335 336 337 338 339 340 341 |
# File 'lib/rservicebus2/host.rb', line 335
# Sends an already serialised msg to +queue_name+. When a forwarding queue
# for sent messages is configured, a copy is sent there first.
#
# NOTE(review): the attribute name after `@config.` was lost from this
# listing; `forward_sent_messages_to` is a reconstruction - confirm it
# against the config class before relying on it.
#
# @param serialized_object [String] the serialised msg
# @param queue_name [String] destination queue
def _send_already_wrapped_and_serialised(serialized_object, queue_name)
  RServiceBus2.rlog 'Bus._send_already_wrapped_and_serialised'

  forward_queue = @config.forward_sent_messages_to
  @mq.send(forward_queue, serialized_object) unless forward_queue.nil?

  @mq.send(queue_name, serialized_object)
end
#_send_needs_wrapping(msg, queue_name, correlation_id) ⇒ Object
Sends a msg across the bus
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 |
# File 'lib/rservicebus2/host.rb', line 346
# Wraps +msg+ in an RServiceBus2::Message, serialises it with YAML and sends
# it. A +queue_name+ containing '@' is split on '@': the first part is stored
# as the remote queue name, the second as the remote host name, and the msg
# is sent to the 'transport-out' queue instead.
#
# @param msg [Object] the msg to wrap
# @param queue_name [String] destination, optionally containing '@'
# @param correlation_id [Object, nil] passed through to the wrapper
def _send_needs_wrapping(msg, queue_name, correlation_id)
  RServiceBus2.rlog 'Bus._send_needs_wrapping'

  r_msg = RServiceBus2::Message.new(msg, @mq.local_queue_name, correlation_id)
  if queue_name.include?('@')
    remote_queue, remote_host = queue_name.split('@')
    r_msg.remote_queue_name = remote_queue
    r_msg.remote_host_name = remote_host
    destination = 'transport-out'
    RServiceBus2.rlog "Sending, #{msg.class.name} to, #{queue_name}, via #{destination}"
  else
    destination = queue_name
    RServiceBus2.rlog "Sending, #{msg.class.name} to, #{queue_name}"
  end

  _send_already_wrapped_and_serialised(YAML.dump(r_msg), destination)
end
#configure_app_resource ⇒ Object
Thin veneer for Configuring external resources
27 28 29 30 31 32 33 |
# File 'lib/rservicebus2/host.rb', line 27
# Thin veneer for configuring external resources. Stores the result of
# ConfigureAppResource#get_resources in @resource_manager.
def configure_app_resource
  configurer = ConfigureAppResource.new
  @resource_manager = configurer.get_resources(ENV, self, @state_manager, @saga_storage)
end
#configure_saga_storage ⇒ Object
Thin veneer for Configuring state
36 37 38 39 40 41 42 |
# File 'lib/rservicebus2/host.rb', line 36
# Thin veneer for configuring state. Looks up 'SAGA_URI' through
# RServiceBus2.get_value (passing 'dir:///tmp/saga' as the second argument),
# parses the result as a URI and stores the matching storage in @saga_storage.
def configure_saga_storage
  saga_uri = URI.parse(RServiceBus2.get_value('SAGA_URI', 'dir:///tmp/saga'))
  @saga_storage = SagaStorage.get(saga_uri)
end
#configure_subscriptions ⇒ Object
Load, configure and initialise Subscriptions
95 96 97 98 |
# File 'lib/rservicebus2/host.rb', line 95
# Load, configure and initialise subscriptions: obtains the subscription
# storage for this app and wraps it in a SubscriptionManager.
def configure_subscriptions
  storage = ConfigureSubscriptionStorage.new.get(
    @config.app_name,
    @config.subscription_uri
  )
  @subscription_manager = SubscriptionManager.new(storage)
end
#get_endpoint_for_msg(msg_name) ⇒ Object
393 394 395 396 397 398 399 400 401 402 |
# File 'lib/rservicebus2/host.rb', line 393
# Resolves the queue a msg should be sent to.
#
# @param msg_name [String] name of the msg class
# @return [Object] the mapped queue name, or the local queue name when the
#   msg has no mapping but can be handled locally
# @raise [RuntimeError] when there is no mapping and no local handler
def get_endpoint_for_msg(msg_name)
  mapped_queue = @endpoint_mapping.get(msg_name)
  return mapped_queue unless mapped_queue.nil?

  handled_locally = @handler_manager.can_msg_be_handled_locally(msg_name)
  return @mq.local_queue_name if handled_locally

  not_found = "No end point mapping found for: #{msg_name}"
  log not_found
  log "**** Check environment variable MessageEndpointMappings contains an entry named: #{msg_name}"
  raise not_found
end
#handle_message ⇒ Object
Send the current msg to the appropriate handlers
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 |
# File 'lib/rservicebus2/host.rb', line 295
# Send the current msg (@msg) to the appropriate handlers.
#
# Begins a resource-manager unit of work, runs every handler registered for
# the msg class, then offers the msg to the saga manager. On success the
# work is committed and msgs queued during handling are sent; on any
# StandardError the work is rolled back, the pending queue is discarded and
# the error is re-raised to the caller.
#
# The method name and the `message` calls on the rescued exceptions were
# missing from this listing and are restored here.
#
# @raise [PropertyNotSet] re-raised with the handler class name added
# @raise [NoHandlerFound] when the saga manager returns false and there are
#   no handlers for the msg
def handle_message
  @resource_manager.begin
  msg_name = @msg.msg.class.name
  handler_list = @handler_manager.get_handler_list_for_msg(msg_name)
  RServiceBus2.rlog "Handler found for: #{msg_name}"
  begin
    # Msgs sent by handlers are buffered here until the commit succeeds.
    @queue_for_msgs_to_be_sent_on_complete = []

    log "Started processing msg, #{msg_name}"
    handler_list.each do |handler|
      log "Handler, #{handler.class.name}, Started"
      handler.handle(@msg.msg)
      log "Handler, #{handler.class.name}, Finished"
    rescue PropertyNotSet => e
      raise PropertyNotSet, "Property, #{e.message}, not set for, #{handler.class.name}"
    rescue StandardError => e
      puts "E #{e.message}"
      log "An error occurred in Handler: #{handler.class.name}"
      raise e
    end

    raise NoHandlerFound, msg_name if @saga_manager.handle(@msg) == false && handler_list.length.zero?

    @resource_manager.commit(msg_name)
    send_queued_msgs
    log "Finished processing msg, #{msg_name}"
  rescue StandardError => e
    @resource_manager.rollback(msg_name)
    @queue_for_msgs_to_be_sent_on_complete = nil
    raise e
  end
end
#load_contracts ⇒ Object
Load Contracts
77 78 79 80 81 82 83 84 |
# File 'lib/rservicebus2/host.rb', line 77
# Load contracts: requires every path in the configured contract list.
def load_contracts
  log 'Load Contracts'

  @config.contract_list.each do |contract_path|
    require contract_path
    RServiceBus2.rlog "Loaded Contract: #{contract_path}"
  end
end
#load_handlers ⇒ Object
Load and configure Message Handlers
55 56 57 58 59 60 61 62 63 |
# File 'lib/rservicebus2/host.rb', line 55
# Load and configure message handlers: creates the handler manager and
# loader, then loads handlers from each configured handler path.
def load_handlers
  log 'Load Message Handlers'

  @handler_manager = HandlerManager.new(self, @resource_manager, @state_manager)
  @handler_loader = HandlerLoader.new(self, @handler_manager)

  @config.handler_path_list.each { |handler_dir| @handler_loader.load_handlers_from_path(handler_dir) }
end
#load_libs ⇒ Object
For each directory given, find and load all libraries
87 88 89 90 91 92 |
# File 'lib/rservicebus2/host.rb', line 87
# Puts each configured lib path at the front of $LOAD_PATH, one at a time,
# so the last path in the list ends up first.
def load_libs
  log 'Load Libs'

  @config.lib_list.each { |lib_dir| $LOAD_PATH.unshift(lib_dir) }
end
#load_sagas ⇒ Object
Load and configure Sagas
66 67 68 69 70 71 72 73 74 |
# File 'lib/rservicebus2/host.rb', line 66
# Load and configure sagas: creates the saga manager and loader, then loads
# sagas from each configured saga path.
def load_sagas
  log 'Load Sagas'

  @saga_manager = SagaManager.new(self, @resource_manager, @saga_storage)
  @saga_loader = SagaLoader.new(self, @saga_manager)

  @config.saga_path_list.each { |saga_dir| @saga_loader.load_sagas_from_path(saga_dir) }
end
#log(string) ⇒ Object
Provides a thin logging veneer
22 23 24 |
# File 'lib/rservicebus2/host.rb', line 22
# Provides a thin logging veneer over RServiceBus2.log.
#
# @param string [String] text to log
def log(string)
  RServiceBus2.log string
end
#publish(msg) ⇒ Object
Sends an event to all subscribers across the bus
418 419 420 421 422 423 424 425 426 |
# File 'lib/rservicebus2/host.rb', line 418
# Sends an event to all subscribers across the bus. One copy of +msg+ is
# queued for every subscriber registered under the msg's class name.
#
# @param msg [Object] the event to publish
def publish(msg)
  RServiceBus2.rlog 'Bus.Publish'
  @stats.inc_total_published

  event_name = msg.class.name
  @subscription_manager.get(event_name).each do |subscriber_queue|
    queue_msg_for_send_on_complete(msg, subscriber_queue)
  end
end
#queue_msg_for_send_on_complete(msg, queue_name, timestamp = nil) ⇒ Object
375 376 377 378 379 380 |
# File 'lib/rservicebus2/host.rb', line 375
# Appends a msg to the queue of msgs to be sent on completion.
#
# The correlation id is taken from the current msg when it has one,
# otherwise from the current saga data, otherwise it is nil.
#
# The +timestamp+ parameter name was missing from this listing and is
# restored from the method signature.
#
# @param msg [Object] the msg to send
# @param queue_name [String] destination queue
# @param timestamp [Object, nil] stored in the row under 'timestamp'
def queue_msg_for_send_on_complete(msg, queue_name, timestamp = nil)
  correlation_id = @saga_data&.correlation_id
  correlation_id = @msg.correlation_id unless @msg.nil? || @msg.correlation_id.nil?

  @queue_for_msgs_to_be_sent_on_complete << {
    'msg' => msg,
    'queue_name' => queue_name,
    'correlation_id' => correlation_id,
    'timestamp' => timestamp
  }
end
#reply(msg, timestamp = nil) ⇒ Object
Sends a msg back across the bus. Reply queues are specified in each msg. It works like email, where the reply address can actually be anywhere.
386 387 388 389 390 391 |
# File 'lib/rservicebus2/host.rb', line 386
# Queues +msg+ for sending to the return address of the msg currently being
# processed (@msg).
#
# The +timestamp+ parameter name was missing from this listing and is
# restored from the method signature.
#
# @param msg [Object] the reply to send
# @param timestamp [Object, nil] passed through to
#   queue_msg_for_send_on_complete
def reply(msg, timestamp = nil)
  RServiceBus2.rlog "Reply with: #{msg.class.name} To: #{@msg.return_address}"
  @stats.inc_total_reply

  queue_msg_for_send_on_complete(msg, @msg.return_address, timestamp)
end
#run ⇒ Object
Ignition
132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/rservicebus2/host.rb', line 132
# Ignition: logs the start-up banner, sets the process name ($0) and enters
# the receive loop.
#
# NOTE(review): both attribute names after `@config.` were lost from this
# listing. `forward_received_messages_to` and `forward_sent_messages_to` are
# reconstructed from the adjacent log text - confirm against the config class.
def run
  log 'Starting the Host'

  log "Watching, #{@mq.local_queue_name}"
  $0 = "rservicebus - #{@mq.local_queue_name}"

  received_forward = @config.forward_received_messages_to
  log "Forwarding all received messages to: #{received_forward}" unless received_forward.nil?

  sent_forward = @config.forward_sent_messages_to
  log "Forwarding all sent messages to: #{sent_forward}" unless sent_forward.nil?

  start_listening_to_endpoints
end
#send(msg, timestamp = nil) ⇒ Object
Send a msg across the bus. The msg destination is specified at the infrastructure level.
407 408 409 410 411 412 413 414 |
# File 'lib/rservicebus2/host.rb', line 407
# Queues +msg+ for sending to the endpoint that get_endpoint_for_msg returns
# for the msg's class name.
#
# The +timestamp+ parameter name was missing from this listing and is
# restored from the method signature.
#
# @param msg [Object] the msg to send
# @param timestamp [Object, nil] passed through to
#   queue_msg_for_send_on_complete
# @raise [RuntimeError] propagated from get_endpoint_for_msg
def send(msg, timestamp = nil)
  RServiceBus2.rlog 'Bus.Send'
  @stats.inc_total_sent

  msg_name = msg.class.name
  queue_name = get_endpoint_for_msg(msg_name)
  queue_msg_for_send_on_complete(msg, queue_name, timestamp)
end
#send_queued_msgs ⇒ Object
365 366 367 368 369 370 371 372 373 |
# File 'lib/rservicebus2/host.rb', line 365
# Works through the queue of msgs to be sent on completion. Rows carrying a
# 'timestamp' are passed to the send-at manager; all other rows are wrapped
# and sent immediately.
def send_queued_msgs
  @queue_for_msgs_to_be_sent_on_complete.each do |entry|
    if entry['timestamp'].nil?
      _send_needs_wrapping(entry['msg'], entry['queue_name'], entry['correlation_id'])
    else
      @send_at_manager.add(entry)
    end
  end
end
#send_subscriptions ⇒ Object
Subscriptions are specified by adding events to the msg endpoint mapping
46 47 48 49 50 51 52 |
# File 'lib/rservicebus2/host.rb', line 46
# Subscriptions are specified by adding events to the msg endpoint mapping.
# Sends one subscription request per subscription endpoint in the mapping.
def send_subscriptions
  log 'Send Subscriptions'

  @endpoint_mapping.subscription_endpoints.each { |event| subscribe(event) }
end
#start_listening_to_endpoints ⇒ Object
Receive a msg, prep it, and handle any errors that may occur
-
Most of this should be queue independent
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
# File 'lib/rservicebus2/host.rb', line 148
# Receive a msg, prep it, and handle any errors that may occur.
# Most of this should be queue independent.
#
# Runs until an exit is requested. Each pass pops a msg body from the queue,
# deserialises it into @msg and either acts on a built-in control msg or
# hands it to the handlers. Failures are retried up to @config.max_retries
# times before the msg is sent to the error queue.
#
# NOTE(review): several identifiers were lost from this listing and are
# reconstructed - confirm each against the real file:
# * the loop flag (a local; named `listening` here),
# * `message` on every rescued exception (supported by the comment in the
#   PropertyNotSet rescue),
# * `@config.forward_received_messages_to`,
# * the `handle_message` call in the `else` branch (implied by the
#   NoHandlerFound / PropertyNotSet rescues); its position relative to the
#   forwarding step is an assumption.
def start_listening_to_endpoints
  log 'Waiting for messages. To exit press CTRL+C'
  listening = true
  retries = @config.max_retries

  while listening
    # Popping a msg off the queue should not be in the message handler,
    # as it affects retry
    begin
      @stats.tick

      if @circuit_breaker.broken
        sleep 0.5
        next
      end

      body = @mq.pop
      begin
        @stats.inc_total_processed
        @msg = RServiceBus2.safe_load(body)
        case @msg.msg.class.name
        when 'RServiceBus2::MessageSubscription'
          @subscription_manager.add(@msg.msg.event_name, @msg.return_address)
        when 'RServiceBus2::MessageStatisticOutputOn'
          @stats.output = true
          log 'Turn on Stats logging'
        when 'RServiceBus2::MessageStatisticOutputOff'
          @stats.output = false
          log 'Turn off Stats logging'
        when 'RServiceBus2::MessageVerboseOutputOn'
          ENV['VERBOSE'] = 'true'
          log 'Turn on Verbose logging'
        when 'RServiceBus2::MessageVerboseOutputOff'
          ENV.delete('VERBOSE')
          log 'Turn off Verbose logging'
        else
          handle_message
          unless @config.forward_received_messages_to.nil?
            _send_already_wrapped_and_serialised(body, @config.forward_received_messages_to)
          end
        end
        @mq.ack
      rescue ClassNotFoundForMsg => e
        puts "*** Class not found for msg, #{e.message}"
        puts "*** Ensure, #{e.message}, is defined in contract.rb, most likely as 'Class #{e.message} end"

        @msg.add_error_msg(@mq.local_queue_name, e.message)
        serialized_object = YAML.dump(@msg)
        _send_already_wrapped_and_serialised(serialized_object, @config.error_queue_name)
        @mq.ack
      rescue NoHandlerFound => e
        puts "*** Handler not found for msg, #{e.message}"
        puts "*** Ensure a handler named, #{e.message}, is present in the messagehandler directory."

        @msg.add_error_msg(@mq.local_queue_name, e.message)
        serialized_object = YAML.dump(@msg)
        _send_already_wrapped_and_serialised(serialized_object, @config.error_queue_name)
        @mq.ack
      rescue PropertyNotSet => e
        # This has been re-raised from a rescue in the handler
        puts "*** #{e.message}"
        # The message looks like:
        #   "Property, <name>, not set for, <handler class>"
        # so the property name starts at offset 10 and runs to the next comma.
        property_name = e.message[10, e.message.index(',', 10) - 10]
        puts "*** Ensure the environment variable, RSB_#{property_name}, has been set at startup."
      rescue StandardError => e
        sleep 0.5

        puts '*** Exception occurred'
        puts e.message
        puts e.backtrace
        puts '***'

        if retries.positive?
          retries -= 1
          @mq.return_to_queue
        else
          @circuit_breaker.failure
          @stats.inc_total_errored
          if e.class.name == 'Beanstalk::NotConnected'
            puts 'Lost connection to beanstalkd.'
            puts '*** Start or Restart beanstalkd and try again.'
            abort
          end

          if e.class.name == 'Redis::CannotConnectError'
            puts 'Lost connection to redis.'
            puts '*** Start or Restart redis and try again.'
            abort
          end

          error_string = "#{e.message}. #{e.backtrace.join('. ')}"
          @msg.add_error_msg(@mq.local_queue_name, error_string)
          serialized_object = YAML.dump(@msg)
          _send_already_wrapped_and_serialised(serialized_object, @config.error_queue_name)
          @mq.ack
          retries = @config.max_retries
        end
      end
    rescue SystemExit, Interrupt
      puts 'Exiting on request ...'
      listening = false
    rescue NoMsgToProcess
      # This exception is just saying there are no messages to process
      begin
        @queue_for_msgs_to_be_sent_on_complete = []
        @monitors.each(&:look)
        send_queued_msgs
        @queue_for_msgs_to_be_sent_on_complete = nil

        @queue_for_msgs_to_be_sent_on_complete = []
        @cron_manager.run
        send_queued_msgs
        @queue_for_msgs_to_be_sent_on_complete = nil

        @send_at_manager.process
        @circuit_breaker.success
      rescue StandardError => e
        if e.message == 'SIGTERM' || e.message == 'SIGINT'
          puts 'Exiting on request ...'
          listening = false
        else
          puts '*** This is really unexpected.'
          listening = false
          puts "Message: #{e.message}"
          puts e.backtrace
        end
      end
    rescue StandardError => e
      if e.message == 'SIGTERM' || e.message == 'SIGINT'
        puts 'Exiting on request ...'
        listening = false
      else
        puts '*** This is really unexpected.'
        listening = false
        puts "Message: #{e.message}"
        puts e.backtrace
      end
    end
  end
end
#subscribe(event_name) ⇒ Object
Sends a subscription request across the Bus
430 431 432 433 434 435 436 437 |
# File 'lib/rservicebus2/host.rb', line 430
# Sends a subscription request across the bus: a MessageSubscription for
# +event_name+ is wrapped and sent, with no correlation id, to the endpoint
# resolved for that event.
#
# @param event_name [String] name of the event to subscribe to
# @raise [RuntimeError] propagated from get_endpoint_for_msg
def subscribe(event_name)
  RServiceBus2.rlog "Bus.Subscribe: #{event_name}"

  target_queue = get_endpoint_for_msg(event_name)
  request = MessageSubscription.new(event_name)
  _send_needs_wrapping(request, target_queue, nil)
end