Class: RSMP::Proxy

Inherits:
Object
  • Object
show all
Includes:
Distributor, Inspect, Logging, Modules::Acknowledgements, Modules::Receive, Modules::Send, Modules::State, Modules::Tasks, Modules::Versions, Modules::Watchdogs, Task
Defined in:
lib/rsmp/proxy/proxy.rb,
lib/rsmp/proxy/modules/send.rb,
lib/rsmp/proxy/modules/state.rb,
lib/rsmp/proxy/modules/tasks.rb,
lib/rsmp/proxy/modules/receive.rb,
lib/rsmp/proxy/modules/versions.rb,
lib/rsmp/proxy/modules/watchdogs.rb,
lib/rsmp/proxy/modules/acknowledgements.rb

Direct Known Subclasses

SiteProxy, SupervisorProxy

Defined Under Namespace

Modules: Modules

Constant Summary collapse

WRAPPING_DELIMITER =
"\f".freeze

Instance Attribute Summary collapse

Attributes included from Task

#task

Attributes included from Distributor

#receivers

Attributes included from Logging

#logger

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Modules::Tasks

#read_line, #run_reader, #run_timer, #start_reader, #start_timer, #timer

Methods included from Modules::Versions

#check_core_version, #core_versions, #extraneous_version, #process_version, #send_version, #version_acknowledged

Methods included from Modules::Receive

#expect_version_message, #handle_fatal_error, #handle_invalid_message, #handle_invalid_packet, #handle_malformed_message, #handle_schema_error, #process_deferred, #process_message, #process_packet, #should_validate_ingoing_message?, #verify_sequence, #will_not_handle

Methods included from Modules::Send

#apply_nts_message_attributes, #buffer_message, #handle_send_schema_error, #log_send, #send_and_optionally_collect, #send_message

Methods included from Modules::Acknowledgements

#acknowledge, #acknowledged_first_ingoing, #acknowledged_first_outgoing, #check_ack_timeout, #check_ingoing_acknowledged, #check_outgoing_acknowledged, #dont_acknowledge, #dont_expect_acknowledgement, #expect_acknowledgement, #find_original_for_message, #log_acknowledgement_for_original, #log_acknowledgement_for_unknown, #process_ack, #process_not_ack, #status_subscribe_acknowledged

Methods included from Modules::Watchdogs

#check_watchdog_timeout, #process_watchdog, #send_watchdog, #start_watchdog, #stop_watchdog, #watchdog_send_timer, #with_watchdog_disabled

Methods included from Modules::State

#handshake_complete, #wait_for_state

Methods included from Task

#initialize_task, #restart, #run, #start, #stop, #task_status, #wait, #wait_for_condition

Methods included from Inspect

#inspector

Methods included from Distributor

#add_receiver, #clear_deferred_distribution, #distribute, #distribute_error, #distribute_immediately, #distribute_queued, #initialize_distributor, #remove_receiver, #with_deferred_distribution

Methods included from Logging

#initialize_logging

Constructor Details

#initialize(options) ⇒ Proxy

Returns a new instance of Proxy.



25
26
27
28
29
30
31
32
33
34
# File 'lib/rsmp/proxy/proxy.rb', line 25

# Builds a proxy that starts out in the :disconnected state.
#
# options[:node] is stored as the owning node. The same options hash is
# then passed on to initialize_logging and to setup, which read their own
# keys from it.
def initialize(options)
  @node = options[:node]
  initialize_logging options
  initialize_distributor
  initialize_task
  # setup stores the connection details (and may start a collector);
  # clear then resets the acknowledgement/watchdog bookkeeping.
  setup options
  clear
  # Assigned directly here, rather than through the state= writer that
  # close uses.
  @state = :disconnected
  @state_condition = Async::Notification.new
end

Instance Attribute Details

#archiveObject (readonly)

Returns the value of attribute archive.



23
24
25
# File 'lib/rsmp/proxy/proxy.rb', line 23

# Read-only accessor for @archive.
# NOTE(review): none of the methods shown on this page assign @archive;
# presumably it is set by a subclass or an included module - confirm.
def archive
  @archive
end

#collectorObject (readonly)

Returns the value of attribute collector.



23
24
25
# File 'lib/rsmp/proxy/proxy.rb', line 23

# Read-only accessor for @collector.
# setup only creates a collector when options[:collect] is given, so this
# can be nil.
def collector
  @collector
end

#connection_infoObject (readonly)

Returns the value of attribute connection_info.



23
24
25
# File 'lib/rsmp/proxy/proxy.rb', line 23

# Read-only accessor for @connection_info, which setup takes from
# options[:info].
def connection_info
  @connection_info
end

#core_versionObject (readonly)

Returns the value of attribute core_version.



23
24
25
# File 'lib/rsmp/proxy/proxy.rb', line 23

# Read-only accessor for @core_version.
# schemas falls back to the latest core version while this is nil.
# NOTE(review): the assignment is not in proxy.rb as shown here; presumably
# it happens in Modules::Versions - confirm.
def core_version
  @core_version
end

#ipObject (readonly)

Returns the value of attribute ip.



