Class: RSMP::SupervisorProxy

Inherits:
Proxy
  • Object
show all
Includes:
Modules::AggregatedStatus, Modules::Alarms, Modules::Commands, Modules::Status
Defined in:
lib/rsmp/proxy/supervisor/modules/alarms.rb,
lib/rsmp/proxy/supervisor/modules/status.rb,
lib/rsmp/proxy/supervisor/modules/commands.rb,
lib/rsmp/proxy/supervisor/supervisor_proxy.rb,
lib/rsmp/proxy/supervisor/modules/aggregated_status.rb

Defined Under Namespace

Modules: Modules

Constant Summary

Constants inherited from Proxy

Proxy::WRAPPING_DELIMITER

Instance Attribute Summary collapse

Attributes inherited from Proxy

#archive, #collector, #connection_info, #core_version, #ip, #node, #port, #state, #sxl

Attributes included from Task

#task

Attributes included from Distributor

#receivers

Attributes included from Logging

#archive, #logger

Instance Method Summary collapse

Methods included from Modules::AggregatedStatus

#process_aggregated_status_request, #send_aggregated_status, #send_all_aggregated_status

Methods included from Modules::Alarms

#handle_alarm_acknowledge, #handle_alarm_request, #handle_alarm_resume, #handle_alarm_suspend, #process_alarm, #send_active_alarms, #send_alarm

Methods included from Modules::Commands

#build_command_rvs, #execute_commands, #process_command_request, #simplify_command_requests

Methods included from Modules::Status

#add_status_subscription, #build_status_list, #build_undefined_statuses, #check_on_change_update, #check_status_subscription, #fetch_last_sent_status, #fetch_status_values, #get_status_subscribe_interval, #interval_update_due?, #process_status_request, #process_status_subcribe, #process_status_unsubcribe, #remove_status_subscription, #rsmpify_value, #send_status_updates, #status_update_timer, #store_last_sent_status

Methods inherited from Proxy

#author, #clear, #clock, #close, #close_socket, #close_stream, #connected?, #disconnect, #disconnected?, #inspect, #log, #now, #ready?, #receive_error, #revive, #schemas, #setup, #state_changed, #stop_reader, #stop_subtasks, #stop_timer, version_meets_requirement?, #wait_for_reader

Methods included from Proxy::Modules::Tasks

#read_line, #run_reader, #run_timer, #start_reader, #start_timer

Methods included from Proxy::Modules::Versions

#check_core_version, #core_versions, #extraneous_version, #send_version, #version_acknowledged, version_meets_requirement?

Methods included from Proxy::Modules::Receive

#expect_version_message, #handle_fatal_error, #handle_invalid_message, #handle_invalid_packet, #handle_malformed_message, #handle_schema_error, #process_deferred, #process_packet, #should_validate_ingoing_message?, #verify_sequence, #will_not_handle

Methods included from Proxy::Modules::Send

#apply_nts_message_attributes, #buffer_message, #handle_send_schema_error, #log_send, #send_and_optionally_collect, #send_message

Methods included from Proxy::Modules::Acknowledgements

#acknowledge, #acknowledged_first_outgoing, #check_ack_timeout, #check_ingoing_acknowledged, #check_outgoing_acknowledged, #dont_acknowledge, #dont_expect_acknowledgement, #expect_acknowledgement, #find_original_for_message, #log_acknowledgement_for_original, #log_acknowledgement_for_unknown, #process_ack, #process_not_ack, #status_subscribe_acknowledged

Methods included from Proxy::Modules::Watchdogs

#check_watchdog_timeout, #process_watchdog, #send_watchdog, #start_watchdog, #stop_watchdog, #watchdog_send_timer, #with_watchdog_disabled

Methods included from Proxy::Modules::State

#wait_for_state

Methods included from Task

#initialize_task, #restart, #start, #stop, #stop_subtasks, #task_status, #wait, #wait_for_condition

Methods included from Inspect

#inspect, #inspector

Methods included from Distributor

#add_receiver, #clear_deferred_distribution, #distribute, #distribute_error, #distribute_immediately, #distribute_queued, #initialize_distributor, #inspect, #remove_receiver, #with_deferred_distribution

Methods included from Logging

#author, #initialize_logging, #log

Constructor Details

#initialize(options) ⇒ SupervisorProxy

Returns a new instance of SupervisorProxy.



14
15
16
17
18
19
20
21
22
23
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 14

def initialize(options)
  super(options.merge(node: options[:site]))
  @site = options[:site]
  @site_settings = @site.site_settings.clone
  @ip = options[:ip]
  @port = options[:port]
  @status_subscriptions = {}
  @sxl = @site_settings['sxl']
  @synthetic_id = Supervisor.build_id_from_ip_port @ip, @port
end

Instance Attribute Details

#siteObject (readonly)

Returns the value of attribute site.



12
13
14
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 12

def site
  @site
end

#supervisor_idObject (readonly)

Returns the value of attribute supervisor_id.



12
13
14
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 12

def supervisor_id
  @supervisor_id
end

