Class: Fancybox2::Module::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/fancybox2/module/base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(fbxfile_path, options = {}) ⇒ Base

Returns a new instance of Base.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/fancybox2/module/base.rb', line 16

# Builds a module instance from the given Fbxfile path and options.
#
# @param fbxfile_path [String] non-empty path of the Fbxfile
# @param options [Hash] optional settings; its keys are symbolized in place.
#   Keys read here: :fbxfile, :mqtt_client_params, :mqtt_client, :log_level,
#   :log_progname and :logger
# @raise [FbxfileNotProvided] when fbxfile_path is not a non-empty String
def initialize(fbxfile_path, options = {})
  # Reject nil, non-String and empty values. The previous `unless a || b || c`
  # form raised NoMethodError on nil and let '' and non-Strings through.
  raise FbxfileNotProvided unless fbxfile_path.is_a?(String) && !fbxfile_path.empty?

  @fbxfile_path = fbxfile_path
  options.deep_symbolize_keys!
  @internal_mqtt_client = false

  @fbxfile = check_and_return_fbxfile options.fetch(:fbxfile, load_fbx_file)
  @mqtt_client_params = options[:mqtt_client_params] || {}
  check_or_build_mqtt_client options[:mqtt_client]
  @log_level = options.fetch :log_level, ::Logger::INFO
  @log_progname = options.fetch :log_progname, 'Fancybox2::Module::Base'
  @logger = options.fetch :logger, create_default_logger
  @status = :stopped
  @alive_task = nil
  @configs = {}
end

Instance Attribute Details

#configs ⇒ Object

Returns the value of attribute configs.



14
15
16
# File 'lib/fancybox2/module/base.rb', line 14

# Returns the value of the +configs+ attribute.
# The constructor initialises it to an empty Hash and #on_configs merges
# incoming values into it.
#
# @return [Object] the current @configs value
def configs
  @configs
end

#fbxfile ⇒ Object (readonly)

Returns the value of attribute fbxfile.



13
14
15
# File 'lib/fancybox2/module/base.rb', line 13

# Returns the value of the read-only +fbxfile+ attribute.
# The constructor sets it to the result of check_and_return_fbxfile, fed with
# either options[:fbxfile] or the value returned by load_fbx_file.
#
# @return [Object] the current @fbxfile value
def fbxfile
  @fbxfile
end

#fbxfile_path ⇒ Object (readonly)

Returns the value of attribute fbxfile_path.



13
14
15
# File 'lib/fancybox2/module/base.rb', line 13

# Returns the value of the read-only +fbxfile_path+ attribute, i.e. the path
# that was handed to the constructor.
#
# @return [Object] the current @fbxfile_path value
def fbxfile_path
  @fbxfile_path
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



13
14
15
# File 'lib/fancybox2/module/base.rb', line 13

# Returns the value of the read-only +logger+ attribute.
# The constructor sets it to options[:logger], falling back to the value
# returned by create_default_logger.
#
# @return [Object] the current @logger value
def logger
  @logger
end

#mqtt_client ⇒ Object (readonly)

Returns the value of attribute mqtt_client.



13
14
15
# File 'lib/fancybox2/module/base.rb', line 13

# Returns the value of the read-only +mqtt_client+ attribute.
# #on_shutdown resets @mqtt_client to nil after disconnecting, so callers
# should be prepared for a nil return value.
#
# @return [Object, nil] the current @mqtt_client value
def mqtt_client
  @mqtt_client
end

#status ⇒ Object (readonly)

Returns the value of attribute status.



13
14
15
# File 'lib/fancybox2/module/base.rb', line 13

# Returns the value of the read-only +status+ attribute.
# Within this class it is assigned :stopped (constructor, #on_stop),
# :running (#on_start) and :on_shutdown (#on_shutdown).
#
# @return [Symbol] the current @status value
def status
  @status
end

Instance Method Details

#alive_message_data(&block) ⇒ Object



36
37
38
39
40
41
42
# File 'lib/fancybox2/module/base.rb', line 36