23
24
25
# File 'lib/rsmp/proxy/proxy.rb', line 23

# Read-only accessor for @ip, which setup takes from options[:ip].
def ip
  @ip
end

#nodeObject (readonly)

Returns the value of attribute node.



23
24
25
# File 'lib/rsmp/proxy/proxy.rb', line 23

# Read-only accessor for @node, the object passed as options[:node] to
# initialize. author, clock, now and receive_error all delegate to it.
def node
  @node
end

#portObject (readonly)

Returns the value of attribute port.



23
24
25
# File 'lib/rsmp/proxy/proxy.rb', line 23

# Read-only accessor for @port, which setup takes from options[:port].
def port
  @port
end

#stateObject

Returns the value of attribute state.



23
24
25
# File 'lib/rsmp/proxy/proxy.rb', line 23

# Returns the current connection state held in @state.
# initialize sets it to :disconnected; ready?, connected? and
# disconnected? compare it against :ready, :connected and :disconnected.
def state
  @state
end

#sxlObject (readonly)

Returns the value of attribute sxl.



23
24
25
# File 'lib/rsmp/proxy/proxy.rb', line 23

# Read-only accessor for @sxl.
# setup resets it to nil; schemas uses it as a hash key when it is set.
def sxl
  @sxl
end

Class Method Details

.version_meets_requirement?(version, requirement) ⇒ Boolean

Use the Gem class to check a version requirement. The requirement must be a string like ‘1.1’, ‘>=1.0.3’ or ‘<2.1.4’, or a list of strings, like [‘<=1.4’,‘<1.5’].

Returns:

  • (Boolean)


195
196
197
# File 'lib/rsmp/proxy/proxy.rb', line 195

# Class-level convenience wrapper: passes both arguments unchanged to
# Modules::Versions.version_meets_requirement? and returns its result.
def self.version_meets_requirement?(version, requirement)
  Modules::Versions.version_meets_requirement?(version, requirement)
end

Instance Method Details

#authorObject



188
189
190
# File 'lib/rsmp/proxy/proxy.rb', line 188

# Returns the site id of the owning node.
def author
  @node.site_id
end

#clearObject



129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/rsmp/proxy/proxy.rb', line 129

# Resets all per-connection bookkeeping to its initial values.
# Called from initialize and from stop_subtasks.
def clear
  # version handshake
  @version_determined = false

  # watchdog tracking
  @watchdog_started = false
  @latest_watchdog_received = nil
  @latest_watchdog_send_at = nil

  # acknowledgement tracking, each backed by its own fresh hash
  @awaiting_acknowledgement = {}
  @ingoing_acknowledged = {}
  @outgoing_acknowledged = {}
  @acknowledgements = {}

  # kept last, so the new notification remains the method's return value
  @acknowledgement_condition = Async::Notification.new
end

#clockObject



169
170
171
# File 'lib/rsmp/proxy/proxy.rb', line 169

# Returns the clock object of the owning node.
def clock
  @node.clock
end

#closeObject

close connection, but keep our main task running so we can reconnect



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/rsmp/proxy/proxy.rb', line 51

# Tears down the current connection: closes stream and socket, stops the
# reader, marks the proxy :disconnected and reports a DisconnectError via
# distribute_error. The timer is stopped last.
def close
  log 'Closing connection', level: :warning
  close_stream
  close_socket
  stop_reader
  self.state = :disconnected
  distribute_error DisconnectError.new('Connection was closed')

  # Stop the timer as the very last step.
  # close can run inside the timer itself, in which case nothing after
  # stop_timer is executed (only code in an ensure block would be), so
  # every other cleanup step has to come before this call.
  stop_timer
end

#close_socketObject



90
91
92
93
94
# File 'lib/rsmp/proxy/proxy.rb', line 90

# Closes the socket if there is one and always forgets it afterwards,
# even when closing raises. Returns whatever the socket's close returns,
# or nil when there was no socket.
def close_socket
  return if @socket.nil?

  @socket.close
ensure
  @socket = nil
end

#close_streamObject



84
85
86
87
88
# File 'lib/rsmp/proxy/proxy.rb', line 84

# Closes the stream if there is one and always forgets it afterwards,
# even when closing raises. Returns whatever the stream's close returns,
# or nil when there was no stream.
def close_stream
  return if @stream.nil?

  @stream.close
ensure
  @stream = nil
end

#connected?Boolean

Returns:

  • (Boolean)


107
108
109
# File 'lib/rsmp/proxy/proxy.rb', line 107

# True while the state is :connected or :ready, false otherwise.
def connected?
  %i[connected ready].include?(@state)
end

#disconnectObject

Connection lifecycle methods



42
# File 'lib/rsmp/proxy/proxy.rb', line 42

# Intentionally empty in this class.
# NOTE(review): presumably a hook for the subclasses (SiteProxy,
# SupervisorProxy) to override - confirm.
def disconnect; end

#disconnected?Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/rsmp/proxy/proxy.rb', line 111

# True only while the state is :disconnected, which is the state set by
# initialize and by close.
def disconnected?
  @state == :disconnected
end

