Class: RServiceBus::Host
- Inherits:
-
Object
- Object
- RServiceBus::Host
- Defined in:
- lib/rservicebus/Host.rb
Overview
Host process for rservicebus
Instance Attribute Summary collapse
-
#sagaData ⇒ Object
Returns the value of attribute sagaData.
Instance Method Summary collapse
-
#_SendAlreadyWrappedAndSerialised(serialized_object, queueName) ⇒ Object
Sends a msg across the bus.
-
#_SendNeedsWrapping(msg, queueName, correlationId) ⇒ Object
Sends a msg across the bus.
-
#configureAppResource ⇒ Object
Thin veneer for Configuring external resources.
-
#configureCircuitBreaker ⇒ Object
Thin veneer for configuring the circuit breaker.
-
#configureCronManager ⇒ Object
Thin veneer for Configuring Cron.
-
#configureMonitors ⇒ Object
Thin veneer for configuring monitors.
-
#configureSagaStorage ⇒ Object
Thin veneer for configuring saga storage.
-
#configureSendAtManager ⇒ Object
Thin veneer for Configuring SendAt.
-
#configureStateManager ⇒ Object
Thin veneer for Configuring state.
-
#configureStatistics ⇒ Object
Initialise statistics monitor.
-
#configureSubscriptions ⇒ Object
Load, configure and initialise Subscriptions.
-
#connectToMq ⇒ Object
Thin veneer for Configuring the Message Queue.
- #getEndpointForMsg(msgName) ⇒ Object
-
#HandleMessage ⇒ Object
Send the current msg to the appropriate handlers.
-
#initialize ⇒ Host
constructor
A new instance of Host.
-
#loadContracts ⇒ Object
Load Contracts.
-
#loadHandlers ⇒ Object
Load and configure Message Handlers.
-
#loadLibs ⇒ Object
For each directory given, add it to the Ruby load path so its libraries can be required.
-
#loadSagas ⇒ Object
Load and configure Sagas.
-
#log(string, ver = false) ⇒ Object
Provides a thin logging veneer.
-
#Publish(msg) ⇒ Object
Sends an event to all subscribers across the bus.
- #queueMsgForSendOnComplete(msg, queueName, timestamp = nil) ⇒ Object
-
#Reply(msg) ⇒ Object
Sends a msg back across the bus. Reply queues are specified in each msg.
-
#run ⇒ Object
Ignition.
-
#Send(msg, timestamp = nil) ⇒ Object
Sends a msg across the bus. The msg destination is specified at the infrastructure level.
- #sendQueuedMsgs ⇒ Object
-
#sendSubscriptions ⇒ Object
Subscriptions are specified by adding events to the msg endpoint mapping.
-
#StartListeningToEndpoints ⇒ Object
Receive a msg, prep it, and handle any errors that may occur - Most of this should be queue independent.
-
#Subscribe(eventName) ⇒ Object
Sends a subscription request across the Bus.
Constructor Details
#initialize ⇒ Host
Returns a new instance of Host.
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/rservicebus/Host.rb', line 190

# Builds the host: reads configuration through ConfigFromEnv, connects to the
# message queue, sets up the endpoint mapping and then runs every
# configure/load step in a fixed order.
#
# The order matters: later steps read instance variables assigned by earlier
# ones (for example, loadHandlers uses @resourceManager and @stateManager,
# and configureCronManager uses @handlerManager).
def initialize
  RServiceBus.rlog "Current directory: #{Dir.pwd}"

  @config = ConfigFromEnv.new
                         .loadHostSection
                         .loadContracts
                         .loadHandlerPathList
                         .loadSagaPathList
                         .loadLibs
                         .loadWorkingDirList

  connectToMq
  @endpointMapping = EndpointMapping.new.Configure(@mq.localQueueName)

  # Each step below returns self, so running them one after another is the
  # same as chaining them.
  configureStatistics
  loadContracts
  loadLibs
  configureSendAtManager
  configureStateManager
  configureSagaStorage
  configureAppResource
  configureCircuitBreaker
  configureMonitors
  loadHandlers
  loadSagas
  configureCronManager
  configureSubscriptions
  sendSubscriptions

  self
