Class: RServiceBus2::Host

Inherits:
Object
  • Object
show all
Defined in:
lib/rservicebus2/host.rb

Overview

Host process for rservicebus

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Host

Returns a new instance of Host.



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/rservicebus2/host.rb', line 100

# Builds the host: reads the configuration, obtains the message queue and
# creates every collaborator used by the message loop.
#
# The statements are order-sensitive. Later lines read instance variables that
# earlier lines assign: configure_app_resource reads @state_manager and
# @saga_storage and assigns @resource_manager, which ConfigureMonitor,
# load_handlers and load_sagas then receive; CronManager reads
# @handler_manager, which load_handlers assigns.
def initialize
  RServiceBus2.rlog "Current directory: #{Dir.pwd}"
  # NOTE(review): the return value is discarded; presumably this is called
  # only to create the singleton up front - confirm.
  YamlSafeLoader.instance
  @config = ConfigFromEnv.new
                         .load_host_section
                         .load_contracts
                         .load_handler_path_list
                         .load_saga_path_list
                         .load_libs
                         .load_working_dir_list

  @mq = MQ.get

  @endpoint_mapping = EndpointMapping.new.configure(@mq.local_queue_name)

  @stats = StatisticManager.new(self)
  # NOTE(review): contracts are required before load_libs extends $LOAD_PATH,
  # so a contract that requires something from a lib directory would not find
  # it at this point - confirm the order is intended.
  load_contracts
  load_libs
  @send_at_manager = SendAtManager.new(self)
  @state_manager = StateManager.new
  configure_saga_storage
  configure_app_resource
  @circuit_breaker = CircuitBreaker.new(self)
  @monitors = ConfigureMonitor.new(self, @resource_manager).monitors(ENV)
  load_handlers
  load_sagas
  @cron_manager = CronManager.new(self, @handler_manager.msg_names)
  configure_subscriptions
  # Sends a subscription request for every event named in the endpoint mapping.
  send_subscriptions
end

Instance Attribute Details

#saga_data ⇒ Object

Returns the value of attribute saga_data.



18
19
20
# File 'lib/rservicebus2/host.rb', line 18

# Reader for the @saga_data instance variable.
#
# @return [Object, nil] the current value of @saga_data (nil when it has not
#   been assigned)
def saga_data
  @saga_data
end

Instance Method Details

#_send_already_wrapped_and_serialised(serialized_object, queue_name) ⇒ Object

Sends a msg across the bus

Parameters:

  • serialized_object (String)

    serialized RServiceBus2::Message

  • queue_name (String)

    endpoint to which the msg will be sent



335
336
337
338
339
340
341
# File 'lib/rservicebus2/host.rb', line 335

# Puts an already serialised msg onto a queue.
#
# When the configuration's forward_sent_messages_to is not nil, the same
# payload is first sent to that queue and then to the requested one.
#
# @param serialized_object [String] serialized RServiceBus2::Message
# @param queue_name [String] endpoint to which the msg will be sent
# @return [Object] whatever @mq.send returns for the requested queue
def _send_already_wrapped_and_serialised(serialized_object, queue_name)
  RServiceBus2.rlog 'Bus._send_already_wrapped_and_serialised'

  copy_queue = @config.forward_sent_messages_to
  unless copy_queue.nil?
    @mq.send(copy_queue, serialized_object)
  end

  @mq.send(queue_name, serialized_object)
end

#_send_needs_wrapping(msg, queue_name, correlation_id) ⇒ Object

Sends a msg across the bus

Parameters:

  • msg (RServiceBus2::Message)

    msg to be sent

  • queue_name (String)

    endpoint to which the msg will be sent



346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/rservicebus2/host.rb', line 346

# Wraps a msg in an RServiceBus2::Message, serialises the wrapper with
# YAML.dump and hands it to _send_already_wrapped_and_serialised.
#
# A queue name containing '@' is split on '@': the first part is stored as the
# wrapper's remote queue name, the second as its remote host name, and the
# wrapper is sent to the 'transport-out' queue instead of the named queue.
#
# @param msg [Object] msg to be sent
# @param queue_name [String] endpoint to which the msg will be sent, either
#   'queue' or 'queue@host'
# @param correlation_id [Object, nil] passed through to the wrapper
def _send_needs_wrapping(msg, queue_name, correlation_id)
  RServiceBus2.rlog 'Bus._send_needs_wrapping'

  envelope = RServiceBus2::Message.new(msg, @mq.local_queue_name, correlation_id)
  if queue_name.include?('@')
    remote_queue, remote_host = queue_name.split('@')
    envelope.remote_queue_name = remote_queue
    envelope.remote_host_name = remote_host
    target_queue = 'transport-out'
    RServiceBus2.rlog "Sending, #{msg.class.name} to, #{queue_name}, via #{target_queue}"
  else
    target_queue = queue_name
    RServiceBus2.rlog "Sending, #{msg.class.name} to, #{queue_name}"
  end

  _send_already_wrapped_and_serialised(YAML.dump(envelope), target_queue)