# Dual-purpose accessor for the "alive" payload callback.
#
# With a block: stores the block as the callback and returns nil.
# Without a block: invokes the stored callback (if any) and returns its
# result, or nil when no callback has been registered.
#
# @return [Object, nil]
def alive_message_data(&block)
  if block.nil?
    provider = @alive_message_data
    return provider ? provider.call : nil
  end

  @alive_message_data = block
  nil
end

#alive_message_data=(callback) ⇒ Object



44
45
46
# File 'lib/fancybox2/module/base.rb', line 44

# Registers the callback that provides the "alive" payload.
# Anything that is not a Proc is ignored and the previous callback is kept.
#
# @param callback [Proc] the callable to store
def alive_message_data=(callback)
  return unless callback.is_a?(Proc)

  @alive_message_data = callback
end

#message_to(dest, action = '', payload = '', retain = false, qos = 2) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/fancybox2/module/base.rb', line 48

# Publishes a message on the topic built by topic_for for +dest+ / +action+.
#
# Hash and Array payloads are serialised with #to_json; any other payload is
# passed to the client untouched. When there is no client, or the client is
# not connected, an error is logged and nothing is published.
#
# @param dest [#to_s] destination handed to topic_for
# @param action [#to_s] action handed to topic_for
# @param payload [Hash, Array, Object] message body
# @param retain [Boolean] forwarded to the client's publish
# @param qos [Integer] forwarded to the client's publish
# @return [Object] whatever publish (or the error logging call) returns
def message_to(dest, action = '', payload = '', retain = false, qos = 2)
  # Read the client once: #on_shutdown sets @mqtt_client to nil from another
  # thread, and a nil client used to raise NoMethodError here.
  client = mqtt_client
  if client && client.connected?
    topic = topic_for dest: dest, action: action
    payload = case payload
              when Hash, Array
                payload.to_json
              else
                payload
              end
    logger.debug "#{self.class}#message_to '#{topic}' payload: #{payload}"
    client.publish topic, payload, retain, qos
  else
    logger.error 'MQTT client not connected to broker'
  end
end

#name ⇒ Object



64
65
66
# File 'lib/fancybox2/module/base.rb', line 64

# Returns the module name, read from the :name key of #fbxfile.
#
# @return [Object] the value stored under :name
def name
  fbxfile[:name]
end

#on_action(action, callback = nil, &block) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/fancybox2/module/base.rb', line 68

# Registers a handler for +action+ messages whose topic source is :core.
#
# The handler is the given block or, when no block is given, +callback+ if it
# is a Proc. Before the handler runs, the packet payload is replaced by its
# JSON-parsed form; if parsing fails the original payload is kept.
#
# @param action [#to_s] action name used to build the topic
# @param callback [Proc, nil] handler used when no block is given
# @return [Object] whatever the client's add_topic_callback returns
def on_action(action, callback = nil, &block)
  core_topic = topic_for source: :core, action: action
  handler = block || (callback if callback.is_a?(Proc))

  mqtt_client.add_topic_callback core_topic do |packet|
    # :nocov:
    raw = packet.payload
    # Best effort: fall back to the raw payload when it cannot be parsed
    packet.payload = begin
      JSON.parse(raw)
    rescue StandardError
      raw
    end
    handler.call packet if handler
    # :nocov:
  end
end

#on_client_connack ⇒ Object

MQTT Client callbacks



284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/fancybox2/module/base.rb', line 284

# MQTT client callback: logs that the broker connection is up, registers an
# #on_action handler for every entry of default_actions and, when the client
# has no subscribed topics yet, subscribes (QoS 2) to the topic built with
# '+' as both source and action.
#
# @return [Object, nil] the subscribe result, or nil when already subscribed
def on_client_connack
  logger.debug 'Connected to the broker'

  # Setup default callbacks
  default_actions.each do |action_name, callback|
    label = action_name.to_s

    on_action label do |packet|
      # :nocov:
      unless callback.is_a? Proc
        next logger.warn("No valid callback defined for '#{label}'")
      end

      callback.call packet
      # :nocov:
    end
  end

  return unless mqtt_client.subscribed_topics.size.zero?

  # Subscribe to all messages directed to me
  logger.debug 'Making broker subscriptions'
  mqtt_client.subscribe [topic_for(source: '+', action: '+'), 2]
end

#on_client_message(message) ⇒ Object

Note:

