Class: RSMP::Supervisor

Inherits:
Node
  • Object
show all
Includes:
Modules::Configuration, Modules::Connection, Modules::Sites
Defined in:
lib/rsmp/node/supervisor/supervisor.rb,
lib/rsmp/node/supervisor/modules/sites.rb,
lib/rsmp/node/supervisor/modules/connection.rb,
lib/rsmp/node/supervisor/modules/configuration.rb

Defined Under Namespace

Modules: Modules

Instance Attribute Summary collapse

Attributes inherited from Node

#archive, #clock, #collector, #deferred, #error_queue, #task

Attributes included from Task

#task

Attributes included from Logging

#archive

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Modules::Sites

#aggregated_status_changed, #check_site_already_connected, #check_site_id, #find_site, #find_site_from_ip_port, #site_connected?, #site_ids_changed, #wait_for_site, #wait_for_site_disconnect

Methods included from Modules::Connection

#accept?, #accept_connection, #authorize_ip, #build_proxy_settings, #check_max_sites, #close, #format_ip_and_port, #handle_connection, #peek_version_message, #reject_connection, #retrieve_site_id, #setup_proxy, #validate_and_start_proxy

Methods included from Modules::Configuration

#check_site_sxl_types, #handle_supervisor_settings, #ip_to_site_settings, #site_id_to_site_setting

Methods inherited from Node

#author, #check_required_settings, #clear_deferred, #defer, #distribute_error, #do_deferred, #ignore_errors, #now, #process_deferred, #stop_subtasks

Methods included from Task

#initialize_task, #restart, #start, #stop_subtasks, #stop_task, #task_status, #wait, #wait_for_condition

Methods included from Inspect

#inspect, #inspector

Methods included from Logging

#author, #initialize_logging, #log

Constructor Details

#initialize(options = {}) ⇒ Supervisor

Returns a new instance of Supervisor.



15
16
17
18
19
20
21
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 15

# Creates a supervisor.
#
# The supervisor settings are processed before +super+ runs; the proxy list
# and the two notifications are created afterwards.
#
# @param options [Hash] options; the :supervisor_settings entry (an empty
#   hash when absent or nil) is passed to handle_supervisor_settings, and
#   the whole hash is forwarded unchanged to the parent initializer.
def initialize(options = {})
  settings = options[:supervisor_settings] || {}
  handle_supervisor_settings(settings)
  super
  @site_id_condition = Async::Notification.new
  @ready_condition = Async::Notification.new
  @proxies = []
end

Instance Attribute Details

#core_version ⇒ Object (readonly)

Returns the value of attribute core_version.



11
12
13
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 11

# Reader for the @core_version instance variable.
def core_version = @core_version

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



11
12
13
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 11

# Reader for the @logger instance variable.
def logger = @logger

#proxies ⇒ Object (readonly)

Returns the value of attribute proxies.



11
12
13
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 11

# Reader for the @proxies instance variable (set to an empty array by the constructor).
def proxies = @proxies

#ready_condition ⇒ Object (readonly)

Returns the value of attribute ready_condition.



11
12
13
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 11

# Reader for the @ready_condition instance variable.
def ready_condition = @ready_condition

#site_id_condition ⇒ Object

Returns the value of attribute site_id_condition.



13
14
15
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 13

# Reader for the @site_id_condition instance variable.
def site_id_condition = @site_id_condition

#supervisor_settings ⇒ Object (readonly)

Returns the value of attribute supervisor_settings.



11
12
13
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 11

# Reader for the @supervisor_settings instance variable.
def supervisor_settings = @supervisor_settings

Class Method Details

.build_id_from_ip_port(ip, port) ⇒ Object



68
69
70
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 68

# Derives a short identifier from an IP address and port.
#
# @param ip [#to_s] address part of the key
# @param port [#to_s] port part of the key
# @return [String] the first nine hex characters of the MD5 digest of "ip:port"
def self.build_id_from_ip_port(ip, port)
  endpoint_key = "#{ip}:#{port}"
  Digest::MD5.hexdigest(endpoint_key).slice(0, 9)
end

Instance Method Details

#build_proxy(settings) ⇒ Object



64
65
66
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 64

# Creates a SiteProxy from the given settings.
#
# @param settings passed unchanged to SiteProxy.new
# @return [SiteProxy] the newly created proxy
def build_proxy(settings)
  SiteProxy.new(settings)
end

#run ⇒ Object

Listens for incoming connections.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 28

# Runs the supervisor's accept loop and blocks until that loop ends.
#
# Logs a start message, creates a TCP endpoint on 0.0.0.0 at the configured
# 'port' setting, spawns a child task that hands each accepted socket to
# handle_connection, signals @ready_condition, then waits on the accept task.
# A StandardError raised at any level is passed to distribute_error with
# level :internal rather than propagating to the caller.
#
# NOTE(review): uses Async::Task.current, so this presumably must be called
# from inside a running Async task - confirm.
def run
  log "Starting supervisor on port #{@supervisor_settings['port']}",
      level: :info,
      timestamp: @clock.now

  # The listen address is hard-coded; only the port comes from the settings.
  @endpoint = IO::Endpoint.tcp('0.0.0.0', @supervisor_settings['port'])
  # The task is stored in @accept_task so it can be waited on below.
  @accept_task = Async::Task.current.async do |task|
    task.annotate 'supervisor accept loop'
    @endpoint.accept do |socket| # creates fibers
      handle_connection(socket)
    rescue StandardError => e
      # Errors from handling a single connection are reported here, inside the accept block.
      distribute_error e, level: :internal
    end
  rescue Async::Stop
    # Expected during shutdown - no action needed
  rescue StandardError => e
    # Errors from the accept loop itself (outside a connection handler).
    distribute_error e, level: :internal
  end

  # Signalled right after the accept task has been spawned.
  # NOTE(review): presumably the endpoint is already listening at this point - confirm.
  @ready_condition.signal
  @accept_task.wait
rescue StandardError => e
  # Covers everything in the method body outside the accept task's block.
  distribute_error e, level: :internal
end

#site_id ⇒ Object



23
24
25
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 23

# Returns the 'site_id' entry of the supervisor settings.
def site_id = @supervisor_settings['site_id']

#stop ⇒ Object

Stops the supervisor.



54
55
56
57
58
59
60
61
62
# File 'lib/rsmp/node/supervisor/supervisor.rb', line 54

# Stops the supervisor: logs the shutdown, stops the accept task if one is
# set, clears the task and endpoint references, then calls super.
def stop
  log "Stopping supervisor #{@supervisor_settings['site_id']}", level: :info

  # Safe navigation: @accept_task is nil when no accept task has been stored.
  @accept_task&.stop
  @accept_task = nil

  @endpoint = nil
  # The parent implementation is defined outside this view.
  super
end