Instance Method Details

#acknowledged_first_ingoing(message) ⇒ Object



129
130
131
132
133
134
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 129

def acknowledged_first_ingoing(message)
  case message.type
  when 'Watchdog'
    handshake_complete
  end
end

#check_sxl_version(message) ⇒ Object



172
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 172

def check_sxl_version(message); end

#connectObject

connect to the supervisor and initiate handshake supervisor



54
55
56
57
58
59
60
61
62
63
64
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 54

def connect
  log "Connecting to supervisor at #{@ip}:#{@port}", level: :info
  self.state = :connecting
  connect_tcp
  @logger.unmute @ip, @port
  log "Connected to supervisor at #{@ip}:#{@port}", level: :info
rescue SystemCallError => e
  raise ConnectionError, "Could not connect to supervisor at #{@ip}:#{@port}: Errno #{e.errno} #{e}"
rescue StandardError => e
  raise ConnectionError, "Error while connecting to supervisor at #{@ip}:#{@port}: #{e}"
end

#connect_tcpObject



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 71

def connect_tcp
  @endpoint = IO::Endpoint.tcp(@ip, @port)

  # this timeout is a workaround for connect hanging on windows if the other side is not present yet
  timeout = @site_settings.dig('timeouts', 'connect') || 1.1
  task.with_timeout timeout do
    @socket = @endpoint.connect
  end
  delay = @site_settings.dig('intervals', 'after_connect')
  task.sleep delay if delay

  @stream = IO::Stream::Buffered.new(@socket)
  @protocol = RSMP::Protocol.new(@stream) # rsmp messages are json terminated with a form-feed
  self.state = :connected
rescue Errno::ECONNREFUSED => e # rescue to avoid log output
  log 'Connection refused', level: :warning
  raise e
end

#handshake_completeObject



90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 90

def handshake_complete
  sanitized_sxl_version = RSMP::Schema.sanitize_version(sxl_version)
  log "Connection to supervisor established, using core #{@core_version}, #{sxl} #{sanitized_sxl_version}",
      level: :info
  self.state = :ready
  start_watchdog
  if @site_settings['send_after_connect']
    send_all_aggregated_status
    send_active_alarms
  end
  super
end

#mainObject



174
175
176
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 174

def main
  @site.main
end

#process_message(message) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 103

def process_message(message)
  case message
  when StatusResponse, StatusUpdate, AggregatedStatus, AlarmIssue
    will_not_handle message
  when AggregatedStatusRequest
    process_aggregated_status_request message
  when CommandRequest
    process_command_request message
  when CommandResponse
    process_command_response message
  when StatusRequest
    process_status_request message
  when StatusSubscribe
    process_status_subcribe message
  when StatusUnsubscribe
    process_status_unsubcribe message
  when Alarm, AlarmAcknowledged, AlarmSuspend, AlarmResume, AlarmRequest
    process_alarm message
  else
    super
  end
rescue UnknownComponent, UnknownCommand, UnknownStatus,
       MessageRejected, MissingAttribute => e
  dont_acknowledge message, '', e.to_s
end

#process_version(message) ⇒ Object



163
164
165
166
167
168
169
170
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 163

def process_version(message)
  return extraneous_version message if @version_determined

  check_core_version message
  check_sxl_version message
  @site_id = Supervisor.build_id_from_ip_port @ip, @port
  version_accepted message
end

#reconnect_delay?Boolean

Returns:

  • (Boolean)


136
137
138
139
140
141
142
143
144
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 136

def reconnect_delay?
  return false if @site_settings['intervals']['reconnect'] == :no

  interval = @site_settings['intervals']['reconnect']
  log "Will try to reconnect again every #{interval} seconds...", level: :info
  @logger.mute @ip, @port
  @task.sleep interval
  true
end

#runObject

handle communication if disconnected, then try to reconnect



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 27

def run
  loop do
    connect
    start_reader
    start_handshake
    wait_for_reader # run until disconnected
    break unless reconnect_delay?
  rescue Restart
    @logger.mute @ip, @port
    raise
  rescue RSMP::ConnectionError => e
    log e, level: :error
    break unless reconnect_delay?
  rescue StandardError => e
    distribute_error e, level: :internal
    break unless reconnect_delay?
  ensure
    close
    stop_subtasks
  end
end

#start_handshakeObject



49
50
51
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 49

def start_handshake
  send_version @site_settings['site_id'], core_versions
end

#stop_taskObject



66
67
68
69
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 66

def stop_task
  super
  @last_status_sent = nil
end

#sxl_versionObject



159
160
161
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 159

def sxl_version
  @site_settings['sxl_version'].to_s
end

#timer(now) ⇒ Object



154
155
156
157
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 154

def timer(now)
  super
  status_update_timer now if ready?
end

#version_accepted(message) ⇒ Object



146
147
148
149
150
151
152
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 146

def version_accepted(message)
  log "Received Version message, using RSMP #{@core_version}", message: message, level: :log
  start_timer
  acknowledge message
  @version_determined = true
  send_watchdog
end