Class: RServiceBus2::Host

Inherits:
Object
  • Object
show all
Defined in:
lib/rservicebus2/host.rb

Overview

Host process for rservicebus

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeHost

Returns a new instance of Host.



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/rservicebus2/host.rb', line 151

def initialize
  RServiceBus2.rlog "Current directory: #{Dir.pwd}"
  @config = ConfigFromEnv.new.load_host_section
                              .load_contracts
                              .load_handler_path_list
                              .load_saga_path_list
                              .load_libs
                              .load_working_dir_list

  connect_to_mq

  @endpoint_mapping = EndpointMapping.new.configure(@mq.local_queue_name)

  self.configure_statistics
      .load_contracts
      .load_libs
      .configure_send_at_manager
      .configure_state_manager
      .configure_saga_storage
      .configure_app_resource
      .configure_circuit_breaker
      .configure_monitors
      .load_handlers
      .load_sagas
      .configure_cron_manager
      .configure_subscriptions
      .send_subscriptions

  self
end

Instance Attribute Details

#saga_dataObject

Returns the value of attribute saga_data.



13
14
15
# File 'lib/rservicebus2/host.rb', line 13

def saga_data
  @saga_data
end

Instance Method Details

#_send_already_wrapped_and_serialised(serialized_object, queue_name) ⇒ Object

Sends a msg across the bus

Parameters:

  • serialized_object (String)

    serialized RServiceBus2::Message

  • queue_name (String)

    endpoint to which the msg will be sent



381
382
383
384
385
386
387
388
389
# File 'lib/rservicebus2/host.rb', line 381

def _send_already_wrapped_and_serialised(serialized_object, queue_name)
  RServiceBus2.rlog 'Bus._send_already_wrapped_and_serialised'

  unless @config.forward_sent_messages_to.nil?
    @mq.send(@config.forward_sent_messages_to, serialized_object)
  end

  @mq.send(queue_name, serialized_object)
end

#_send_needs_wrapping(msg, queue_name, correlation_id) ⇒ Object

Sends a msg across the bus

Parameters:

  • msg (RServiceBus2::Message)

    msg to be sent

  • queueName (String)

    endpoint to which the msg will be sent



394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
# File 'lib/rservicebus2/host.rb', line 394

def _send_needs_wrapping(msg, queue_name, correlation_id)
  RServiceBus2.rlog 'Bus._send_needs_wrapping'

  r_msg = RServiceBus2::Message.new(msg, @mq.local_queue_name, correlation_id)
  if queue_name.index('@').nil?
    q = queue_name
    RServiceBus2.rlog "Sending, #{msg.class.name} to, #{queue_name}"
  else
    parts = queue_name.split('@')
    r_msg.set_remote_queue_name(parts[0])
    r_msg.set_remote_host_name(parts[1])
    q = 'transport-out'
    RServiceBus2.rlog "Sending, #{msg.class.name} to, #{queue_name}, via #{q}"
  end

  serialized_object = YAML.dump(r_msg)
  _send_already_wrapped_and_serialised(serialized_object, q)
end

#configure_app_resourceObject

Thin veneer for Configuring external resources



23
24
25
26
27
28
29
30
# File 'lib/rservicebus2/host.rb', line 23

def configure_app_resource
  @resource_manager = ConfigureAppResource.new
                                          .get_resources(ENV,
                                                         self,
                                                         @state_manager,
                                                         @saga_storage)
  self
end

#configure_circuit_breakerObject

Thin veneer for Configuring Cron



55
56
57
58
# File 'lib/rservicebus2/host.rb', line 55

def configure_circuit_breaker
  @circuit_breaker = CircuitBreaker.new(self)
  self
end

#configure_cron_managerObject

Thin veneer for Configuring Cron



111
112
113
114
# File 'lib/rservicebus2/host.rb', line 111

def configure_cron_manager
  @cron_manager = CronManager.new(self, @handler_manager.get_list_of_msg_names)
  self
end

#configure_monitorsObject

Thin veneer for Configuring external resources



61
62
63
64
# File 'lib/rservicebus2/host.rb', line 61

def configure_monitors
  @monitors = ConfigureMonitor.new(self, @resource_manager).get_monitors(ENV)
  self
end

#configure_saga_storageObject

Thin veneer for Configuring state



45
46
47
48
49
50
51
52
# File 'lib/rservicebus2/host.rb', line 45

def configure_saga_storage
  string = RServiceBus2.get_value('SAGA_URI')
  string = 'dir:///tmp' if string.nil?

  uri = URI.parse(string)
  @saga_storage = SagaStorage.get(uri)
  self
end

#configure_send_at_managerObject