end
Instance Attribute Details
#sagaData ⇒ Object
Returns the value of attribute sagaData.
15 16 17 |
# File 'lib/rservicebus/Host.rb', line 15

# Reader for the saga data currently held in @sagaData.
#
# @return [Object] the value of @sagaData (nil when none has been assigned)
def sagaData; @sagaData; end
Instance Method Details
#_SendAlreadyWrappedAndSerialised(serialized_object, queueName) ⇒ Object
Sends a msg across the bus
445 446 447 448 449 450 451 452 453 |
# File 'lib/rservicebus/Host.rb', line 445

# Puts an already serialised message onto a queue.
#
# When the configuration names a queue in forwardSentMessagesTo, a copy of the
# message is sent there first.
#
# @param serialized_object [Object] the serialised message body
# @param queueName [String] the destination queue
def _SendAlreadyWrappedAndSerialised(serialized_object, queueName)
  RServiceBus.rlog 'Bus._SendAlreadyWrappedAndSerialised'

  forward_queue = @config.forwardSentMessagesTo
  @mq.send(forward_queue, serialized_object) unless forward_queue.nil?

  @mq.send(queueName, serialized_object)
end
#_SendNeedsWrapping(msg, queueName, correlationId) ⇒ Object
Sends a msg across the bus
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 |
# File 'lib/rservicebus/Host.rb', line 459

# Wraps a message in an RServiceBus::Message, serialises it to YAML and sends it.
#
# A queue name of the form "queue@host" is treated as remote: the two parts
# are recorded on the wrapper and the message is sent to the 'transport-out'
# queue instead. Any other queue name is used as is.
#
# @param msg [Object] the payload to send
# @param queueName [String] destination, either "queue" or "queue@host"
# @param correlationId [Object, nil] correlation id stored on the wrapper
def _SendNeedsWrapping(msg, queueName, correlationId)
  RServiceBus.rlog 'Bus._SendNeedsWrapping'

  rMsg = RServiceBus::Message.new(msg, @mq.localQueueName, correlationId)
  if queueName.index('@').nil?
    q = queueName
    # Interpolate the queue name; the log line previously printed the literal
    # text "queueName".
    RServiceBus.rlog "Sending, #{msg.class.name} to, #{queueName}"
  else
    parts = queueName.split('@')
    rMsg.setRemoteQueueName(parts[0])
    rMsg.setRemoteHostName(parts[1])
    q = 'transport-out'
    RServiceBus.rlog "Sending, #{msg.class.name} to, #{queueName}, via #{q}"
  end

  serialized_object = YAML::dump(rMsg)
  _SendAlreadyWrappedAndSerialised(serialized_object, q)
end
#configureAppResource ⇒ Object
Thin veneer for Configuring external resources
48 49 50 51 |
# File 'lib/rservicebus/Host.rb', line 48

# Asks ConfigureAppResource for the resources described by ENV and keeps the
# result in @resourceManager. The host itself, the state manager and the saga
# storage are handed over as collaborators.
#
# @return [Host] self, so configuration calls can be chained
def configureAppResource
  configurer = ConfigureAppResource.new
  @resourceManager = configurer.getResources(ENV, self, @stateManager, @sagaStorage)
  self
end
#configureCircuitBreaker ⇒ Object
Thin veneer for configuring the circuit breaker
83 84 85 86 |
# File 'lib/rservicebus/Host.rb', line 83

# Creates the CircuitBreaker for this host and stores it in @circuitBreaker.
#
# @return [Host] self, so configuration calls can be chained
def configureCircuitBreaker
  @circuitBreaker = CircuitBreaker.new(self)
  self
end
#configureCronManager ⇒ Object
Thin veneer for Configuring Cron
143 144 145 146 |
# File 'lib/rservicebus/Host.rb', line 143

# Creates the CronManager, giving it this host and the list of message names
# reported by the handler manager. Requires @handlerManager to be set.
#
# @return [Host] self, so configuration calls can be chained
def configureCronManager
  msg_names = @handlerManager.getListOfMsgNames
  @cronManager = CronManager.new(self, msg_names)
  self