#inspectObject



163
164
165
166
167
# File 'lib/rsmp/proxy/proxy.rb', line 163

# Short description of the proxy: class name, object id and the output of
# inspector, which is given @acknowledgements, @settings and
# @site_settings.
def inspect
  details = inspector(:@acknowledgements, :@settings, :@site_settings)
  "#<#{self.class.name}:#{object_id}, #{details}>"
end

#log(str, options = {}) ⇒ Object



177
178
179
# File 'lib/rsmp/proxy/proxy.rb', line 177

# Logs str through the inherited log method, tagging the entry with this
# proxy's ip, port and site id. These three keys replace any values the
# caller passed under the same names.
def log(str, options = {})
  connection_context = { ip: @ip, port: @port, site_id: @site_id }
  super(str, options.merge(connection_context))
end

#nowObject



36
37
38
# File 'lib/rsmp/proxy/proxy.rb', line 36

# Delegates to the owning node's now.
# NOTE(review): presumably the current time according to the node's
# clock - confirm.
def now
  node.now
end

#ready?Boolean

State management methods

Returns:

  • (Boolean)


103
104
105
# File 'lib/rsmp/proxy/proxy.rb', line 103

# True only while the state is :ready. connected? also counts :ready as
# connected.
def ready?
  @state == :ready
end

#receive_error(error, options = {}) ⇒ Object



173
174
175
# File 'lib/rsmp/proxy/proxy.rb', line 173

# Forwards the error and its options unchanged to the owning node's
# receive_error and returns that call's result.
def receive_error(error, options = {})
  @node.receive_error error, options
end

#revive(options) ⇒ Object

revive after a reconnect



143
144
145
# File 'lib/rsmp/proxy/proxy.rb', line 143

# Re-applies connection options after a reconnect by running setup again.
# Unlike initialize, it neither calls clear nor touches @state.
def revive(options)
  setup options
end

#schemasObject



181
182
183
184
185
186
# File 'lib/rsmp/proxy/proxy.rb', line 181

# Builds the hash of schema versions for this connection.
# :core is core_version when set, otherwise the latest core version.
# When both sxl and sxl_version are set, an entry keyed by sxl is added
# holding the sanitized sxl version string.
def schemas
  selected = { core: RSMP::Schema.latest_core_version } # use latest core
  selected[:core] = core_version if core_version
  return selected unless sxl && sxl_version

  selected[sxl] = RSMP::Schema.sanitize_version(sxl_version.to_s)
  selected
end

#setup(options) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/rsmp/proxy/proxy.rb', line 147

# Stores the connection details found in options and resets @sxl and
# @site_settings. When options[:collect] is given, a collector is created
# for this proxy and started.
# Returns nil without a collector, otherwise the result of starting it.
def setup(options)
  @settings, @socket, @stream, @protocol, @ip, @port, @connection_info =
    options.values_at(:settings, :socket, :stream, :protocol, :ip, :port, :info)
  @sxl = nil
  @site_settings = nil # can't pick until we know the site id

  collect_options = options[:collect]
  return unless collect_options

  @collector = RSMP::Collector.new self, collect_options
  @collector.start
end

#state_changedObject

Called when the state has changed. Override to do things like notifications.



125
126
127
# File 'lib/rsmp/proxy/proxy.rb', line 125

# Signals @state_condition (the Async::Notification created in
# initialize), passing the current state as the value.
# NOTE(review): presumably this wakes tasks blocked in wait_for_state -
# confirm against Modules::State.
def state_changed
  @state_condition.signal @state
end

#stop_readerObject



78
79
80
81
82
# File 'lib/rsmp/proxy/proxy.rb', line 78

# Stops the reader if there is one and always forgets it afterwards, even
# when stopping raises. Returns whatever the reader's stop returns, or nil
# when there was no reader.
def stop_reader
  return if @reader.nil?

  @reader.stop
ensure
  @reader = nil
end

#stop_subtasksObject



65
66
67
68
69
70
# File 'lib/rsmp/proxy/proxy.rb', line 65

# Stops the timer and the reader, resets the per-connection bookkeeping
# with clear, and then runs the inherited stop_subtasks.
def stop_subtasks
  stop_timer
  stop_reader
  clear
  super
end

#stop_taskObject



96
97
98
99
# File 'lib/rsmp/proxy/proxy.rb', line 96

# Closes the connection first, then runs the inherited stop_task.
def stop_task
  close
  super
end

#stop_timerObject



72
73
74
75
76
# File 'lib/rsmp/proxy/proxy.rb', line 72

# Stops the timer if there is one and always forgets it afterwards, even
# when stopping raises. Returns whatever the timer's stop returns, or nil
# when there was no timer.
def stop_timer
  return if @timer.nil?

  @timer.stop
ensure
  @timer = nil
end

#wait_for_readerObject

wait for the reader task to complete, which is not expected to happen before the connection is closed



46
47
48
# File 'lib/rsmp/proxy/proxy.rb', line 46

# Waits on @reader when one exists and returns the result of that wait;
# returns nil straight away when there is no reader.
def wait_for_reader
  @reader&.wait
end