Call super if you override this method



337
338
# File 'lib/fancybox2/module/base.rb', line 337

# MQTT client hook with an empty default implementation.
# The event that triggers it is not visible in this excerpt (presumably an
# incoming message — confirm against the client wiring).
#
# @note Call super if you override this method
# @param message [Object] value supplied by the caller; unused here
def on_client_message(message)
end

#on_client_puback(message) ⇒ Object

Note:

Call super if you override this method



321
322
# File 'lib/fancybox2/module/base.rb', line 321

# MQTT client hook with an empty default implementation.
# The event that triggers it is not visible in this excerpt (presumably a
# PUBACK from the broker — confirm against the client wiring).
#
# @note Call super if you override this method
# @param message [Object] value supplied by the caller; unused here
def on_client_puback(message)
end

#on_client_pubcomp(message) ⇒ Object

Note:

Call super if you override this method



333
334
# File 'lib/fancybox2/module/base.rb', line 333

# MQTT client hook with an empty default implementation.
# The event that triggers it is not visible in this excerpt (presumably a
# PUBCOMP from the broker — confirm against the client wiring).
#
# @note Call super if you override this method
# @param message [Object] value supplied by the caller; unused here
def on_client_pubcomp(message)
end

#on_client_pubrec(message) ⇒ Object

Note:

Call super if you override this method



329
330
# File 'lib/fancybox2/module/base.rb', line 329

# MQTT client hook with an empty default implementation.
# The event that triggers it is not visible in this excerpt (presumably a
# PUBREC from the broker — confirm against the client wiring).
#
# @note Call super if you override this method
# @param message [Object] value supplied by the caller; unused here
def on_client_pubrec(message)
end

#on_client_pubrel(message) ⇒ Object

Note:

Call super if you override this method



325
326
# File 'lib/fancybox2/module/base.rb', line 325

# MQTT client hook with an empty default implementation.
# The event that triggers it is not visible in this excerpt (presumably a
# PUBREL from the broker — confirm against the client wiring).
#
# @note Call super if you override this method
# @param message [Object] value supplied by the caller; unused here
def on_client_pubrel(message)
end

#on_client_suback ⇒ Object

Note:

Call super if you override this method



309
310
311
312
313
314
# File 'lib/fancybox2/module/base.rb', line 309

# MQTT client hook: logs that subscriptions are done and sends a :ready
# message to :core through #message_to.
#
# @note Call super if you override this method
# @return [Object] whatever #message_to returns
def on_client_suback
  # Client subscribed, we're ready to rock -> Tell core
  logger.debug 'Subscriptions done'
  logger.debug "Sending 'ready' to core"
  message_to :core, :ready
end

#on_client_unsuback ⇒ Object

Note:

Call super if you override this method



317
318
# File 'lib/fancybox2/module/base.rb', line 317

# MQTT client hook with an empty default implementation.
# The event that triggers it is not visible in this excerpt (presumably an
# UNSUBACK from the broker — confirm against the client wiring).
#
# @note Call super if you override this method
def on_client_unsuback
end

#on_configs(packet = nil, &block) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/fancybox2/module/base.rb', line 84

# Dual-purpose "configs" handler.
#
# With a block: stores the block as the user callback and returns nil.
# Without a block: when the packet payload is a Hash whose 'configs' entry is
# itself a Hash, merges that entry into #configs; then invokes the stored
# user callback (if any) with the packet.
#
# A missing packet, a non-Hash payload or a non-Hash 'configs' entry is
# tolerated and simply skips the merge.
#
# @param packet [#payload, nil] received packet
# @return [Object, nil] the user callback's result, or nil
def on_configs(packet = nil, &block)
  logger.debug 'on_configs'
  if block_given?
    @on_configs = block
    return
  end

  # No JSON parsing happens here (the payload is read as-is), so there is
  # nothing that could raise JSON::ParserError; guard the shapes instead.
  cfg = packet && packet.payload
  incoming = cfg['configs'] if cfg.is_a?(Hash)
  configs.merge! incoming if incoming.is_a?(Hash)

  @on_configs.call(packet) if @on_configs
end

#on_configs=(callback) ⇒ Object



102
103
104
# File 'lib/fancybox2/module/base.rb', line 102