end

#configure_app_resource ⇒ Object

Thin veneer for Configuring external resources



27
28
29
30
31
32
33
# File 'lib/rservicebus2/host.rb', line 27

# Thin veneer for configuring external resources.
#
# Asks a new ConfigureAppResource for the resources, passing ENV, this host,
# the state manager and the saga storage, and keeps the result in
# @resource_manager.
def configure_app_resource
  configurer = ConfigureAppResource.new
  @resource_manager = configurer.get_resources(ENV, self, @state_manager, @saga_storage)
end

#configure_saga_storage ⇒ Object

Thin veneer for Configuring state



36
37
38
39
40
41
42
# File 'lib/rservicebus2/host.rb', line 36

# Thin veneer for configuring saga storage.
#
# Looks up 'SAGA_URI' through RServiceBus2.get_value, passing
# 'dir:///tmp/saga' as the second argument (presumably the fallback when the
# value is not set - confirm against get_value), parses the result as a URI
# and stores the matching SagaStorage in @saga_storage.
def configure_saga_storage
  saga_uri = URI.parse(RServiceBus2.get_value('SAGA_URI', 'dir:///tmp/saga'))
  @saga_storage = SagaStorage.get(saga_uri)
end

#configure_subscriptions ⇒ Object

Load, configure and initialise Subscriptions



95
96
97
98
# File 'lib/rservicebus2/host.rb', line 95

# Load, configure and initialise Subscriptions.
#
# Obtains the subscription storage for the configured app name and
# subscription URI, then wraps it in a SubscriptionManager kept in
# @subscription_manager.
def configure_subscriptions
  app_name = @config.app_name
  storage_uri = @config.subscription_uri
  storage = ConfigureSubscriptionStorage.new.get(app_name, storage_uri)

  @subscription_manager = SubscriptionManager.new(storage)
end

#get_endpoint_for_msg(msg_name) ⇒ Object



393
394
395
396
397
398
399
400
401
402
# File 'lib/rservicebus2/host.rb', line 393

# Resolves the queue a msg of the given name should be sent to.
#
# An explicit entry in the endpoint mapping wins. Without one, the local queue
# is used when the handler manager reports that the msg can be handled
# locally. Otherwise the problem is logged and an error is raised.
#
# @param msg_name [String] name of the msg class
# @return [String] name of the destination queue
# @raise [RuntimeError] when no destination can be determined
def get_endpoint_for_msg(msg_name)
  mapped_queue = @endpoint_mapping.get(msg_name)
  return mapped_queue unless mapped_queue.nil?

  unless @handler_manager.can_msg_be_handled_locally(msg_name)
    log "No end point mapping found for: #{msg_name}"
    log "**** Check environment variable MessageEndpointMappings contains an entry named: #{msg_name}"
    raise "No end point mapping found for: #{msg_name}"
  end

  @mq.local_queue_name
end

#handle_message ⇒ Object

Send the current msg to the appropriate handlers



295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# File 'lib/rservicebus2/host.rb', line 295

