Class: MatrixCreator::Comm

Inherits:
Object
  • Object
show all
Defined in:
lib/matrix_creator/comm.rb

Overview

Class: Comm

This class is used to communicate with the chipset over ZeroMQ sockets.

Constant Summary collapse

MATRIX_CREATOR_IP =

Contains the IP address to be used to connect to the Matrix Creator chipset

MatrixCreator.settings[:ip]
MAX_OLD_FILES =

Maximum number of old log files to keep

10
MAX_LOG_SIZE =

Maximum size for each log file

102_400_000
LOG_LEVEL =

Current logger level to store

Logger::WARN
PING_SPEED =

Pinging speed

3
TIMEOUT_VERIFICATION_SPEED =

Speed to check for timeout

1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(device_port) ⇒ Comm

Creates an instance of Comm to be used as communication with chipset’s device

Parameters:

  • device_port (Integer)

    port of the device to communicate with



38
39
40
41
42
43
44
45
# File 'lib/matrix_creator/comm.rb', line 38

# Builds a Comm bound to a single device of the chipset.
#
# @param device_port [Integer] base port of the device to communicate with
def initialize(device_port)
  # initialize_logger runs first so that print_log (presumably backed by that
  # logger) can be used right away
  initialize_logger
  print_log(:debug, 'Initializing instance')

  # State read by every method of this class that opens a socket
  @device_port = device_port
  @context = ::ZMQ::Context.new
end

Instance Attribute Details

#context ⇒ Object (readonly)

Contains the ZMQ::Context instance used



29
30
31
# File 'lib/matrix_creator/comm.rb', line 29

# Reader for the ZMQ::Context created in #initialize; the other methods of
# this class open their sockets from it.
#
# @return [ZMQ::Context] the context held in @context
def context
  @context
end

#device_port ⇒ Object (readonly)

Contains device base port



32
33
34
# File 'lib/matrix_creator/comm.rb', line 32

# Reader for the device base port given to #initialize. Configuration is sent
# to this port; the other ports are derived from it: keep-alive is +1,
# error is +2 and data is +3.
#
# @return [Integer] the base port held in @device_port
def device_port
  @device_port
end

Instance Method Details

#destroy ⇒ Object

Destroys the ZMQ::Context instance, since there can only be one running per process.



240
241
242
243
# File 'lib/matrix_creator/comm.rb', line 240

# Logs the teardown and destroys the ZMQ::Context held by this instance.
#
# NOTE(review): @context is not reset here, so the instance keeps referencing
# the destroyed context afterwards; presumably the instance must not be
# reused after this call — confirm.
def destroy
  print_log(:info, 'Destroying ZMQ context')
  @context.destroy
end

#perform(decoder, options = {}, block = nil) { ... } ⇒ Object

Starts the listening process on a driver.

Parameters:

  • decoder (MatrixMalos)

    module to be used to decode data received

  • options (Hash) (defaults to: {})

    contains the options that can be specified for a max_resp and/or max_secs

Yields:

  • callback used to process data received from the driver

Returns:

  • an array with a list of all the messages received



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/matrix_creator/comm.rb', line 217

# Starts the error, data and ping threads for a driver, waits for all of them
# to finish and returns what the data listener collected.
#
# The callback may be given either as the third positional argument (any
# object responding to #call) or as a regular Ruby block. When both are
# given, the positional argument takes precedence.
#
# @param decoder [MatrixMalos] module used to decode the data received
# @param options [Hash, nil] optional limits (nil is treated as no limits)
# @option options [Integer] :max_resp maximum number of messages to receive
# @option options [Integer] :max_secs maximum number of seconds to listen
# @param block [#call, nil] callback invoked with each decoded message
# @yield [decoded_data] same callback, supplied as a block
# @return [Array, nil] the data thread's :result list of decoded messages
#   (nil when that thread was stopped before it initialised the list)
def perform(decoder, options = {}, block = nil, &callback)
  options ||= {}

  # Honour a literal block as well; without this fallback a block passed to
  # this method would be silently ignored
  block ||= callback

  # Start running threads
  error_thread = start_error_listener
  data_thread = start_data_listener(decoder, options[:max_resp], error_thread, block)
  ping_thread = start_pinging(data_thread)

  # Verify timeout if that option is specified; this blocks until the data
  # thread reports it finished or the timeout kills the threads
  if options[:max_secs]
    verify_timeout(options[:max_secs], data_thread, error_thread, ping_thread)
  end

  # Wait for threads to finish
  [data_thread, error_thread, ping_thread].each(&:join)

  # Return data captured from the driver
  print_log(:debug, "Data Result: #{data_thread[:result].to_json}")
  data_thread[:result]
end

#send_configuration(driver_config) ⇒ Object

Sends configuration data to the driver

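Parameters:

  • driver_config

    configuration to send; it is encoded with MatrixMalos::DriverConfig.encode before being pushed to the device port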



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/matrix_creator/comm.rb', line 51

# Pushes a driver configuration to the device's base port.
#
# @param driver_config configuration to send; it is serialised with
#   MatrixMalos::DriverConfig.encode before being pushed
def send_configuration(driver_config)
  # Open a PUSH socket against the configuration (base) port
  endpoint = "tcp://#{MATRIX_CREATOR_IP}:#{@device_port}"
  pusher = @context.socket(:PUSH)
  pusher.connect(endpoint)
  print_log(:debug, "config_socket connected to #{endpoint}")

  # Encode the configuration and ship it
  pusher.send(MatrixMalos::DriverConfig.encode(driver_config))
  print_log(:info, 'Configuration sent to driver')
  print_log(:debug, "Data: #{driver_config.to_json}")
end

#start_data_listener(decoder, max_resp, error_thread, block = nil) ⇒ Thread

Main thread that listens for data reported by the driver data port. It keeps receiving messages until the maximum number of messages expected is reached or until it is killed by the timeout verification. When it finishes on its own it also kills the error listener thread.

Parameters:

  • decoder (MatrixMalos)

    module to be used to decode data received

  • max_resp (Integer)

    maximum number of messages to receive

  • error_thread (Thread)

    instance that logs errors

  • block (defaults to: nil)

    callback method to be executed when a message has been received

Returns:

  • (Thread)

    instance



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/matrix_creator/comm.rb', line 125

# Spawns the thread that receives data from the driver's data port
# (base port + 3).
#
# The thread exposes two thread variables: :result (array of decoded
# messages) and :finished (true once the receive loop is over). Any
# StandardError raised while connecting or receiving is logged as fatal and
# ends the loop. When the thread finishes on its own it kills error_thread.
#
# @param decoder [MatrixMalos] module used to decode the data received
# @param max_resp [Integer, nil] maximum number of messages to receive;
#   nil means no limit
# @param error_thread [Thread] error listener to kill once listening is over
# @param block [#call, nil] callback invoked with each decoded message
# @return [Thread] the listener thread
def start_data_listener(decoder, max_resp, error_thread, block = nil)
  Thread.new do
    listener = Thread.current
    received = 0

    begin
      # Published state: completion flag and the list of decoded messages
      listener[:finished] = false
      listener[:result] = []

      # Subscribe to everything published on the data port
      endpoint = "tcp://#{MATRIX_CREATOR_IP}:#{@device_port + 3}"
      subscriber = @context.socket(:SUB)
      subscriber.connect(endpoint)
      print_log(:debug, "data_socket connected to #{endpoint}")
      subscriber.subscribe('')
      print_log(:info, "Listening for data (max_resp: #{max_resp || 'Unlimited'})")

      loop do
        raw = subscriber.recv
        print_log(:info, 'Data received')

        # Round-trip through JSON so the message becomes a plain structure
        # with symbol keys
        message = JSON.parse(decoder.decode(raw).to_json, symbolize_names: true)
        print_log(:debug, "Data: #{message}")

        # Store the message, then hand it to the callback if there is one
        listener[:result] << message
        block.call(message) if block

        # Stop once the requested number of messages has been received
        received += 1
        break if max_resp && received >= max_resp
      end
    rescue => error
      print_log(:fatal, error.message)
    end

    # Let the other threads know listening is over
    listener[:finished] = true
    print_log(:info, 'Finished listening')

    # The error listener loops forever, so it has to be stopped from here
    Thread.kill(error_thread)
    print_log(:info, 'Killed error listener thread')
  end
end

#start_error_listener ⇒ Thread

Connects to the error port to listen for any errors reported

Returns:

  • (Thread)

    instance



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/matrix_creator/comm.rb', line 97

# Spawns the thread that subscribes to the driver's error port
# (base port + 2) and logs every message received there at :error level.
#
# The thread never ends on its own; it is expected to be killed by whoever
# started it once error logging is no longer needed.
#
# @return [Thread] the error listener thread
def start_error_listener
  Thread.new do
    # Connecting to the error port
    socket_address = "tcp://#{MATRIX_CREATOR_IP}:#{@device_port + 2}"
    error_socket = @context.socket(:SUB)
    error_socket.connect(socket_address)
    # Same connection trace the configuration, data and ping sockets emit
    print_log(:debug, "error_socket connected to #{socket_address}")
    error_socket.subscribe('')

    # Infinite loop to listen for errors, this thread will be killed
    # from outside when it needs to be stopped
    loop do
      # Read and log error messages
      error_msg = error_socket.recv_message
      print_log(:error, error_msg.data)
    end
  end
end

#start_pinging(main_thread) ⇒ Thread

Pings the driver keep-alive port every 3 seconds until listener finishes running

Parameters:

  • main_thread (Thread)

    the main listener

Returns:

  • (Thread)

    instance



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/matrix_creator/comm.rb', line 70

# Spawns the thread that keeps the driver alive by pushing an empty message
# to the keep-alive port (base port + 1) every PING_SPEED seconds.
#
# A ping is always sent before the first check, and the :finished flag of
# main_thread is only looked at after each sleep.
#
# @param main_thread [Thread] data listener whose :finished thread variable
#   tells this thread when to stop
# @return [Thread] the pinging thread
def start_pinging(main_thread)
  Thread.new do
    # Open a PUSH socket against the keep-alive port
    endpoint = "tcp://#{MATRIX_CREATOR_IP}:#{@device_port + 1}"
    pinger = @context.socket(:PUSH).tap { |sock| sock.connect(endpoint) }
    print_log(:debug, "ping_socket connected to #{endpoint}")

    # Ping, wait, and leave once the listener reports it is done
    loop do
      pinger.send('')
      print_log(:info, 'Ping sent')

      sleep(PING_SPEED)
      break if main_thread[:finished]
    end

    print_log(:debug, 'Stopped pinging')
  end
end

#verify_timeout(max_secs, main_thread, error_thread, ping_thread) ⇒ Object

Checks periodically whether the maximum number of seconds specified has elapsed. If it has, all threads are killed; the check also stops as soon as the main listener reports that it has finished.

Parameters:

  • max_secs (Integer)

    maximum number of seconds to gather data

  • main_thread (Thread)

    instance of the main data listener

  • error_thread (Thread)

    instance of the error listener

  • ping_thread (Thread)

    instance of the ping thread



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/matrix_creator/comm.rb', line 186

# Blocks until either the main listener reports it has finished or max_secs
# seconds have elapsed; in the latter case the three threads are killed.
#
# Elapsed time is measured with the monotonic clock so that wall-clock
# adjustments cannot shorten or extend the timeout, and the wait between
# checks never goes past the deadline, so the timeout fires on time even when
# max_secs is not a multiple of TIMEOUT_VERIFICATION_SPEED.
#
# @param max_secs [Numeric] maximum number of seconds to gather data
# @param main_thread [Thread] main data listener; its :finished thread
#   variable ends the verification early
# @param error_thread [Thread] error listener thread
# @param ping_thread [Thread] ping thread
# @return the value returned by the final print_log call
def verify_timeout(max_secs, main_thread, error_thread, ping_thread)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + max_secs

  print_log(:info, "Starting timeout verification (max_secs: #{max_secs})")

  loop do
    # Break if main thread is finished, we no longer need to check for timeout
    break if main_thread[:finished]

    # If there is a timeout, kill all threads and break
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    if remaining <= 0
      print_log(:info, 'Listener timed out, killing all threads')
      Thread.kill(main_thread)
      Thread.kill(error_thread)
      Thread.kill(ping_thread)
      break
    end

    # Never sleep past the deadline
    sleep([TIMEOUT_VERIFICATION_SPEED, remaining].min)
  end

  print_log(:info, 'Finishing timeout verification')
end