# Registers the user callback invoked by #on_configs.
# Anything that is not a Proc is ignored and the previous callback is kept.
#
# @param callback [Proc] the callable to store
def on_configs=(callback)
  return unless callback.is_a?(Proc)

  @on_configs = callback
end

#on_logger(packet = nil, &block) ⇒ Object



106
107
108
109
110
111
112
113
114
# File 'lib/fancybox2/module/base.rb', line 106

# Dual-purpose "logger" handler.
#
# With a block: stores the block as the user callback and returns nil.
# Without a block: invokes the stored user callback (if any) with the packet,
# then, when the payload is a Hash with a truthy 'level' entry, assigns that
# value to logger.level.
#
# A missing packet or a non-Hash payload leaves the log level untouched.
# Previously a nil packet raised NoMethodError and a String payload was
# indexed with String#[].
#
# @param packet [#payload, nil] received packet
# @return [Object, nil]
def on_logger(packet = nil, &block)
  if block_given?
    @on_logger = block
    return
  end
  @on_logger.call(packet) if @on_logger

  logger_configs = packet && packet.payload
  return unless logger_configs.is_a?(Hash)

  level = logger_configs['level']
  logger.level = level if level
end

#on_logger=(callback) ⇒ Object



116
117
118
# File 'lib/fancybox2/module/base.rb', line 116

# Registers the user callback invoked by #on_logger.
# Anything that is not a Proc is ignored and the previous callback is kept.
#
# @param callback [Proc] the callable to store
def on_logger=(callback)
  return unless callback.is_a?(Proc)

  @on_logger = callback
end

#on_restart(packet = nil, &block) ⇒ Object



120
121
122
123
124
125
126
127
128
129
# File 'lib/fancybox2/module/base.rb', line 120

# Dual-purpose "restart" handler.
#
# With a block: stores the block as the user callback and returns nil.
# Without a block: invokes the stored user callback (if any) with the packet,
# then performs a stop followed by a start with the same packet.
#
# @param packet [Object, nil] received packet, forwarded to #on_start
# @return [Object, nil] the result of #on_start, or nil when registering
def on_restart(packet = nil, &block)
  unless block.nil?
    @on_restart = block
    return
  end

  user_hook = @on_restart
  user_hook.call(packet) if user_hook

  # Stop + start
  on_stop
  on_start(packet)
end

#on_restart=(callback) ⇒ Object



131
132
133
# File 'lib/fancybox2/module/base.rb', line 131

# Registers the user callback invoked by #on_restart.
# Anything that is not a Proc is ignored and the previous callback is kept.
#
# @param callback [Proc] the callable to store
def on_restart=(callback)
  return unless callback.is_a?(Proc)

  @on_restart = callback
end

#on_shutdown(do_exit = true, &block) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/fancybox2/module/base.rb', line 135

# Dual-purpose "shutdown" handler.
#
# With a block: stores the block as the user callback and returns nil.
# Without a block: marks the status as :on_shutdown, shuts down the alive
# task, runs the user callback, reports 'ok'/'nok' to :core and then, in a
# new thread, disconnects from the broker and (optionally) exits the process.
#
# @param do_exit [Boolean] when truthy the spawned thread calls exit with
#   status 0 if the user callback succeeded, 1 otherwise
# @return [Thread, nil] the spawned thread, or nil when registering a block
def on_shutdown(do_exit = true, &block)
  if block_given?
    @on_shutdown = block
    return
  end

  @status = :on_shutdown

  shutdown_ok = true
  logger.debug "Received 'shutdown' command"
  # Stop sending alive messages
  @alive_task.shutdown if @alive_task

  begin
    # Call user code if any
    @on_shutdown.call if @on_shutdown
  rescue StandardError => e
    # A failing user callback must not abort the shutdown sequence; it only
    # turns the reported status into 'nok' (and the exit code into 1).
    logger.error "Error during shutdown: #{e.message}"
    shutdown_ok = false
  end

  # Signal core that we've executed shutdown operations.
  # This message is not mandatory, so keep it simple
  shutdown_message = shutdown_ok ? 'ok' : 'nok'
  logger.debug "Sending shutdown message to core with status '#{shutdown_message}'"
  message_to :core, :shutdown, { status: shutdown_message }
  sleep 0.05 # Wait some time in order to be sure that the message has been published (message is not mandatory)

  # Disconnect/exit happen in a separate thread; this method returns that
  # thread without joining it.
  Thread.new do
    if mqtt_client && mqtt_client.connected?
      # Gracefully disconnect from broker and exit
      logger.debug 'Disconnecting from broker, bye'
      mqtt_client.disconnect
      # From here on #mqtt_client returns nil
      @mqtt_client = nil
    end

    if do_exit
      # Exit from process
      status_code = shutdown_ok ? 0 : 1
      logger.debug "Exiting with status code #{status_code}"
      exit status_code
    end
  end