Thin veneer for Configuring SendAt



33
34
35
36
# File 'lib/rservicebus2/host.rb', line 33

def configure_send_at_manager
  @send_at_manager = SendAtManager.new(self)
  self
end

#configure_state_managerObject

Thin veneer for Configuring state



39
40
41
42
# File 'lib/rservicebus2/host.rb', line 39

def configure_state_manager
  @state_manager = StateManager.new
  self
end

#configure_statisticsObject

Initialise statistics monitor



146
147
148
149
# File 'lib/rservicebus2/host.rb', line 146

def configure_statistics
  @stats = StatisticManager.new( self )
  self
end

#configure_subscriptionsObject

Load, configure and initialise Subscriptions



139
140
141
142
143
# File 'lib/rservicebus2/host.rb', line 139

def configure_subscriptions
  subscription_storage = ConfigureSubscriptionStorage.new.get(@config.app_name, @config.subscription_uri)
  @subscription_manager = SubscriptionManager.new(subscription_storage)
  self
end

#connect_to_mqObject

Thin veneer for Configuring the Message Queue



67
68
69
70
# File 'lib/rservicebus2/host.rb', line 67

def connect_to_mq
  @mq = MQ.get
  self
end

#get_endpoint_for_msg(msg_name) ⇒ Object



440
441
442
443
444
445
446
447
448
449
# File 'lib/rservicebus2/host.rb', line 440

def get_endpoint_for_msg(msg_name)
  queue_name = @endpoint_mapping.get(msg_name)
  return queue_name unless queue_name.nil?

  return @mq.local_queue_name if @handler_manager.can_msg_be_handled_locally(msg_name)

  log 'No end point mapping found for: ' + msg_name
  log '**** Check environment variable MessageEndpointMappings contains an entry named : ' + msg_name
  raise 'No end point mapping found for: ' + msg_name
end

#handle_messageObject

Send the current msg to the appropriate handlers



337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# File 'lib/rservicebus2/host.rb', line 337

def handle_message
  @resource_manager.begin
  msg_name = @msg.msg.class.name
  handler_list = @handler_manager.get_handler_list_for_msg(msg_name)
  RServiceBus2.rlog 'Handler found for: ' + msg_name
  begin
    @queue_for_msgs_to_be_sent_on_complete = []

    log "Started processing msg, #{msg_name}"
    handler_list.each do |handler|
      begin
        log "Handler, #{handler.class.name}, Started"
        handler.handle(@msg.msg)
        log "Handler, #{handler.class.name}, Finished"
      rescue PropertyNotSet => e
        raise PropertyNotSet.new( "Property, #{e.message}, not set for, #{handler.class.name}" )
      rescue StandardError => e
        puts "E #{e.message}"
        log 'An error occurred in Handler: ' + handler.class.name
        raise e
      end
    end

    if @saga_manager.handle(@msg) == false && handler_list.length == 0
      fail NoHandlerFound, msg_name
    end
    @resource_manager.commit(msg_name)

    send_queued_msgs
    log "Finished processing msg, #{msg_name}"

  rescue StandardError => e
    @resource_manager.rollback(msg_name)
    @queue_for_msgs_to_be_sent_on_complete = nil
    raise e
  end
end

#load_contractsObject

Load Contracts



117
118
119
120
121
122
123
124
125
126
# File 'lib/rservicebus2/host.rb', line 117

def load_contracts
  log 'Load Contracts'

  @config.contract_list.each do |path|
    require path
    RServiceBus2.rlog "Loaded Contract: #{path}"
  end

  self
end

#load_handlersObject

Load and configure Message Handlers



85
86
87
88
89
90
91
92
93
94
95
# File 'lib/rservicebus2/host.rb', line 85

def load_handlers
  log 'Load Message Handlers'
  @handler_manager = HandlerManager.new(self, @resource_manager, @state_manager)
  @handler_loader = HandlerLoader.new(self, @handler_manager)

  @config.handler_path_list.each do |path|
    @handler_loader.load_handlers_from_path(path)
  end

  self
end

#load_libsObject

For each directory given, find and load all librarys



129
130
131
132
133
134
135
136
# File 'lib/rservicebus2/host.rb', line 129

def load_libs
  log 'Load Libs'
  @config.lib_list.each do |path|
    $:.unshift path
  end

  self
end

#load_sagasObject

Load and configure Sagas



98
99
100
101
102
103
104
105
106
107
108
# File 'lib/rservicebus2/host.rb', line 98

