Module: Nutella::Net

Defined in:
lib/nutella_lib/net.rb

Overview

This module implements the pub/sub and request/response APIs at the run level

Class Method Summary collapse

Class Method Details

.assemble_fromObject



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/nutella_lib/net.rb', line 130

def self.assemble_from
  from = Hash.new
  if Nutella.run_id.nil?
    if Nutella.app_id.nil?
      from['type'] = 'framework'
    else
      from['type'] = 'app'
      from['app_id'] = Nutella.app_id
    end
  else
    from['type'] = 'run'
    from['run_id'] = Nutella.run_id
  end
  from['component_id'] = Nutella.component_id
  from['resource_id'] = Nutella.resource_id unless Nutella.resource_id.nil?
  from
end

.async_request(channel, message = nil, callback) ⇒ Object

Performs an asynchronous request

Parameters:

  • channel (String)

    we want to make the request to. CANNOT contain wildcard(s)!

  • message (Object) (defaults to: nil)

    the body of request. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.

  • callback (Proc)

    the callback that is fired whenever a response is received. It takes one parameter (response).



64
65
66
# File 'lib/nutella_lib/net.rb', line 64

def self.async_request( channel, message=nil, callback )
  async_request_to(channel, message, callback, Nutella.app_id, Nutella.run_id)
end

.async_request_to(channel, message = nil, callback, app_id, run_id) ⇒ Object

Performs an asynchronous request

Parameters:

  • channel (String)

    we want to make the request to. CANNOT contain wildcard(s)!

  • message (Object) (defaults to: nil)

    the body of request. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.

  • callback (Proc)

    the callback that is fired whenever a response is received. It takes one parameter (response).

  • app_id (String)

    used to pad channels

  • run_id (String)

    used to pad channels



311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/nutella_lib/net.rb', line 311

def self.async_request_to( channel, message=nil, callback, app_id, run_id )
  # Check the passed callback has the right number of arguments
  raise 'You need to pass a callback with 1 parameter (response) when making an asynchronous request!' if callback.parameters.length!=1
  # Pad channel
  padded_channel = pad_channel(channel, app_id, run_id)
  # Prepare message
  m, id = prepare_message_for_request message
  # Prepare callback
  mqtt_cb = lambda do |mqtt_message|
    type, _, payload, m_id = extract_fields_from_message mqtt_message
    if m_id==id && type=='response'
      callback.call(payload)
      Nutella.mqtt.unsubscribe( padded_channel, mqtt_cb )
    end
  end
  # Subscribe
  Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
  # Publish message
  Nutella.mqtt.publish( padded_channel, m )
end

.callbacksObject

Provides access to callbacks



14
# File 'lib/nutella_lib/net.rb', line 14

def self.callbacks; @callbacks end

.extract_fields_from_message(message) ⇒ Object



111
112
113
114
# File 'lib/nutella_lib/net.rb', line 111

def self.extract_fields_from_message(message)
  mh = JSON.parse message
  return mh['type'], mh['from'], mh['payload'], mh['id']
end

.handle_requests(channel, callback) ⇒ Object

Handles requests on a certain channel

Parameters:

  • channel (String)

    we want to listen for requests on. Can contain wildcard(s).

  • callback (Proc)

    a lambda expression that is fired whenever a message is received. The passed callback takes the following parameters:

    • String

      the received message (payload). Messages that are not JSON are discarded.

    • Hash

      the sender’s identifiers (run_id, app_id, component_id and optionally resource_id)

    • returns Hash

      The response sent back to the client that performed the request. Whatever is returned by the callback is marshaled into a JSON string and sent via MQTT.



78
79
80
# File 'lib/nutella_lib/net.rb', line 78

def self.handle_requests( channel, callback )
  handle_requests_on(channel, callback, Nutella.app_id, Nutella.run_id)
end

.handle_requests_on(channel, callback, app_id, run_id) ⇒ Object

Handles requests on a certain channel