end

#on_shutdown=(callback) ⇒ Object



180
181
182
# File 'lib/fancybox2/module/base.rb', line 180

# Registers the user callback invoked by #on_shutdown.
# Anything that is not a Proc is ignored and the previous callback is kept.
#
# @param callback [Proc] the callable to store
def on_shutdown=(callback)
  return unless callback.is_a?(Proc)

  @on_shutdown = callback
end

#on_start(packet = nil, &block) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/fancybox2/module/base.rb', line 184

# Dual-purpose "start" handler.
#
# With a block: stores the block as the user callback and returns nil.
# Without a block: invokes the stored user callback (if any) with the packet,
# sets the status to :running and (re)starts the alive task.
#
# The alive interval (milliseconds) is taken from the payload's
# 'aliveTimeout' entry when the payload is a Hash and the value is a positive
# number; otherwise 1000 is used. Previously a nil payload raised
# NoMethodError and non-Hash / non-numeric values reached the interval
# arithmetic in #start_sending_alive.
#
# @param packet [#payload, nil] received packet
# @return [Object, nil] the result of #start_sending_alive, or nil when
#   registering a block
def on_start(packet = nil, &block)
  if block_given?
    @on_start = block
    return
  end
  # Call user code
  @on_start.call(packet) if @on_start

  cfg = packet && packet.payload
  interval = cfg['aliveTimeout'] if cfg.is_a?(Hash)
  interval = 1000 unless interval.is_a?(Numeric) && interval > 0
  # Start code execution from scratch
  logger.debug "Received 'start'"
  @status = :running
  start_sending_alive interval: interval
end

#on_start=(callback) ⇒ Object



200
201
202
# File 'lib/fancybox2/module/base.rb', line 200

# Registers the user callback invoked by #on_start.
# Anything that is not a Proc is ignored and the previous callback is kept.
#
# @param callback [Proc] the callable to store
def on_start=(callback)
  return unless callback.is_a?(Proc)

  @on_start = callback
end

#on_stop(&block) ⇒ Object



204
205
206
207
208
209
210
211
212
# File 'lib/fancybox2/module/base.rb', line 204

# Dual-purpose "stop" handler.
#
# With a block: stores the block as the user callback and returns nil.
# Without a block: invokes the stored user callback (if any) and sets the
# status to :stopped. The broker connection and the alive task are left
# untouched, so alive messages keep being sent.
#
# @return [Symbol, nil] :stopped, or nil when registering a block
def on_stop(&block)
  unless block.nil?
    @on_stop = block
    return
  end

  user_hook = @on_stop
  user_hook.call if user_hook
  # Stop code execution, but keep broker connection and continue to send alive
  @status = :stopped
end

#on_stop=(callback) ⇒ Object



214
215
216
# File 'lib/fancybox2/module/base.rb', line 214

# Registers the user callback invoked by #on_stop.
# Anything that is not a Proc is ignored and the previous callback is kept.
#
# @param callback [Proc] the callable to store
def on_stop=(callback)
  return unless callback.is_a?(Proc)

  @on_stop = callback
end

#remove_action(action) ⇒ Object



218
219
220
221
# File 'lib/fancybox2/module/base.rb', line 218

# Removes the topic callback registered for +action+ on the topic whose
# source is :core (the counterpart of #on_action).
#
# @param action [#to_s] action name used to build the topic
# @return [Object] whatever the client's remove_topic_callback returns
def remove_action(action)
  mqtt_client.remove_topic_callback(topic_for(source: :core, action: action))
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