end
#configureMonitors ⇒ Object
Thin veneer for configuring monitors
91 92 93 94 |
# File 'lib/rservicebus/Host.rb', line 91

# Builds the monitors described by ENV through ConfigureMonitor and stores
# them in @monitors. Requires @resourceManager to be set.
#
# @return [Host] self, so configuration calls can be chained
def configureMonitors
  configurer = ConfigureMonitor.new(self, @resourceManager)
  @monitors = configurer.getMonitors(ENV)
  self
end
#configureSagaStorage ⇒ Object
Thin veneer for configuring saga storage
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/rservicebus/Host.rb', line 70

# Selects the saga storage from the SAGA_URI setting and stores it in
# @sagaStorage. When the setting is absent, 'dir:///tmp' is used.
#
# @return [Host] self, so configuration calls can be chained
def configureSagaStorage
  location = RServiceBus.getValue('SAGA_URI')
  location = 'dir:///tmp' if location.nil?

  @sagaStorage = SagaStorage.Get(URI.parse(location))
  self
end
#configureSendAtManager ⇒ Object
Thin veneer for Configuring SendAt
56 57 58 59 |
# File 'lib/rservicebus/Host.rb', line 56

# Creates the SendAtManager for this host and stores it in @sendAtManager.
#
# @return [Host] self, so configuration calls can be chained
def configureSendAtManager
  @sendAtManager = SendAtManager.new(self)
  self
end
#configureStateManager ⇒ Object
Thin veneer for Configuring state
63 64 65 66 |
# File 'lib/rservicebus/Host.rb', line 63

# Creates a StateManager and stores it in @stateManager.
#
# @return [Host] self, so configuration calls can be chained
def configureStateManager
  @stateManager = StateManager.new
  self
end
#configureStatistics ⇒ Object
Initialise statistics monitor
184 185 186 187 188 |
# File 'lib/rservicebus/Host.rb', line 184

# Creates the StatisticManager for this host and stores it in @stats.
#
# @return [Host] self, so configuration calls can be chained
def configureStatistics
  @stats = StatisticManager.new(self)
  self
end
#configureSubscriptions ⇒ Object
Load, configure and initialise Subscriptions
175 176 177 178 179 180 |
# File 'lib/rservicebus/Host.rb', line 175

# Obtains subscription storage for the configured application name and
# subscription URI, then wraps it in a SubscriptionManager stored in
# @subscriptionManager.
#
# @return [Host] self, so configuration calls can be chained
def configureSubscriptions
  storage = ConfigureSubscriptionStorage.new.get(@config.appName, @config.subscriptionUri)
  @subscriptionManager = SubscriptionManager.new(storage)
  self
end
#connectToMq ⇒ Object
Thin veneer for Configuring the Message Queue
98 99 100 101 102 |
# File 'lib/rservicebus/Host.rb', line 98

# Obtains the message queue from MQ.get and stores it in @mq.
#
# @return [Host] self, so configuration calls can be chained
def connectToMq
  @mq = MQ.get
  self
end
#getEndpointForMsg(msgName) ⇒ Object
506 507 508 509 510 511 512 513 514 515 |
# File 'lib/rservicebus/Host.rb', line 506

# Resolves the queue a message of the given name should be sent to.
#
# The endpoint mapping is consulted first. If it has no entry and the message
# can be handled by a local handler, the local queue name is returned.
#
# @param msgName [String] the message class name
# @return [Object] the mapped queue name, or the local queue name
# @raise [RuntimeError] when no mapping exists and no local handler can take the message
def getEndpointForMsg(msgName)
  mapped = @endpointMapping.get(msgName)

  if !mapped.nil?
    mapped
  elsif @handlerManager.canMsgBeHandledLocally(msgName)
    @mq.localQueueName
  else
    log 'No end point mapping found for: ' + msgName
    log '**** Check environment variable MessageEndpointMappings contains an entry named : ' + msgName
    raise 'No end point mapping found for: ' + msgName
  end
