Class: RSMP::SupervisorProxy
- Defined in:
- lib/rsmp/proxy/supervisor/modules/alarms.rb,
lib/rsmp/proxy/supervisor/modules/status.rb,
lib/rsmp/proxy/supervisor/modules/commands.rb,
lib/rsmp/proxy/supervisor/supervisor_proxy.rb,
lib/rsmp/proxy/supervisor/modules/aggregated_status.rb
Defined Under Namespace
Modules: Modules
Constant Summary
Constants inherited from Proxy
Instance Attribute Summary collapse
-
#site ⇒ Object
readonly
Returns the value of attribute site.
-
#supervisor_id ⇒ Object
readonly
Returns the value of attribute supervisor_id.
Attributes inherited from Proxy
#archive, #collector, #connection_info, #core_version, #ip, #node, #port, #state, #sxl
Attributes included from Task
Attributes included from Distributor
Attributes included from Logging
Instance Method Summary collapse
- #acknowledged_first_ingoing(message) ⇒ Object
- #check_sxl_version(message) ⇒ Object
-
#connect ⇒ Object
Connect to the supervisor and initiate the handshake.
- #connect_tcp ⇒ Object
- #handshake_complete ⇒ Object
-
#initialize(options) ⇒ SupervisorProxy
constructor
A new instance of SupervisorProxy.
- #main ⇒ Object
- #process_message(message) ⇒ Object
- #process_version(message) ⇒ Object
- #reconnect_delay? ⇒ Boolean
-
#run ⇒ Object
Handle communication; if disconnected, try to reconnect.
- #start_handshake ⇒ Object
- #stop_task ⇒ Object
- #sxl_version ⇒ Object
- #timer(now) ⇒ Object
- #version_accepted(message) ⇒ Object
Methods included from Modules::AggregatedStatus
#process_aggregated_status_request, #send_aggregated_status, #send_all_aggregated_status
Methods included from Modules::Alarms
#handle_alarm_acknowledge, #handle_alarm_request, #handle_alarm_resume, #handle_alarm_suspend, #process_alarm, #send_active_alarms, #send_alarm
Methods included from Modules::Commands
#build_command_rvs, #execute_commands, #process_command_request, #simplify_command_requests
Methods included from Modules::Status
#add_status_subscription, #build_status_list, #build_undefined_statuses, #check_on_change_update, #check_status_subscription, #fetch_last_sent_status, #fetch_status_values, #get_status_subscribe_interval, #interval_update_due?, #process_status_request, #process_status_subcribe, #process_status_unsubcribe, #remove_status_subscription, #rsmpify_value, #send_status_updates, #status_update_timer, #store_last_sent_status
Methods inherited from Proxy
#author, #clear, #clock, #close, #close_socket, #close_stream, #connected?, #disconnect, #disconnected?, #inspect, #log, #now, #ready?, #receive_error, #revive, #schemas, #setup, #state_changed, #stop_reader, #stop_subtasks, #stop_timer, version_meets_requirement?, #wait_for_reader
Methods included from Proxy::Modules::Tasks
#read_line, #run_reader, #run_timer, #start_reader, #start_timer
Methods included from Proxy::Modules::Versions
#check_core_version, #core_versions, #extraneous_version, #send_version, #version_acknowledged, version_meets_requirement?
Methods included from Proxy::Modules::Receive
#expect_version_message, #handle_fatal_error, #handle_invalid_message, #handle_invalid_packet, #handle_malformed_message, #handle_schema_error, #process_deferred, #process_packet, #should_validate_ingoing_message?, #verify_sequence, #will_not_handle
Methods included from Proxy::Modules::Send
#apply_nts_message_attributes, #buffer_message, #handle_send_schema_error, #log_send, #send_and_optionally_collect, #send_message
Methods included from Proxy::Modules::Acknowledgements
#acknowledge, #acknowledged_first_outgoing, #check_ack_timeout, #check_ingoing_acknowledged, #check_outgoing_acknowledged, #dont_acknowledge, #dont_expect_acknowledgement, #expect_acknowledgement, #find_original_for_message, #log_acknowledgement_for_original, #log_acknowledgement_for_unknown, #process_ack, #process_not_ack, #status_subscribe_acknowledged
Methods included from Proxy::Modules::Watchdogs
#check_watchdog_timeout, #process_watchdog, #send_watchdog, #start_watchdog, #stop_watchdog, #watchdog_send_timer, #with_watchdog_disabled
Methods included from Proxy::Modules::State
Methods included from Task
#initialize_task, #restart, #start, #stop, #stop_subtasks, #task_status, #wait, #wait_for_condition
Methods included from Inspect
Methods included from Distributor
#add_receiver, #clear_deferred_distribution, #distribute, #distribute_error, #distribute_immediately, #distribute_queued, #initialize_distributor, #inspect, #remove_receiver, #with_deferred_distribution
Methods included from Logging
#author, #initialize_logging, #log
Constructor Details
#initialize(options) ⇒ SupervisorProxy
Returns a new instance of SupervisorProxy.
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 14 def initialize(options) super(options.merge(node: options[:site])) @site = options[:site] @site_settings = @site.site_settings.clone @ip = options[:ip] @port = options[:port] @status_subscriptions = {} @sxl = @site_settings['sxl'] @synthetic_id = Supervisor.build_id_from_ip_port @ip, @port end |
Instance Attribute Details
#site ⇒ Object (readonly)
Returns the value of attribute site.
12 13 14 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 12 def site @site end |
#supervisor_id ⇒ Object (readonly)
Returns the value of attribute supervisor_id.
12 13 14 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 12 def supervisor_id @supervisor_id end |
Instance Method Details
#acknowledged_first_ingoing(message) ⇒ Object
129 130 131 132 133 134 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 129 def acknowledged_first_ingoing(message) case message.type when 'Watchdog' handshake_complete end end |
#check_sxl_version(message) ⇒ Object
172 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 172 def check_sxl_version(message); end |
#connect ⇒ Object
Connect to the supervisor and initiate the handshake.
54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 54 def connect log "Connecting to supervisor at #{@ip}:#{@port}", level: :info self.state = :connecting connect_tcp @logger.unmute @ip, @port log "Connected to supervisor at #{@ip}:#{@port}", level: :info rescue SystemCallError => e raise ConnectionError, "Could not connect to supervisor at #{@ip}:#{@port}: Errno #{e.errno} #{e}" rescue StandardError => e raise ConnectionError, "Error while connecting to supervisor at #{@ip}:#{@port}: #{e}" end |
#connect_tcp ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 71 def connect_tcp @endpoint = IO::Endpoint.tcp(@ip, @port) # this timeout is a workaround for connect hanging on windows if the other side is not present yet timeout = @site_settings.dig('timeouts', 'connect') || 1.1 task.with_timeout timeout do @socket = @endpoint.connect end delay = @site_settings.dig('intervals', 'after_connect') task.sleep delay if delay @stream = IO::Stream::Buffered.new(@socket) @protocol = RSMP::Protocol.new(@stream) # rsmp messages are json terminated with a form-feed self.state = :connected rescue Errno::ECONNREFUSED => e # rescue to avoid log output log 'Connection refused', level: :warning raise e end |
#handshake_complete ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 90 def handshake_complete sanitized_sxl_version = RSMP::Schema.sanitize_version(sxl_version) log "Connection to supervisor established, using core #{@core_version}, #{sxl} #{sanitized_sxl_version}", level: :info self.state = :ready start_watchdog if @site_settings['send_after_connect'] send_all_aggregated_status send_active_alarms end super end |
#main ⇒ Object
174 175 176 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 174 def main @site.main end |
#process_message(message) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 103 def process_message(message) case message when StatusResponse, StatusUpdate, AggregatedStatus, AlarmIssue will_not_handle message when AggregatedStatusRequest process_aggregated_status_request message when CommandRequest process_command_request message when CommandResponse process_command_response message when StatusRequest process_status_request message when StatusSubscribe process_status_subcribe message when StatusUnsubscribe process_status_unsubcribe message when Alarm, AlarmAcknowledged, AlarmSuspend, AlarmResume, AlarmRequest process_alarm message else super end rescue UnknownComponent, UnknownCommand, UnknownStatus, MessageRejected, MissingAttribute => e dont_acknowledge message, '', e.to_s end |
#process_version(message) ⇒ Object
163 164 165 166 167 168 169 170 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 163 def process_version(message) return extraneous_version message if @version_determined check_core_version message check_sxl_version message @site_id = Supervisor.build_id_from_ip_port @ip, @port version_accepted message end |
#reconnect_delay? ⇒ Boolean
136 137 138 139 140 141 142 143 144 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 136 def reconnect_delay? return false if @site_settings['intervals']['reconnect'] == :no interval = @site_settings['intervals']['reconnect'] log "Will try to reconnect again every #{interval} seconds...", level: :info @logger.mute @ip, @port @task.sleep interval true end |
#run ⇒ Object
Handle communication; if disconnected, try to reconnect.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 27 def run loop do connect start_reader start_handshake wait_for_reader # run until disconnected break unless reconnect_delay? rescue Restart @logger.mute @ip, @port raise rescue RSMP::ConnectionError => e log e, level: :error break unless reconnect_delay? rescue StandardError => e distribute_error e, level: :internal break unless reconnect_delay? ensure close stop_subtasks end end |
#start_handshake ⇒ Object
49 50 51 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 49 def start_handshake send_version @site_settings['site_id'], core_versions end |
#stop_task ⇒ Object
66 67 68 69 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 66 def stop_task super @last_status_sent = nil end |
#sxl_version ⇒ Object
159 160 161 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 159 def sxl_version @site_settings['sxl_version'].to_s end |
#timer(now) ⇒ Object
154 155 156 157 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 154 def timer(now) super status_update_timer now if ready? end |
#version_accepted(message) ⇒ Object
146 147 148 149 150 151 152 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 146 def version_accepted(message) log "Received Version message, using RSMP #{@core_version}", message: message, level: :log start_timer acknowledge message @version_determined = true send_watchdog end |