Parameters:

  • channel (String)

    we want to listen for requests on. Can contain wildcard(s).

  • callback (Proc)

    a lambda expression that is fired whenever a message is received. The passed callback takes the following parameters:

    • String

      the received message (payload). Messages that are not JSON are discarded.

    • Hash

      the sender’s identifiers (run_id, app_id, component_id and optionally resource_id)

    • returns Hash

      The response sent back to the client that performed the request. Whatever is returned by the callback is marshaled into a JSON string and sent via MQTT.

  • app_id (String)

    used to pad channels

  • run_id (String)

    used to pad channels



343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
# File 'lib/nutella_lib/net.rb', line 343

def self.handle_requests_on( channel, callback, app_id, run_id )
  # Check the passed callback has the right number of arguments
  raise 'You need to pass a callback with 2 parameter (request, from) when handling requests!' if callback.parameters.length!=2
  # Pad channel
  padded_channel = pad_channel(channel, app_id, run_id)
  mqtt_cb = lambda do |request|
    begin
      # Extract nutella fields
      type, from, payload, id = extract_fields_from_message request
      # Only handle requests that have proper id set
      return if type!='request' || id.nil?
      # Execute callback and send response
      m = self.prepare_message_for_response( callback.call( payload, from), id )
      Nutella.mqtt.publish( padded_channel, m )
    rescue JSON::ParserError
      # Make sure that request contains JSON, if not drop the message
      return
    rescue ArgumentError
      # Check that the passed callback has the right number of arguments
      STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload' and 'from'"
    end
  end
  # Subscribe to the channel
  Nutella.mqtt.subscribe(padded_channel, mqtt_cb)
  # Notify subscription
  publish_to('subscriptions', {'type' => 'handle_requests', 'channel' => padded_channel}, app_id, run_id)
end

.listenObject

Listens for incoming messages. All this function does is to put the thread to sleep and wait for something to happen over the network to wake up.



99
100
101
102
103
104
105
# File 'lib/nutella_lib/net.rb', line 99

def self.listen
  begin
    sleep
  rescue Interrupt
    # Simply returns once interrupted
  end
end

.pad_channel(channel, app_id, run_id) ⇒ Object



116
117
118
119
120
121
# File 'lib/nutella_lib/net.rb', line 116

def self.pad_channel( channel, app_id, run_id )
  raise 'If the run_id is specified, app_id needs to also be specified' if (!run_id.nil? && app_id.nil?)
  return "/nutella/#{channel}" if (app_id.nil? && run_id.nil?)
  return "/nutella/apps/#{app_id}/#{channel}" if (!app_id.nil? && run_id.nil?)
  "/nutella/apps/#{app_id}/runs/#{run_id}/#{channel}"
end

.prepare_message_for_publish(message) ⇒ Object



148
149
150
151
152
153
# File 'lib/nutella_lib/net.rb', line 148

def self.prepare_message_for_publish( message )
  if message.nil?
    return {type: 'publish', from: assemble_from}.to_json
  end
  {type: 'publish', from: assemble_from, payload: message}.to_json
end

.prepare_message_for_request(message) ⇒ Object



155
156
157
158
159
160
161
# File 'lib/nutella_lib/net.rb', line 155

def self.prepare_message_for_request( message )
  id = message.hash + Random.rand(100)
  if message.nil?
    return {id: id, type: 'request', from: assemble_from}.to_json, id
  end
  return {id: id, type: 'request', from: assemble_from, payload: message}.to_json, id
end

.prepare_message_for_response(message, id) ⇒ Object



163
164
165
166
167
168
# File 'lib/nutella_lib/net.rb', line 163

def self.prepare_message_for_response( message, id )
  if message.nil?
    return {id: id, type: 'response', from: assemble_from}.to_json
  end
  {id: id, type: 'response', from: assemble_from, payload: message}.to_json
end

.publish(channel, message = nil) ⇒ Object

Publishes a message to a channel

Parameters:

  • channel (String)

    we want to publish the message to. CANNOT contain wildcard(s)!

  • message (Object) (defaults to: nil)

    the message we are publishing. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.



43
44
45
# File 'lib/nutella_lib/net.rb', line 43

def self.publish( channel, message=nil )
  publish_to( channel, message, Nutella.app_id, Nutella.run_id)
end

.publish_to(channel, message = nil, app_id, run_id) ⇒ Object

Publishes a message to a channel