end
#HandleMessage ⇒ Object
Send the current msg to the appropriate handlers
392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 |
# File 'lib/rservicebus/Host.rb', line 392

# Dispatches the current message (@msg) to every handler registered for its
# class name, then to the saga manager.
#
# Messages queued by handlers are collected in @queueForMsgsToBeSentOnComplete
# and only sent after the resource manager has committed. On any exception the
# resource manager is rolled back, the queue of pending messages is discarded
# and the exception is re-raised.
#
# @raise [PropertyNotSet] re-raised with a message naming the property and the handler class
# @raise [NoHandlerFound] when the saga manager returns false and there are no handlers
def HandleMessage
  @resourceManager.Begin
  msgName = @msg.msg.class.name
  handlerList = @handlerManager.getHandlerListForMsg(msgName)
  RServiceBus.rlog 'Handler found for: ' + msgName

  begin
    @queueForMsgsToBeSentOnComplete = Array.new

    log "Started processing msg, #{msgName}"
    handlerList.each do |handler|
      begin
        log "Handler, #{handler.class.name}, Started"
        handler.Handle(@msg.msg)
        log "Handler, #{handler.class.name}, Finished"
      rescue PropertyNotSet => e
        # Re-raise with the handler name added; StartListeningToEndpoints
        # parses the property name back out of this message format.
        raise PropertyNotSet.new("Property, #{e.message}, not set for, #{handler.class.name}")
      rescue Exception => e
        puts "E #{e.message}"
        log 'An error occurred in Handler: ' + handler.class.name
        raise e
      end
    end

    # The saga manager is always given the message, even when handlers ran.
    if @sagaManager.Handle(@msg) == false && handlerList.length == 0
      raise NoHandlerFound.new(msgName)
    end

    @resourceManager.Commit(msgName)

    sendQueuedMsgs
    log "Finished processing msg, #{msgName}"
  rescue Exception => e
    # Deliberately broad: any failure must roll back and drop pending sends.
    @resourceManager.Rollback(msgName)
    @queueForMsgsToBeSentOnComplete = nil
    raise e
  end
end
#loadContracts ⇒ Object
Load Contracts
150 151 152 153 154 155 156 157 158 159 |
# File 'lib/rservicebus/Host.rb', line 150

# Requires every path listed in the configuration's contractList, logging
# each one as it is loaded.
#
# @return [Host] self, so configuration calls can be chained
def loadContracts
  log 'Load Contracts'

  @config.contractList.each do |contract|
    require contract
    RServiceBus.rlog "Loaded Contract: #{contract}"
  end

  self
end
#loadHandlers ⇒ Object
Load and configure Message Handlers
116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/rservicebus/Host.rb', line 116

# Creates the handler manager and handler loader, then loads handlers from
# every path in the configuration's handlerPathList. Requires
# @resourceManager and @stateManager to be set.
#
# @return [Host] self, so configuration calls can be chained
def loadHandlers
  log 'Load Message Handlers'

  @handlerManager = HandlerManager.new(self, @resourceManager, @stateManager)
  @handlerLoader = HandlerLoader.new(self, @handlerManager)
  @config.handlerPathList.each { |dir| @handlerLoader.loadHandlersFromPath(dir) }

  self
end
#loadLibs ⇒ Object
For each directory given, add it to the Ruby load path so its libraries can be required
163 164 165 166 167 168 169 170 171 |
# File 'lib/rservicebus/Host.rb', line 163

# Prepends every directory in the configuration's libList to Ruby's load
# path. Nothing is required here; the directories only become searchable.
#
# @return [Host] self, so configuration calls can be chained
def loadLibs
  log 'Load Libs'
  @config.libList.each { |dir| $LOAD_PATH.unshift(dir) }
  self
end
#loadSagas ⇒ Object
Load and configure Sagas
129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/rservicebus/Host.rb', line 129