# Send the current msg (@msg) to the appropriate handlers.
#
# Sequence: begin on the resource manager, run every handler registered for
# the msg class, offer the msg to the saga manager, commit, then send the msgs
# that were queued while processing. Any StandardError raised inside the begin
# block triggers a rollback, discards the queued msgs and is re-raised.
#
# @raise [PropertyNotSet] re-raised with the handler's class name added to the
#   message
# @raise [NoHandlerFound] when the saga manager returns false and the handler
#   list is empty
# @raise [StandardError] any other error raised while processing
def handle_message
  # NOTE(review): begin and the handler lookup sit outside the begin/rescue
  # below, so an exception raised by either of them skips the rollback.
  @resource_manager.begin
  msg_name = @msg.msg.class.name
  handler_list = @handler_manager.get_handler_list_for_msg(msg_name)
  # NOTE(review): this is logged even when handler_list is empty.
  RServiceBus2.rlog "Handler found for: #{msg_name}"
  begin
    # Msgs queued through queue_msg_for_send_on_complete while the handlers run
    # collect here; they only go out via send_queued_msgs after the commit.
    @queue_for_msgs_to_be_sent_on_complete = []

    log "Started processing msg, #{msg_name}"
    handler_list.each do |handler|
      log "Handler, #{handler.class.name}, Started"
      handler.handle(@msg.msg)
      log "Handler, #{handler.class.name}, Finished"
    rescue PropertyNotSet => e
      # Re-raise in the "Property, <name>, not set for, <handler>" format that
      # start_listening_to_endpoints parses.
      raise PropertyNotSet, "Property, #{e.message}, not set for, #{handler.class.name}"
    rescue StandardError => e
      # NOTE(review): this writes to stdout directly instead of going through
      # the log veneer - looks like leftover debug output; confirm.
      puts "E #{e.message}"
      log "An error occurred in Handler: #{handler.class.name}"
      raise e
    end

    # The saga manager is always given the msg; NoHandlerFound is raised only
    # when it returns false and there were no handlers either.
    raise NoHandlerFound, msg_name if @saga_manager.handle(@msg) == false && handler_list.length.zero?

    @resource_manager.commit(msg_name)

    # NOTE(review): a failure while sending happens after the commit, yet still
    # reaches the rollback below - confirm rollback after commit is harmless.
    send_queued_msgs
    log "Finished processing msg, #{msg_name}"
  rescue StandardError => e
    @resource_manager.rollback(msg_name)
    # Drop whatever the handlers queued; nothing is sent for a failed msg.
    @queue_for_msgs_to_be_sent_on_complete = nil
    raise e
  end
end

#load_contracts ⇒ Object

Load Contracts



77
78
79
80
81
82
83
84
# File 'lib/rservicebus2/host.rb', line 77

# Load Contracts.
#
# Requires every path in the configured contract list, writing one verbose log
# entry per contract once it has been required.
def load_contracts
  log('Load Contracts')

  @config.contract_list.each do |contract_path|
    require(contract_path)
    RServiceBus2.rlog("Loaded Contract: #{contract_path}")
  end
end

#load_handlers ⇒ Object

Load and configure Message Handlers



55
56
57
58
59
60
61
62
63
# File 'lib/rservicebus2/host.rb', line 55

# Load and configure Message Handlers.
#
# Creates the handler manager and a loader bound to it, then asks the loader to
# load handlers from every configured handler path.
def load_handlers
  log('Load Message Handlers')

  @handler_manager = HandlerManager.new(self, @resource_manager, @state_manager)
  @handler_loader = HandlerLoader.new(self, @handler_manager)

  @config.handler_path_list.each { |handler_dir| @handler_loader.load_handlers_from_path(handler_dir) }
end

#load_libs ⇒ Object

For each directory given, add it to the front of the load path ($LOAD_PATH) so that its libraries can be required



87
88
89
90
91
92
# File 'lib/rservicebus2/host.rb', line 87

# Makes the configured library directories requirable.
#
# Each directory in the configured lib list is pushed onto the front of
# $LOAD_PATH in turn, so the last directory listed ends up first on the path.
def load_libs
  log('Load Libs')
  @config.lib_list.each { |lib_dir| $LOAD_PATH.unshift(lib_dir) }
end

#load_sagas ⇒ Object

Load and configure Sagas



66
67
68
69
70
71
72
73
74
# File 'lib/rservicebus2/host.rb', line 66

# Load and configure Sagas.
#
# Creates the saga manager and a loader bound to it, then asks the loader to
# load sagas from every configured saga path.
def load_sagas
  log('Load Sagas')

  @saga_manager = SagaManager.new(self, @resource_manager, @saga_storage)
  @saga_loader = SagaLoader.new(self, @saga_manager)

  @config.saga_path_list.each { |saga_dir| @saga_loader.load_sagas_from_path(saga_dir) }
end

#log(string) ⇒ Object

Provides a thin logging veneer

Parameters:

  • string (String)

    Log entry



22
23
24
# File 'lib/rservicebus2/host.rb', line 22

# Provides a thin logging veneer: forwards the entry to RServiceBus2.log.
#
# @param string [String] Log entry
def log(string)
  RServiceBus2.log(string)
end

#publish(msg) ⇒ Object

Sends an event to all subscribers across the bus

Parameters:



