Class: MatrixCreator::Comm
- Inherits: Object
  - Object
  - MatrixCreator::Comm
- Defined in: lib/matrix_creator/comm.rb
Overview
This class communicates with the chipset over ZeroMQ sockets.
Constant Summary
- MATRIX_CREATOR_IP = MatrixCreator.settings[:ip]
  IP address used to connect to the Matrix Creator chipset.
- MAX_OLD_FILES = 10
  Maximum number of old log files to keep.
- MAX_LOG_SIZE = 102_400_000
  Maximum size of each log file.
- LOG_LEVEL = Logger::WARN
  Minimum severity level that the logger records.
- PING_SPEED = 3
  Interval between keep-alive pings, in seconds.
- TIMEOUT_VERIFICATION_SPEED = 1
  Interval between timeout checks, in seconds.
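The three logging constants line up with the arguments of Ruby's standard `Logger`. This page does not show `initialize_logger`, so the following is only a sketch of how such values could be wired together; the helper name `build_logger` and the log file path are assumptions made for illustration.

```ruby
require 'logger'
require 'tmpdir'

# Values copied from the constant summary above.
MAX_OLD_FILES = 10
MAX_LOG_SIZE  = 102_400_000
LOG_LEVEL     = Logger::WARN

# Hypothetical helper: the real initialize_logger is not shown on this page.
def build_logger(path)
  # Logger.new(logdev, shift_age, shift_size): keep up to shift_age old files
  # and rotate once the current file grows past shift_size bytes.
  logger = Logger.new(path, MAX_OLD_FILES, MAX_LOG_SIZE)
  logger.level = LOG_LEVEL
  logger
end

# Write to a throwaway directory so the sketch leaves nothing behind.
Dir.mktmpdir do |dir|
  logger = build_logger(File.join(dir, 'comm.log'))
  logger.info('dropped, because INFO is below WARN')
  logger.warn('written, because WARN meets the threshold')
  logger.close
end
```

With `LOG_LEVEL` at `Logger::WARN`, the `:debug` and `:info` messages seen throughout the method listings would be filtered out under this setup.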
Instance Attribute Summary
- #context ⇒ Object (readonly)
  Contains the ZMQ::Context instance used.
- #device_port ⇒ Object (readonly)
  Contains device base port.
Instance Method Summary
- #destroy ⇒ Object
  Destroys the ZMQ::Context instance, since there can only be one running per process.
- #initialize(device_port) ⇒ Comm (constructor)
  Creates an instance of Comm used to communicate with the chipset’s device.
- #perform(decoder, options = {}, block = nil) { ... } ⇒ Object
  Starts the listening process on a driver.
- #send_configuration(driver_config) ⇒ Object
  Sends configuration data to the driver.
- #start_data_listener(decoder, max_resp, error_thread, block = nil) ⇒ Thread
  Main thread that listens for data reported by the driver data port.
- #start_error_listener ⇒ Thread
  Connects to the error port to listen for any errors reported.
- #start_pinging(main_thread) ⇒ Thread
  Pings the driver keep-alive port every 3 seconds until the listener finishes running.
- #verify_timeout(max_secs, main_thread, error_thread, ping_thread) ⇒ Object
  Checks whether the specified maximum number of seconds has elapsed; if it has, all threads are killed.
Constructor Details
#initialize(device_port) ⇒ Comm
Creates an instance of Comm used to communicate with the chipset’s device.

    # File 'lib/matrix_creator/comm.rb', line 38

    def initialize(device_port)
      initialize_logger

      # Creating instance variables
      print_log(:debug, 'Initializing instance')
      @context = ::ZMQ::Context.new
      @device_port = device_port
    end
Instance Attribute Details
#context ⇒ Object (readonly)
Contains the ZMQ::Context instance used

    # File 'lib/matrix_creator/comm.rb', line 29

    def context
      @context
    end
#device_port ⇒ Object (readonly)
Contains device base port

    # File 'lib/matrix_creator/comm.rb', line 32

    def device_port
      @device_port
    end
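The method listings on this page derive every socket address from the base port: configuration uses the base port itself, keep-alive uses base + 1, errors use base + 2, and data uses base + 3. A minimal sketch of that layout follows; `PORT_OFFSETS` and `socket_address` are hypothetical names that do not exist in the class, and the IP and port values are placeholders.

```ruby
# Offsets as they appear in send_configuration, start_pinging,
# start_error_listener and start_data_listener.
PORT_OFFSETS = { config: 0, keep_alive: 1, error: 2, data: 3 }.freeze

# Hypothetical helper, not part of MatrixCreator::Comm.
def socket_address(ip, base_port, kind)
  # fetch raises KeyError for an unknown kind instead of returning nil.
  offset = PORT_OFFSETS.fetch(kind)
  "tcp://#{ip}:#{base_port + offset}"
end

# 127.0.0.1 and 20_013 are placeholder values chosen for illustration.
PORT_OFFSETS.each_key do |kind|
  puts "#{kind}: #{socket_address('127.0.0.1', 20_013, kind)}"
end
```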
Instance Method Details
#destroy ⇒ Object
Destroys the ZMQ::Context instance, since there can only be one running per process.

    # File 'lib/matrix_creator/comm.rb', line 240

    def destroy
      print_log(:info, 'Destroying ZMQ context')
      @context.destroy
    end
#perform(decoder, options = {}, block = nil) { ... } ⇒ Object
Starts the listening process on a driver.

    # File 'lib/matrix_creator/comm.rb', line 217

    def perform(decoder, options = {}, block = nil)
      # Start running threads
      error_thread = start_error_listener
      data_thread = start_data_listener(decoder, options[:max_resp], error_thread, block)
      ping_thread = start_pinging(data_thread)

      # Verify timeout if that option is specified
      if options[:max_secs]
        verify_timeout(options[:max_secs], data_thread, error_thread, ping_thread)
      end

      # Wait for threads to finish
      data_thread.join
      error_thread.join
      ping_thread.join

      # Return data captured from the driver
      print_log(:debug, "Data Result: #{data_thread[:result].to_json}")
      data_thread[:result]
    end
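`perform` returns `data_thread[:result]`, which works because a value stored with `Thread.current[:key]` inside a thread can be read from outside through `thread[:key]`, even after the thread has finished. The sketch below isolates that mechanism; `start_worker` is a hypothetical stand-in for the data listener and involves no sockets.

```ruby
# Hypothetical stand-in for the data listener: collects items into a
# thread-local array, the way the listener fills Thread.current[:result].
def start_worker(items)
  Thread.new do
    Thread.current[:result] = []
    items.each { |item| Thread.current[:result] << item }
  end
end

worker = start_worker([1, 2, 3])
worker.join            # wait for completion, as perform does
p worker[:result]      # the array remains readable after the thread ends
```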
#send_configuration(driver_config) ⇒ Object
Sends configuration data to the driver

    # File 'lib/matrix_creator/comm.rb', line 51

    def send_configuration(driver_config)
      # Connecting to the configuration port
      socket_address = "tcp://#{MATRIX_CREATOR_IP}:#{@device_port}"
      config_socket = @context.socket(:PUSH)
      config_socket.connect(socket_address)
      print_log(:debug, "config_socket connected to #{socket_address}")

      # Sending Encoded Data
      config_data = MatrixMalos::DriverConfig.encode(driver_config)
      config_socket.send(config_data)
      print_log(:info, 'Configuration sent to driver')
      print_log(:debug, "Data: #{driver_config.to_json}")
    end
#start_data_listener(decoder, max_resp, error_thread, block = nil) ⇒ Thread
Main thread that listens for data reported by the driver data port. It keeps listening until the maximum number of expected messages has been received or until the timeout verification kills it.

    # File 'lib/matrix_creator/comm.rb', line 125

    def start_data_listener(decoder, max_resp, error_thread, block = nil)
      Thread.new do
        # Initialize current number of messages received
        count = 0

        begin
          # Thread variable that indicates if this thread has finished
          Thread.current[:finished] = false

          # Thread variable that contains an array of messages received
          # for further processing
          Thread.current[:result] = []

          # Connecting to the data port
          socket_address = "tcp://#{MATRIX_CREATOR_IP}:#{@device_port + 3}"
          data_socket = @context.socket(:SUB)
          data_socket.connect(socket_address)
          print_log(:debug, "data_socket connected to #{socket_address}")
          data_socket.subscribe('')

          print_log(:info, "Listening for data (max_resp: #{max_resp || 'Unlimited'})")

          loop do
            # Receiving data
            data = data_socket.recv
            print_log(:info, 'Data received')

            decoded_data = JSON.parse(decoder.decode(data).to_json, symbolize_names: true)
            print_log(:debug, "Data: #{decoded_data}")

            # Push decoded data into the results array
            Thread.current[:result] << decoded_data

            # Send data to callback method
            block.call(decoded_data) if block

            # Increment count and break loop if max number of
            # messages has been reached
            count += 1
            break if max_resp && count >= max_resp
          end
        rescue => e
          print_log(:fatal, e.)
        end

        # Mark thread as finished
        Thread.current[:finished] = true
        print_log(:info, 'Finished listening')

        # Kill error thread, no longer need to log errors
        Thread.kill(error_thread)
        print_log(:info, 'Killed error listener thread')
      end
    end
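The core of the listener is a receive loop that decodes each message, normalises it to a Hash with symbol keys through a JSON round trip, hands it to an optional callback, and stops after `max_resp` messages. The sketch below reproduces that loop without ZeroMQ. `FakeSocket`, `FakeDecoder` and `listen` are hypothetical names introduced here, and the queue-backed socket is an assumption that only imitates a blocking `recv`.

```ruby
require 'json'

# Hypothetical stand-in for the SUB socket: recv pops from an in-memory queue.
class FakeSocket
  def initialize(messages)
    @queue = Queue.new
    messages.each { |message| @queue << message }
  end

  # Blocks until a message is available, like a blocking socket read.
  def recv
    @queue.pop
  end
end

# Hypothetical decoder: anything that responds to decode would do.
class FakeDecoder
  def self.decode(raw)
    { 'value' => raw.to_i }
  end
end

def listen(socket, decoder, max_resp, block = nil)
  Thread.new do
    count = 0
    Thread.current[:finished] = false
    Thread.current[:result] = []

    loop do
      raw = socket.recv
      # Round trip through JSON to obtain a plain Hash with symbol keys.
      decoded = JSON.parse(decoder.decode(raw).to_json, symbolize_names: true)
      Thread.current[:result] << decoded
      block.call(decoded) if block

      # Stop once the requested number of messages has been collected.
      count += 1
      break if max_resp && count >= max_resp
    end

    Thread.current[:finished] = true
  end
end

seen = []
thread = listen(FakeSocket.new(%w[1 2 3]), FakeDecoder, 2, ->(data) { seen << data })
thread.join
p thread[:result]
```

Only two of the three queued messages end up in the result, because `max_resp` is 2. When `max_resp` is `nil`, the loop has no exit condition of its own and depends on an external kill, which is the role `verify_timeout` plays in the class.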
#start_error_listener ⇒ Thread
Connects to the error port to listen for any errors reported

    # File 'lib/matrix_creator/comm.rb', line 97

    def start_error_listener
      Thread.new do
        # Connecting to the error port
        socket_address = "tcp://#{MATRIX_CREATOR_IP}:#{@device_port + 2}"
        error_socket = @context.socket(:SUB)
        error_socket.connect(socket_address)
        error_socket.subscribe('')

        # Infinite loop to listen for errors, this thread will be killed
        # by the main thread when it needs to be stopped
        loop do
          # Read and log error messages
          error_msg = error_socket.
          print_log(:error, error_msg.data)
        end
      end
    end
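The error listener never exits on its own; it relies on another thread calling `Thread.kill`. Ruby still runs `ensure` clauses in a killed thread, so a loop of this shape could release resources there if needed. The sketch below demonstrates that behaviour with plain `Queue` objects standing in for the socket and the logger; `start_listener` is a hypothetical helper, and the listing above performs no such cleanup.

```ruby
# Hypothetical listener: `source` stands in for the SUB socket and `log`
# for print_log; both are plain Queue objects here.
def start_listener(source, log, cleanup)
  Thread.new do
    begin
      # Infinite loop, ended only by Thread.kill from another thread.
      loop { log << source.pop }
    ensure
      # ensure clauses still run when the thread is killed.
      cleanup << :closed
    end
  end
end

source = Queue.new
log = Queue.new
cleanup = Queue.new

listener = start_listener(source, log, cleanup)
source << 'driver error'
puts "logged: #{log.pop}" # blocks until the listener has forwarded the message

Thread.kill(listener)
listener.join
puts "cleanup ran: #{!cleanup.empty?}"
```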
#start_pinging(main_thread) ⇒ Thread
Pings the driver keep-alive port every 3 seconds until listener finishes running

    # File 'lib/matrix_creator/comm.rb', line 70

    def start_pinging(main_thread)
      Thread.new do
        # Connecting to the keep-alive port
        socket_address = "tcp://#{MATRIX_CREATOR_IP}:#{@device_port + 1}"
        ping_socket = @context.socket(:PUSH)
        ping_socket.connect(socket_address)
        print_log(:debug, "ping_socket connected to #{socket_address}")

        # Infinite loop that breaks when main thread has finished
        loop do
          # Send Ping
          ping_socket.send('')
          print_log(:info, 'Ping sent')

          sleep(PING_SPEED)

          break if main_thread[:finished]
        end

        print_log(:debug, 'Stopped pinging')
      end
    end
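Because the loop checks `main_thread[:finished]` only after sending and sleeping, it always sends at least one ping and may take up to one full interval to notice that the listener is done. The sketch below shows the same loop shape with a `Queue` in place of the PUSH socket; `start_pinger` and `PING_INTERVAL` are hypothetical names, and the interval is shortened so the example finishes quickly.

```ruby
# Shortened interval for the sketch; the class uses PING_SPEED = 3 seconds.
PING_INTERVAL = 0.05

# Hypothetical helper: `sink` stands in for the keep-alive socket.
def start_pinger(main_thread, sink)
  Thread.new do
    loop do
      sink << :ping
      sleep(PING_INTERVAL)
      # Checked after the sleep, so one ping is always sent.
      break if main_thread[:finished]
    end
  end
end

# Stand-in for the data listener: sets :finished after a short delay.
main = Thread.new do
  Thread.current[:finished] = false
  sleep(0.2)
  Thread.current[:finished] = true
end

pings = Queue.new
start_pinger(main, pings).join
puts "pings sent: #{pings.size}"
```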
#verify_timeout(max_secs, main_thread, error_thread, ping_thread) ⇒ Object
Checks whether the specified maximum number of seconds has elapsed. If it has, all threads are killed.

    # File 'lib/matrix_creator/comm.rb', line 186

    def verify_timeout(max_secs, main_thread, error_thread, ping_thread)
      current_time = Time.now
      print_log(:info, "Starting timeout verification (max_secs: #{max_secs})")

      loop do
        # Break if main thread is finished, we no longer need to check for timeout
        break if main_thread[:finished]

        # If there is a timeout, kill all threads and break
        if Time.now >= current_time + max_secs
          print_log(:info, 'Listener timed out, killing all threads')

          Thread.kill(main_thread)
          Thread.kill(error_thread)
          Thread.kill(ping_thread)

          break
        end

        sleep(TIMEOUT_VERIFICATION_SPEED)
      end

      print_log(:info, 'Finishing timeout verification')
    end
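The method is a polling watchdog: it runs in the calling thread, wakes at a fixed interval, and either sees the `:finished` flag or kills the workers once the deadline passes. The sketch below follows the same structure with two deliberate differences that are not in the listing: it measures elapsed time with the monotonic clock instead of `Time.now`, so system clock adjustments cannot skew the deadline, and it returns a symbol describing the outcome. `watch_timeout` and its `poll:` parameter are hypothetical.

```ruby
# Hypothetical helper modelled on verify_timeout; returns why it stopped.
def watch_timeout(max_secs, main_thread, other_threads, poll: 0.01)
  # Monotonic clock, a substitution for the Time.now used in the listing.
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  loop do
    # Nothing to enforce once the main thread reports completion.
    return :finished if main_thread[:finished]

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    if elapsed >= max_secs
      [main_thread, *other_threads].each { |thread| Thread.kill(thread) }
      return :timed_out
    end

    sleep(poll)
  end
end

stuck = Thread.new { sleep } # never sets :finished
outcome = watch_timeout(0.1, stuck, [])
stuck.join
puts "#{outcome}, alive: #{stuck.alive?}"
```

A shorter polling interval detects completion sooner at the cost of more wake-ups; the class polls once per second through `TIMEOUT_VERIFICATION_SPEED`.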