# Creates the saga manager and saga loader, then loads sagas from every path
# in the configuration's sagaPathList. Requires @resourceManager and
# @sagaStorage to be set.
#
# @return [Host] self, so configuration calls can be chained
def loadSagas
  log 'Load Sagas'

  @sagaManager = Saga_Manager.new(self, @resourceManager, @sagaStorage)
  @sagaLoader = SagaLoader.new(self, @sagaManager)
  @config.sagaPathList.each { |dir| @sagaLoader.loadSagasFromPath(dir) }

  self
end
#log(string, ver = false) ⇒ Object
Provides a thin logging veneer
42 43 44 |
# File 'lib/rservicebus/Host.rb', line 42

# Forwards a log line to RServiceBus.log.
#
# @param string [String] the text to log
# @param ver [Boolean] passed through unchanged as the second argument of RServiceBus.log
def log(string, ver = false)
  RServiceBus.log(string, ver)
end
#Publish(msg) ⇒ Object
Sends an event to all subscribers across the bus
535 536 537 538 539 540 541 542 543 544 |
# File 'lib/rservicebus/Host.rb', line 535

# Queues an event for every subscriber registered under the event's class
# name. The messages are queued for sending on completion, not sent here.
#
# @param msg [Object] the event to publish
def Publish(msg)
  RServiceBus.rlog 'Bus.Publish'
  @stats.incTotalPublished

  @subscriptionManager.get(msg.class.name).each do |subscriber|
    queueMsgForSendOnComplete(msg, subscriber)
  end
end
#queueMsgForSendOnComplete(msg, queueName, timestamp = nil) ⇒ Object
488 489 490 491 492 |
# File 'lib/rservicebus/Host.rb', line 488

# Appends a message to @queueForMsgsToBeSentOnComplete so it is sent once the
# current unit of work finishes. The queue array must already have been
# initialised by the caller's context.
#
# The correlation id is taken from the current message (@msg) when it has
# one; otherwise from the current saga data (@sagaData) when present;
# otherwise it is nil.
#
# @param msg [Object] the payload to send
# @param queueName [String] the destination queue
# @param timestamp [Object, nil] when set, sendQueuedMsgs hands the entry to the send-at manager
def queueMsgForSendOnComplete(msg, queueName, timestamp = nil)
  correlationId = @sagaData.nil? ? nil : @sagaData.correlationId
  correlationId = @msg.correlationId unless @msg.nil? || @msg.correlationId.nil?

  @queueForMsgsToBeSentOnComplete << {
    'msg' => msg,
    'queueName' => queueName,
    'correlationId' => correlationId,
    'timestamp' => timestamp
  }
end
#Reply(msg) ⇒ Object
Sends a msg back across the bus. Reply queues are specified in each msg. It works like email, where the reply address can actually be anywhere.
499 500 501 502 503 504 |
# File 'lib/rservicebus/Host.rb', line 499

# Queues a reply addressed to the return address of the message currently
# being processed (@msg).
#
# @param msg [Object] the reply payload
def Reply(msg)
  reply_to = @msg.returnAddress
  RServiceBus.rlog 'Reply with: ' + msg.class.name + ' To: ' + reply_to
  @stats.incTotalReply

  queueMsgForSendOnComplete(msg, reply_to)
end
#run ⇒ Object
Ignition
226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/rservicebus/Host.rb', line 226

# Starts the host: logs what is being watched, sets the process title ($0) to
# include the local queue name, reports any configured forwarding queues and
# then enters the listening loop.
def run
  log 'Starting the Host'

  log "Watching, #{@mq.localQueueName}"
  $0 = "rservicebus - #{@mq.localQueueName}"

  received_target = @config.forwardReceivedMessagesTo
  log('Forwarding all received messages to: ' + received_target.to_s) unless received_target.nil?

  sent_target = @config.forwardSentMessagesTo
  log('Forwarding all sent messages to: ' + sent_target.to_s) unless sent_target.nil?

  self.StartListeningToEndpoints
end
#Send(msg, timestamp = nil) ⇒ Object
Sends a msg across the bus. The msg destination is specified at the infrastructure level
522 523 524 525 526 527 528 529 530 |
# File 'lib/rservicebus/Host.rb', line 522