418
419
420
421
422
423
424
425
426
# File 'lib/rservicebus2/host.rb', line 418

# Sends an event to all subscribers across the bus.
#
# Looks up the subscribers registered for the msg's class name and queues one
# copy of the msg per subscriber; the copies go out when the queued msgs are
# sent.
#
# @param msg [Object] event to publish
def publish(msg)
  RServiceBus2.rlog 'Bus.Publish'
  @stats.inc_total_published

  @subscription_manager.get(msg.class.name).each do |subscriber_queue|
    queue_msg_for_send_on_complete(msg, subscriber_queue)
  end
end

#queue_msg_for_send_on_complete(msg, queue_name, timestamp = nil) ⇒ Object



375
376
377
378
379
380
# File 'lib/rservicebus2/host.rb', line 375

# Buffers a msg so that it is sent once the current unit of work completes.
#
# The correlation id comes from the msg currently being processed (@msg) when
# it has one, otherwise from the current saga data (@saga_data), otherwise it
# is nil.
#
# NOTE(review): @queue_for_msgs_to_be_sent_on_complete is nil outside of msg
# processing, so a call made then fails with NoMethodError on nil.
#
# @param msg [Object] msg to queue
# @param queue_name [String] endpoint the msg is destined for
# @param timestamp [Object, nil] when not nil, send_queued_msgs passes the
#   entry to the send-at manager instead of sending it straight away
# @return [Array<Hash>] the buffer, including the entry just added
def queue_msg_for_send_on_complete(msg, queue_name, timestamp = nil)
  # Safe navigation replaces the nil-guarding ternaries; only nil (not false)
  # falls through to the saga's correlation id, as before.
  correlation_id = @msg&.correlation_id
  correlation_id = @saga_data&.correlation_id if correlation_id.nil?

  @queue_for_msgs_to_be_sent_on_complete << {
    'msg' => msg,
    'queue_name' => queue_name,
    'correlation_id' => correlation_id,
    'timestamp' => timestamp
  }
end

#reply(msg, timestamp = nil) ⇒ Object

Sends a msg back across the bus. Reply queues are specified in each msg. It works like email, where the reply address can actually be anywhere.

Parameters:



386
387
388
389
390
391
# File 'lib/rservicebus2/host.rb', line 386

# Sends a msg back across the bus, to the return address carried by the msg
# currently being processed (@msg).
#
# @param msg [Object] msg to send back
# @param timestamp [Object, nil] passed through to the queued entry
def reply(msg, timestamp = nil)
  return_queue = @msg.return_address
  RServiceBus2.rlog "Reply with: #{msg.class.name} To: #{return_queue}"
  @stats.inc_total_reply

  queue_msg_for_send_on_complete(msg, return_queue, timestamp)
end

#run ⇒ Object

Ignition



132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/rservicebus2/host.rb', line 132

# Ignition: announces the host, renames the process after the local queue and
# enters the message loop.
#
# The two forwarding settings are only reported when they are not nil.
def run
  local_queue = @mq.local_queue_name
  log 'Starting the Host'
  log "Watching, #{local_queue}"
  $0 = "rservicebus - #{local_queue}"

  received_copy_queue = @config.forward_received_messages_to
  log "Forwarding all received messages to: #{received_copy_queue}" unless received_copy_queue.nil?

  sent_copy_queue = @config.forward_sent_messages_to
  log "Forwarding all sent messages to: #{sent_copy_queue}" unless sent_copy_queue.nil?

  start_listening_to_endpoints
end

#send(msg, timestamp = nil) ⇒ Object

Send a msg across the bus. The msg destination is specified at the infrastructure level.

Parameters:



407
408
409
410
411
412
413
414
# File 'lib/rservicebus2/host.rb', line 407

# Send a msg across the bus. The destination is not chosen by the caller; it
# is resolved from the msg's class name via get_endpoint_for_msg.
#
# @param msg [Object] msg to send
# @param timestamp [Object, nil] passed through to the queued entry
def send(msg, timestamp = nil)
  RServiceBus2.rlog 'Bus.Send'
  @stats.inc_total_sent

  destination = get_endpoint_for_msg(msg.class.name)
  queue_msg_for_send_on_complete(msg, destination, timestamp)
end

#send_queued_msgs ⇒ Object



365
366
367
368
369
370
371
372
373
# File 'lib/rservicebus2/host.rb', line 365