Parameters:

  • channel (String)

    we want to publish the message to. CANNOT contain wildcard(s)!

  • message (Object) (defaults to: nil)

    the message we are publishing. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.

  • app_id (String)

    used to pad channels

  • run_id (String)

    used to pad channels



257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/nutella_lib/net.rb', line 257

def self.publish_to( channel, message=nil, app_id, run_id )
  # Pad channel
  padded_channel = pad_channel(channel, app_id, run_id)
  # Throw exception if trying to publish something that is not JSON
  begin
    m = self.prepare_message_for_publish message
    Nutella.mqtt.publish( padded_channel, m )
  rescue
    STDERR.puts 'Error: you are trying to publish something that is not JSON'
    STDERR.puts $!
  end
end

.start_pingingObject

Sends a ping every 5 seconds to pings channel of the proper level



86
87
88
89
90
91
92
93
# File 'lib/nutella_lib/net.rb', line 86

def self.start_pinging
  Nutella.ping_thread = Thread.new do
    loop do
      publish_to('pings', 'ping', Nutella.app_id, Nutella.run_id)
      sleep(5)
    end
  end
end

.subscribe(channel, callback) ⇒ Object

Subscribes to a channel or to a set of channels.

Parameters:

  • channel (String)

    the channel or filter we are subscribing to. Can contain wildcard(s)

  • callback (Proc)

    a lambda expression that is fired whenever a message is received. The passed callback takes the following parameters:

    • String

      message: the received message. Messages that are not JSON are discarded.

    • String

      channel: the channel the message was received on (optional, only for wildcard subscriptions)

    • Hash

      from: the sender’s identifiers (run_id, app_id, component_id and optionally resource_id)



25
26
27
# File 'lib/nutella_lib/net.rb', line 25

def self.subscribe( channel, callback )
  subscribe_to( channel, callback, Nutella.app_id, Nutella.run_id)
end

.subscribe_to(channel, callback, app_id, run_id) ⇒ Object

Subscribes to a channel or to a set of channels.

Parameters:

  • channel (String)

    the channel or filter we are subscribing to. Can contain wildcard(s)

  • callback (Proc)

    a lambda expression that is fired whenever a message is received. The passed callback takes the following parameters:

    • String

      message: the received message. Messages that are not JSON are discarded.

    • String

      channel: the channel the message was received on (optional, only for wildcard subscriptions)

    • Hash

      from: the sender’s identifiers (run_id, app_id, component_id and optionally resource_id)

  • app_id (String)

    used to pad channels

  • run_id (String)

    used to pad channels



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/nutella_lib/net.rb', line 181

def self.subscribe_to( channel, callback, app_id, run_id )
  # Check the passed callback has the right number of arguments
  if Nutella.mqtt.is_channel_wildcard?(channel)
    raise 'You need to pass a callback with 3 parameters (message, channel, from) when subscribing to a wildcard channel!' if callback.parameters.length!=3
  else
    raise 'You need to pass a callback with 2 parameters (message, from) when subscribing to a channel!' if callback.parameters.length!=2
  end
  # Pad channel
  padded_channel = pad_channel(channel, app_id, run_id)
  # Maintain unique subscriptions
  raise 'You can`t subscribe twice to the same channel`' if @subscriptions.include? padded_channel
  # Depending on what type of channel we are subscribing to (wildcard or simple)
  # register a different kind of callback
  if Nutella.mqtt.is_channel_wildcard?(padded_channel)
    mqtt_cb = lambda do |mqtt_message, mqtt_channel|
      begin
        type, from, payload, _ = extract_fields_from_message mqtt_message
        callback.call(payload, un_pad_channel(mqtt_channel, app_id, run_id), from) if type=='publish'
      rescue JSON::ParserError
        # Make sure the message is JSON, if not drop the message
        return
      rescue ArgumentError
        # Check the passed callback has the right number of arguments
        STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload', 'channel' and 'from'"
      end
    end
  else
    mqtt_cb = lambda do |message|
      begin
        type, from, payload, _ = extract_fields_from_message message
        callback.call(payload, from)  if type=='publish'
      rescue JSON::ParserError
        # Make sure the message is JSON, if not drop the message
        return
      rescue ArgumentError
        # Check the passed callback has the right number of arguments
        STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload' and 'from'"
      end
    end
  end
  # Add to subscriptions, save mqtt callback and subscribe
  @subscriptions.push padded_channel
  @callbacks.push mqtt_cb
  Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
  # Notify subscription
  publish_to('subscriptions', {'type' => 'subscribe', 'channel' => padded_channel}, app_id, run_id)