# Queues a message for sending. The destination queue is looked up from the
# message's class name via getEndpointForMsg.
#
# @param msg [Object] the payload to send
# @param timestamp [Object, nil] passed through to queueMsgForSendOnComplete
def Send(msg, timestamp = nil)
  RServiceBus.rlog 'Bus.Send'
  @stats.incTotalSent

  msgName = msg.class.name
  queueName = getEndpointForMsg(msgName)
  queueMsgForSendOnComplete(msg, queueName, timestamp)
end
#sendQueuedMsgs ⇒ Object
478 479 480 481 482 483 484 485 486 |
# File 'lib/rservicebus/Host.rb', line 478

# Flushes @queueForMsgsToBeSentOnComplete. Entries without a timestamp are
# wrapped and sent straight away; entries with one are handed to the send-at
# manager.
def sendQueuedMsgs
  @queueForMsgsToBeSentOnComplete.each do |entry|
    if entry['timestamp'].nil?
      _SendNeedsWrapping(entry['msg'], entry['queueName'], entry['correlationId'])
    else
      @sendAtManager.Add(entry)
    end
  end
end
#sendSubscriptions ⇒ Object
Subscriptions are specified by adding events to the msg endpoint mapping
106 107 108 109 110 111 112 |
# File 'lib/rservicebus/Host.rb', line 106

# Sends a subscription request for every event name the endpoint mapping
# lists as a subscription endpoint.
#
# @return [Host] self, so configuration calls can be chained
def sendSubscriptions
  log 'Send Subscriptions'

  @endpointMapping.getSubscriptionEndpoints.each do |eventName|
    Subscribe(eventName)
  end

  self
end
#StartListeningToEndpoints ⇒ Object
Receive a msg, prep it, and handle any errors that may occur
-
Most of this should be queue independent
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 |
# File 'lib/rservicebus/Host.rb', line 243