def load_sagas
  log 'Load Sagas'
  @saga_manager = SagaManager.new(self, @resource_manager, @saga_storage)
  @saga_loader = SagaLoader.new(self, @saga_manager)

  @config.saga_path_list.each do |path|
    @saga_loader.load_sagas_from_path(path)
  end

  self
end

#log(string, ver = false) ⇒ Object

Provides a thin logging veneer

Parameters:

  • string (String)

    Log entry

  • ver (Boolean) (defaults to: false)

    Indicator for a verbose log entry



18
19
20
# File 'lib/rservicebus2/host.rb', line 18

def log(string, ver = false)
  RServiceBus2.log(string, ver)
end

#publish(msg) ⇒ Object

Sends an event to all subscribers across the bus

Parameters:



466
467
468
469
470
471
472
473
474
# File 'lib/rservicebus2/host.rb', line 466

def publish(msg)
  RServiceBus2.rlog 'Bus.Publish'
  @stats.inc_total_published

  subscriptions = @subscription_manager.get(msg.class.name)
  subscriptions.each do |subscriber|
    queue_msg_for_send_on_complete(msg, subscriber)
  end
end

#queue_msg_for_send_on_complete(msg, queue_name, timestamp = nil) ⇒ Object



423
424
425
426
427
# File 'lib/rservicebus2/host.rb', line 423

def queue_msg_for_send_on_complete(msg, queue_name, timestamp = nil)
  correlation_id = @saga_data.nil? ? nil : @saga_data.correlation_id
  correlation_id = (!@msg.nil? && !@msg.correlation_id.nil?) ? @msg.correlation_id : correlation_id
  @queue_for_msgs_to_be_sent_on_complete << Hash['msg', msg, 'queue_name', queue_name, 'correlation_id', correlation_id, 'timestamp',timestamp ]
end

#reply(msg) ⇒ Object

Sends a msg back across the bus Reply queues are specified in each msg. It works like email, where the reply address can actually be anywhere

Parameters:



433
434
435
436
437
438
# File 'lib/rservicebus2/host.rb', line 433

def reply(msg)
  RServiceBus2.rlog 'Reply with: ' + msg.class.name + ' To: ' + @msg.return_address
  @stats.inc_total_reply

  queue_msg_for_send_on_complete(msg, @msg.return_address)
end

#runObject

Ignition



183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/rservicebus2/host.rb', line 183

def run
  log 'Starting the Host'
  log "Watching, #{@mq.local_queue_name}"
  $0 = "rservicebus - #{@mq.local_queue_name}"
  unless @config.forward_received_messages_to.nil?
    log 'Forwarding all received messages to: ' + @config.forward_received_messages_to.to_s
  end
  unless @config.forward_sent_messages_to.nil?
    log 'Forwarding all sent messages to: ' + @config.forward_sent_messages_to.to_s
  end

  start_listening_to_endpoints
end

#send(msg, timestamp = nil) ⇒ Object

Send a msg across the bus msg destination is specified at the infrastructure level

Parameters:



455
456
457
458
459
460
461
462
# File 'lib/rservicebus2/host.rb', line 455

def send( msg, timestamp=nil )
  RServiceBus2.rlog 'Bus.Send'
  @stats.inc_total_sent

  msg_name = msg.class.name
  queue_name = self.get_endpoint_for_msg(msg_name)
  queue_msg_for_send_on_complete(msg, queue_name, timestamp)
end

#send_queued_msgsObject



413
414
415
416
417
418
419
420
421
# File 'lib/rservicebus2/host.rb', line 413

def send_queued_msgs
  @queue_for_msgs_to_be_sent_on_complete.each do |row|
    if row['timestamp'].nil?
      _send_needs_wrapping(row['msg'], row['queue_name'], row['correlation_id'])
    else
      @send_at_manager.add(row)
    end
  end
end

#send_subscriptionsObject

Subscriptions are specified by adding events to the msg endpoint mapping



74
75
76
77
78
79
80
81
82
# File 'lib/rservicebus2/host.rb', line 74

def send_subscriptions
  log 'Send Subscriptions'

  @endpoint_mapping.get_subscription_endpoints.each do |event_name|
    subscribe(event_name)
  end

  self
end

#start_listening_to_endpointsObject

Receive a msg, prep it, and handle any errors that may occur

  • Most of this should be queue independant



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/rservicebus2/host.rb', line 199