# Works through the msgs buffered in @queue_for_msgs_to_be_sent_on_complete.
#
# Entries with a timestamp are handed to the send-at manager; entries without
# one are wrapped and sent straight away.
def send_queued_msgs
  @queue_for_msgs_to_be_sent_on_complete.each do |queued|
    unless queued['timestamp'].nil?
      @send_at_manager.add(queued)
      next
    end

    _send_needs_wrapping(queued['msg'], queued['queue_name'], queued['correlation_id'])
  end
end

#send_subscriptions ⇒ Object

Subscriptions are specified by adding events to the msg endpoint mapping



46
47
48
49
50
51
52
# File 'lib/rservicebus2/host.rb', line 46

# Subscriptions are specified by adding events to the msg endpoint mapping.
# This sends one subscription request per event the mapping lists.
def send_subscriptions
  log('Send Subscriptions')

  @endpoint_mapping.subscription_endpoints.each { |event_name| subscribe(event_name) }
end

#start_listening_to_endpoints ⇒ Object

Receive a msg, prep it, and handle any errors that may occur

  • Most of this should be queue independent



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/rservicebus2/host.rb', line 148

# Receive a msg, prep it, and handle any errors that may occur.
#
# Runs until a SystemExit/Interrupt is rescued or an error reaches one of the
# outer rescue clauses. Each pass of the loop:
#   1. ticks the statistics and, while the circuit breaker reports broken,
#      sleeps instead of popping;
#   2. pops a body from the queue and deserialises it into @msg;
#   3. handles the built-in control msgs inline, or delegates everything else
#      to handle_message;
#   4. acks the msg, returns it to the queue for a retry, or forwards it to the
#      configured error queue, depending on the outcome.
# When the pop raises NoMsgToProcess the idle work runs instead: monitors, the
# cron manager and the send-at manager.
def start_listening_to_endpoints
  log 'Waiting for messages. To exit press CTRL+C'
  message_loop = true
  # Retry budget. It is decremented on every failed attempt and only restored
  # after a msg has been moved to the error queue.
  # NOTE(review): the budget is shared by all msgs and is not restored after a
  # successful msg - confirm this is intended.
  retries = @config.max_retries

  while message_loop
    # Popping a msg off the queue should not be in the message handler,
    #  as it affects retry
    begin
      @stats.tick
      if @circuit_breaker.broken
        sleep 0.5
        next
      end

      body = @mq.pop
      begin
        @stats.inc_total_processed
        @msg = RServiceBus2.safe_load(body)
        # Control msgs are dealt with here; anything else goes to the handlers.
        case @msg.msg.class.name
        when 'RServiceBus2::MessageSubscription'
          @subscription_manager.add(@msg.msg.event_name,
                                    @msg.return_address)
        when 'RServiceBus2::MessageStatisticOutputOn'
          @stats.output = true
          log 'Turn on Stats logging'
        when 'RServiceBus2::MessageStatisticOutputOff'
          @stats.output = false
          log 'Turn off Stats logging'
        when 'RServiceBus2::MessageVerboseOutputOn'
          ENV['VERBOSE'] = 'true'
          log 'Turn on Verbose logging'
        when 'RServiceBus2::MessageVerboseOutputOff'
          ENV.delete('VERBOSE')
          log 'Turn off Verbose logging'
        else
          handle_message
          # Forward a copy of the original body once it has been handled.
          unless @config.forward_received_messages_to.nil?
            _send_already_wrapped_and_serialised(body, @config.forward_received_messages_to)
          end
        end
        @mq.ack
      rescue ClassNotFoundForMsg => e
        # Not retried: the msg is annotated, moved to the error queue and acked.
        puts "*** Class not found for msg, #{e.message}"
        # NOTE(review): the literal below spans two source lines, so the output
        # contains a newline plus the indentation, and the quote after
        # "likely as" is never closed.
        puts "*** Ensure, #{e.message}, is defined in contract.rb, most
          likely as 'Class #{e.message} end"

        @msg.add_error_msg(@mq.local_queue_name, e.message)
        serialized_object = YAML.dump(@msg)
        _send_already_wrapped_and_serialised(serialized_object,
                                             @config.error_queue_name)
        @mq.ack
      rescue NoHandlerFound => e
        # Not retried: the msg is annotated, moved to the error queue and acked.
        puts "*** Handler not found for msg, #{e.message}"
        # NOTE(review): two-line literal; the output contains the line break
        # and the indentation.
        puts "*** Ensure a handler named, #{e.message}, is present in the
          messagehandler directory."

        @msg.add_error_msg(@mq.local_queue_name, e.message)
        serialized_object = YAML.dump(@msg)
        _send_already_wrapped_and_serialised(serialized_object,
                                             @config.error_queue_name)
        @mq.ack
      rescue PropertyNotSet => e
        # This has been re-raised from a rescue in the handler
        puts "*** #{e.message}"
        # "Property, #{e.message}, not set for, #{handler.class.name}"
        # Skip the 10-character "Property, " prefix and take the text up to
        # the next comma, i.e. the property name.
        property_name = e.message[10, e.message.index(',', 10) - 10]
        puts "*** Ensure the environment variable, RSB_#{property_name},
          has been set at startup."
        # NOTE(review): this branch neither acks the msg nor returns it to the
        # queue - confirm what the queue implementation does with it.
      rescue StandardError => e
        sleep 0.5

        puts '*** Exception occurred'
        puts e.message
        puts e.backtrace
        puts '***'

        if retries.positive?
          # Budget left: put the msg back so it is attempted again.
          retries -= 1
          @mq.return_to_queue
        else
          @circuit_breaker.failure
          @stats.inc_total_errored
          # Compared by class name, so the queue libraries do not have to be
          # loaded for these checks to work.
          if e.class.name == 'Beanstalk::NotConnected'
            puts 'Lost connection to beanstalkd.'
            puts '*** Start or Restart beanstalkd and try again.'
            abort
          end

          if e.class.name == 'Redis::CannotConnectError'
            puts 'Lost connection to redis.'
            puts '*** Start or Restart redis and try again.'
            abort
          end

          # Out of retries: annotate the msg, move it to the error queue, ack
          # it and restore the retry budget.
          # NOTE(review): if safe_load itself raised, @msg still refers to the
          # previous msg (or nil) at this point - verify.
          error_string = "#{e.message}. #{e.backtrace.join('. ')}"
          @msg.add_error_msg(@mq.local_queue_name, error_string)
          serialized_object = YAML.dump(@msg)
          _send_already_wrapped_and_serialised(serialized_object, @config.error_queue_name)
          @mq.ack
          retries = @config.max_retries
        end
      end
    rescue SystemExit, Interrupt
      puts 'Exiting on request ...'
      message_loop = false
    rescue NoMsgToProcess => e
      # This exception is just saying there are no messages to process
      begin
        # Monitors and cron jobs queue msgs the same way handlers do, so each
        # gets its own buffer, which is sent and then cleared.
        @queue_for_msgs_to_be_sent_on_complete = []
        @monitors.each(&:look)
        send_queued_msgs
        @queue_for_msgs_to_be_sent_on_complete = nil

        @queue_for_msgs_to_be_sent_on_complete = []
        @cron_manager.run
        send_queued_msgs
        @queue_for_msgs_to_be_sent_on_complete = nil

        @send_at_manager.process
        @circuit_breaker.success
      rescue StandardError => e
        # Either way the loop stops; only the output differs.
        if e.message == 'SIGTERM' || e.message == 'SIGINT'
          puts 'Exiting on request ...'
          message_loop = false
        else
          puts '*** This is really unexpected.'
          message_loop = false
          puts "Message: #{e.message}"
          puts e.backtrace
        end
      end
    rescue StandardError => e
      # NOTE(review): same handling as the rescue inside the NoMsgToProcess
      # branch above - candidate for a shared helper.
      if e.message == 'SIGTERM' || e.message == 'SIGINT'
        puts 'Exiting on request ...'
        message_loop = false
      else
        puts '*** This is really unexpected.'
        message_loop = false
        puts "Message: #{e.message}"
        puts e.backtrace
      end
    end
  end
end

#subscribe(event_name) ⇒ Object

Sends a subscription request across the Bus

Parameters:

  • event_name (String)

    event to be subscribed to



430
431
432
433
434
435
436
437
# File 'lib/rservicebus2/host.rb', line 430

# Sends a subscription request across the Bus.
#
# The request is a MessageSubscription for the event, sent without a
# correlation id to the endpoint that get_endpoint_for_msg resolves for the
# event name.
#
# @param event_name [String] event to be subscribed to
def subscribe(event_name)
  RServiceBus2.rlog "Bus.Subscribe: #{event_name}"

  publisher_queue = get_endpoint_for_msg(event_name)
  _send_needs_wrapping(MessageSubscription.new(event_name), publisher_queue, nil)
end