# Main receive loop: pops messages off the queue, dispatches them and deals
# with failures until asked to exit.
#
# Per message:
# * subscription, statistics and verbose control messages are handled inline;
# * anything else goes through HandleMessage and, if configured, a copy of the
#   raw body is forwarded to forwardReceivedMessagesTo;
# * ClassNotFoundForMsg / NoHandlerFound send the message to the error queue;
# * any other exception returns the message to the queue until the retry
#   budget (config maxRetries) is used up, then sends it to the error queue.
#
# When the queue reports NoMsgToProcess the idle work runs instead: monitors,
# the cron manager and the send-at manager.
#
# The loop ends on SystemExit, Interrupt, or any exception that escapes the
# per-message handling.
def StartListeningToEndpoints
  log 'Waiting for messages. To exit press CTRL+C'
  running = true
  retries = @config.maxRetries

  while running
    # Popping a msg off the queue should not be in the message handler, as it affects retry
    begin
      @stats.tick

      if @circuitBreaker.Broken
        sleep 0.5
        next
      end

      body = @mq.pop
      begin
        @stats.incTotalProcessed
        # NOTE(review): YAML::load is applied to data read from the queue.
        # Confirm the queue is trusted, and that the installed Psych version
        # still permits loading the message classes through YAML::load.
        @msg = YAML::load(body)

        case @msg.msg.class.name
        when 'RServiceBus::Message_Subscription'
          @subscriptionManager.add(@msg.msg.eventName, @msg.returnAddress)
        when 'RServiceBus::Message_StatisticOutputOn'
          @stats.output = true
          log 'Turn on Stats logging'
        when 'RServiceBus::Message_StatisticOutputOff'
          @stats.output = false
          log 'Turn off Stats logging'
        when 'RServiceBus::Message_VerboseOutputOn'
          ENV['VERBOSE'] = 'true'
          log 'Turn on Verbose logging'
        when 'RServiceBus::Message_VerboseOutputOff'
          ENV.delete('VERBOSE')
          log 'Turn off Verbose logging'
        else
          HandleMessage()

          unless @config.forwardReceivedMessagesTo.nil?
            _SendAlreadyWrappedAndSerialised(body, @config.forwardReceivedMessagesTo)
          end
        end
        @mq.ack
      rescue ClassNotFoundForMsg => e
        puts "*** Class not found for msg, #{e.message}"
        puts "*** Ensure, #{e.message}, is defined in Contract.rb, most likely as 'Class #{e.message} end"

        @msg.addErrorMsg(@mq.localQueueName, e.message)
        serialized_object = YAML::dump(@msg)
        _SendAlreadyWrappedAndSerialised(serialized_object, @config.errorQueueName)
        @mq.ack
      rescue NoHandlerFound => e
        puts "*** Handler not found for msg, #{e.message}"
        puts "*** Ensure a handler named, #{e.message}, is present in the MessageHandler directory."

        @msg.addErrorMsg(@mq.localQueueName, e.message)
        serialized_object = YAML::dump(@msg)
        _SendAlreadyWrappedAndSerialised(serialized_object, @config.errorQueueName)
        @mq.ack
      rescue PropertyNotSet => e
        # This has been re-raised from a rescue in the handler
        puts "*** #{e.message}"
        # The message has the form
        # "Property, <name>, not set for, <handler class>"; skip the 10-char
        # "Property, " prefix and read up to the next comma.
        propertyName = e.message[10, e.message.index(',', 10) - 10]
        puts "*** Ensure the environment variable, RSB_#{propertyName}, has been set at startup."
        # NOTE(review): this branch neither acks nor returns the message to
        # the queue - confirm that is intended.
      rescue Exception => e
        sleep 0.5

        puts '*** Exception occurred'
        puts e.message
        puts e.backtrace
        puts '***'

        if retries > 0
          retries -= 1
          @mq.returnToQueue
        else
          @circuitBreaker.Failure
          @stats.incTotalErrored

          if e.class.name == 'Beanstalk::NotConnected'
            puts 'Lost connection to beanstalkd.'
            puts '*** Start or Restart beanstalkd and try again.'
            abort
          end

          if e.class.name == 'Redis::CannotConnectError'
            puts 'Lost connection to redis.'
            puts '*** Start or Restart redis and try again.'
            abort
          end

          errorString = e.message + '. ' + e.backtrace.join('. ')
          # log errorString

          @msg.addErrorMsg(@mq.localQueueName, errorString)
          serialized_object = YAML::dump(@msg)
          _SendAlreadyWrappedAndSerialised(serialized_object, @config.errorQueueName)
          @mq.ack
          retries = @config.maxRetries
        end
      end
    rescue SystemExit, Interrupt
      puts 'Exiting on request ...'
      running = false
    rescue NoMsgToProcess
      # This exception is just saying there are no messages to process

      # Run the monitors, sending anything they queued.
      @queueForMsgsToBeSentOnComplete = Array.new
      @monitors.each do |o|
        o.Look
      end
      sendQueuedMsgs
      @queueForMsgsToBeSentOnComplete = nil

      # Run cron, sending anything it queued.
      @queueForMsgsToBeSentOnComplete = Array.new
      @cronManager.Run
      sendQueuedMsgs
      @queueForMsgsToBeSentOnComplete = nil

      @sendAtManager.Process
      @circuitBreaker.Success
    rescue Exception => e
      if e.message == 'SIGTERM' || e.message == 'SIGINT'
        puts 'Exiting on request ...'
        running = false
      else
        puts '*** This is really unexpected.'
        running = false
        puts 'Message: ' + e.message
        puts e.backtrace
      end
    end
  end
end
#Subscribe(eventName) ⇒ Object
Sends a subscription request across the Bus
549 550 551 552 553 554 555 556 |
# File 'lib/rservicebus/Host.rb', line 549

# Sends a Message_Subscription for the given event to the queue that the
# endpoint lookup returns for that event name. The request is sent
# immediately, with no correlation id.
#
# @param eventName [String] the event to subscribe to
def Subscribe(eventName)
  RServiceBus.rlog 'Bus.Subscribe: ' + eventName

  target_queue = getEndpointForMsg(eventName)
  _SendNeedsWrapping(Message_Subscription.new(eventName), target_queue, nil)
end