end

.subscriptionsObject

Provides access to the subscriptions



11
# File 'lib/nutella_lib/net.rb', line 11

def self.subscriptions; @subscriptions end

.sync_request(channel, message = nil) ⇒ Object

Performs a synchronous request.

Parameters:

  • channel (String)

    we want to make the request to. CANNOT contain wildcard(s)!

  • message (Object) (defaults to: nil)

    the body of request. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.



53
54
55
# File 'lib/nutella_lib/net.rb', line 53

def self.sync_request( channel, message=nil )
  sync_request_to(channel, message, Nutella.app_id, Nutella.run_id)
end

.sync_request_to(channel, message = nil, app_id, run_id) ⇒ Object

Performs a synchronous request.

Parameters:

  • channel (String)

    we want to make the request to. CANNOT contain wildcard(s)!

  • message (Object) (defaults to: nil)

    the body of request. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.

  • app_id (String)

    used to pad channels

  • run_id (String)

    used to pad channels



278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# File 'lib/nutella_lib/net.rb', line 278

def self.sync_request_to( channel, message=nil, app_id, run_id )
  # Pad channel
  padded_channel = pad_channel(channel, app_id, run_id)
  # Prepare message
  m, id = prepare_message_for_request message
  # Initialize response
  response = nil
  # Prepare callback
  mqtt_cb = lambda do |mqtt_message|
    type, _, payload, m_id = extract_fields_from_message mqtt_message
    if m_id==id && type=='response'
      response = payload
      Nutella.mqtt.unsubscribe( padded_channel, mqtt_cb )
    end
  end
  # Subscribe
  Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
  # Publish message
  Nutella.mqtt.publish( padded_channel, m )
  # Wait for the response to come back
  sleep(0.1) while response.nil?
  response
end

.un_pad_channel(channel, app_id, run_id) ⇒ Object



123
124
125
126
127
128
# File 'lib/nutella_lib/net.rb', line 123

def self.un_pad_channel( channel, app_id, run_id )
  raise 'If the run_id is specified, app_id needs to also be specified' if (!run_id.nil? && app_id.nil?)
  return channel.gsub('/nutella/', '') if (app_id.nil? && run_id.nil?)
  return channel.gsub("/nutella/apps/#{app_id}/", '') if (!app_id.nil? && run_id.nil?)
  channel.gsub("/nutella/apps/#{app_id}/runs/#{run_id}/", '')
end

.unsubscribe(channel) ⇒ Object

Un-subscribes from a channel

Parameters:

  • channel (String)

    we want to unsubscribe from. Can contain wildcard(s).



33
34
35
# File 'lib/nutella_lib/net.rb', line 33

def self.unsubscribe( channel )
  unsubscribe_to( channel, Nutella.app_id, Nutella.run_id)
end

.unsubscribe_to(channel, app_id, run_id) ⇒ Object

Un-subscribes from a channel

Parameters:

  • channel (String)

    we want to unsubscribe from. Can contain wildcard(s).

  • app_id (String)

    used to pad channels

  • run_id (String)

    used to pad channels



235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/nutella_lib/net.rb', line 235

def self.unsubscribe_to( channel, app_id, run_id )
  # Pad channel
  padded_channel = pad_channel(channel, app_id, run_id)
  idx = @subscriptions.index padded_channel
  # If we are not subscribed to this channel, return (no error is given)
  return if idx.nil?
  # Fetch the mqtt_callback associated with this channel/subscription
  mqtt_cb = @callbacks[idx]
  # Remove from subscriptions, callbacks and unsubscribe
  @subscriptions.delete_at idx
  @callbacks.delete_at idx
  Nutella.mqtt.unsubscribe( padded_channel, mqtt_cb )
end