249
250
251
# File 'lib/fancybox2/module/base.rb', line 249

# Tells whether the module status is currently :running.
#
# @return [Boolean]
def running?
  @status == :running
end

#setup(retry_connection = true) ⇒ Object



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/fancybox2/module/base.rb', line 257

# Connects the MQTT client to the broker, once.
#
# Subsequent calls are no-ops after a successful connection. When the
# connection attempt raises PahoMqtt::Exception the error is logged and the
# attempt is repeated while +retry_connection+ is truthy.
#
# The instance is marked as set up only after a successful connect.
# Previously a failed attempt with retry disabled still set the flag, turning
# every later #setup call into a no-op.
#
# NOTE(review): retries happen immediately, with no delay or attempt limit.
#
# @param retry_connection [Boolean] retry on connection errors
# @return [true, false, nil] true once connected, false when the attempt
#   failed and retry is disabled, nil when already set up
def setup(retry_connection = true)
  return if @setted_up

  begin
    logger.debug 'Connecting to the broker...'
    mqtt_client.connect
  rescue PahoMqtt::Exception => e
    # :nocov:
    logger.error "Error while connecting to the broker: #{e.message}"
    retry if retry_connection
    return false
    # :nocov:
  end

  @setted_up = true
end

#shutdown(do_exit = true) ⇒ Object



223
224
225
# File 'lib/fancybox2/module/base.rb', line 223

# Runs the shutdown sequence by delegating to #on_shutdown.
#
# @param do_exit [Boolean] forwarded to #on_shutdown
# @return [Object] whatever #on_shutdown returns
def shutdown(do_exit = true)
  on_shutdown do_exit
end

#start ⇒ Object



227
228
229
# File 'lib/fancybox2/module/base.rb', line 227

# Starts the module by delegating to #on_start without a packet.
#
# @return [Object] whatever #on_start returns
def start
  on_start
end

#start_sending_alive(interval: 5000) ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/fancybox2/module/base.rb', line 231

# (Re)starts the periodic task that sends "alive" messages to :core.
#
# Any previously created task is shut down first. Each run builds a packet
# with the current status, the current UTC time and the value returned by
# #alive_message_data, and sends it with #message_to. Errors raised while
# building or sending the packet are logged and do not propagate.
#
# @param interval [Numeric] period between messages, in milliseconds
# @return [Object] whatever the task's #execute returns
def start_sending_alive(interval: 5000)
  # TODO: replace the alive interval task with Eventmachine?
  # Interval is expected to be msec, so convert it to secs
  period_in_seconds = interval / 1000.0

  previous_task = @alive_task
  previous_task.shutdown if previous_task

  task_options = { execution_interval: period_in_seconds, timeout_interval: 2, run_now: true }
  @alive_task = Concurrent::TimerTask.new(**task_options) do
    begin
      heartbeat = { status: @status, lastSeen: Time.now.utc, data: alive_message_data }
      message_to :core, :alive, heartbeat
    rescue StandardError => e
      logger.error "Error in alive_message_data callback:  #{e.message}"
      logger.error e.backtrace.join "\n"
    end
  end
  @alive_task.execute
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


253
254
255
# File 'lib/fancybox2/module/base.rb', line 253

# Tells whether the module status is currently :stopped.
#
# @return [Boolean]
def stopped?
  @status == :stopped
end

#topic_for(source: self.name, dest: self.name, action: nil, packet_type: :msg) ⇒ Object



273
274
275
276
277
278
279
280
# File 'lib/fancybox2/module/base.rb', line 273

# Builds a topic string by filling Config::DEFAULT_TOPIC_FORMAT with, in this
# order: source, packet type, destination and action (each converted with
# #to_s).
#
# @param source [#to_s] message source; defaults to this module's name
# @param dest [#to_s] message destination; defaults to this module's name
# @param action [#to_s, nil] action name; nil becomes an empty string
# @param packet_type [#to_s] packet type segment
# @return [String] the formatted topic
def topic_for(source: self.name, dest: self.name, action: nil, packet_type: :msg)
  segments = [source, packet_type, dest, action].map(&:to_s)

  Config::DEFAULT_TOPIC_FORMAT % segments
end