def start_listening_to_endpoints
  log 'Waiting for messages. To exit press CTRL+C'
  message_loop = true
  retries = @config.max_retries

  while message_loop
    # Popping a msg off the queue should not be in the message handler,
    #  as it affects retry
    begin
      @stats.tick
      if @circuit_breaker.broken
        sleep 0.5
        next
      end

      body = @mq.pop
      begin
        @stats.inc_total_processed
        @msg = YAML.load(body)
        case @msg.msg.class.name
        when 'RServiceBus2::MessageSubscription'
          @subscription_manager.add(@msg.msg.event_name,
                                    @msg.return_address)
        when 'RServiceBus2::MessageStatisticOutputOn'
          @stats.output = true
          log 'Turn on Stats logging'
        when 'RServiceBus2::MessageStatisticOutputOff'
          @stats.output = false
          log 'Turn off Stats logging'
        when 'RServiceBus2::MessageVerboseOutputOn'
          ENV['VERBOSE'] = 'true'
          log 'Turn on Verbose logging'
        when 'RServiceBus2::MessageVerboseOutputOff'
          ENV.delete('VERBOSE')
          log 'Turn off Verbose logging'
        else
          handle_message
          unless @config.forward_received_messages_to.nil?
            _send_already_wrapped_and_serialised(body, @config.forward_received_messages_to)
          end
        end
        @mq.ack
      rescue ClassNotFoundForMsg => e
        puts "*** Class not found for msg, #{e.message}"
        puts "*** Ensure, #{e.message}, is defined in contract.rb, most
          likely as 'Class #{e.message} end"

        @msg.add_error_msg(@mq.local_queue_name, e.message)
        serialized_object = YAML.dump(@msg)
        _send_already_wrapped_and_serialised(serialized_object,
                                             @config.error_queue_name)
        @mq.ack
      rescue NoHandlerFound => e
        puts "*** Handler not found for msg, #{e.message}"
        puts "*** Ensure a handler named, #{e.message}, is present in the
          messagehandler directory."

        @msg.add_error_msg(@mq.local_queue_name, e.message)
        serialized_object = YAML.dump(@msg)
        _send_already_wrapped_and_serialised(serialized_object,
                                             @config.error_queue_name)
        @mq.ack

      rescue PropertyNotSet => e
        # This has been re-rasied from a rescue in the handler
        puts "*** #{e.message}"
        # "Property, #{e.message}, not set for, #{handler.class.name}"
        property_name = e.message[10, e.message.index(',', 10) - 10]
        puts "*** Ensure the environment variable, RSB_#{property_name},
          has been set at startup."

      rescue StandardError => e
        sleep 0.5

        puts '*** Exception occurred'
        puts e.message
        puts e.backtrace
        puts '***'

        if retries > 0
          retries -= 1
          @mq.return_to_queue
        else
          @circuit_breaker.failure
          @stats.inc_total_errored
          if e.class.name == 'Beanstalk::NotConnected'
            puts 'Lost connection to beanstalkd.'
            puts '*** Start or Restart beanstalkd and try again.'
            abort
          end

          if e.class.name == 'Redis::CannotConnectError'
            puts 'Lost connection to redis.'
            puts '*** Start or Restart redis and try again.'
            abort
          end

          error_string = e.message + '. ' + e.backtrace.join('. ')
          @msg.add_error_msg(@mq.local_queue_name, error_string)
          serialized_object = YAML.dump(@msg)
          _send_already_wrapped_and_serialised(serialized_object, @config.error_queue_name)
          @mq.ack
          retries = @config.max_retries
        end
      end
    rescue SystemExit, Interrupt
      puts 'Exiting on request ...'
      message_loop = false
    rescue NoMsgToProcess => e
      # This exception is just saying there are no messages to process
      @queue_for_msgs_to_be_sent_on_complete = []
      @monitors.each(&:look)
      send_queued_msgs
      @queue_for_msgs_to_be_sent_on_complete = nil

      @queue_for_msgs_to_be_sent_on_complete = []
      @cron_manager.run
      send_queued_msgs
      @queue_for_msgs_to_be_sent_on_complete = nil

      @send_at_manager.process
      @circuit_breaker.success

    rescue StandardError => e
      if e.message == 'SIGTERM' || e.message == 'SIGINT'
        puts 'Exiting on request ...'
        message_loop = false
      else
        puts '*** This is really unexpected.'
        message_loop = false
        puts 'Message: ' + e.message
        puts e.backtrace
      end
    end
  end
end

#subscribe(event_name) ⇒ Object

Sends a subscription request across the Bus

Parameters:

  • eventName (String)

    event to be subscribes to



478
479
480
481
482
483
484
485
# File 'lib/rservicebus2/host.rb', line 478

def subscribe(event_name)
  RServiceBus2.rlog 'Bus.Subscribe: ' + event_name

  queue_name = get_endpoint_for_msg(event_name)
  subscription = MessageSubscription.new(event_name)

  _send_needs_wrapping(subscription, queue_name, nil)
end