Module: Nutella::App::Net

Defined in:
lib/nutella_lib/app_net.rb

Overview

This module implements the pub/sub and request/response APIs at the application level

Application-level communication APIs collapse

Application-level APIs to communicate at the run-level collapse

Application level APIs to communicate at the run-level (broadcast) collapse

Class Method Summary collapse

Class Method Details

.async_request(channel, message = nil, callback) ⇒ Object

Performs an asynchronous request at the application-level

Parameters:

  • channel (String)

    the application-level channel we want to make the request to. CANNOT contain wildcard(s)!

  • message (Object) (defaults to: nil)

    the body of request. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.

  • callback (Proc)

    the callback that is fired whenever a response is received. It takes one parameter (response).



58
59
60
# File 'lib/nutella_lib/app_net.rb', line 58

def self.async_request ( channel, message=nil, callback )
  Nutella::Net.async_request_to(channel, message, callback, Nutella.app_id, nil)
end

.async_request_to_all_runs(channel, request, callback) ⇒ Object

Allows application-level APIs to send a request to a run-level channel *for ALL runs*

Parameters:

  • channel (String)

    the run-level channel we want to make the request to. CANNOT contain wildcard(s)!

  • request (Object)

    the body of request. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.

  • callback (Proc)

    the callback that is fired whenever a response is received. It takes one parameter (response).



216
217
218
219
220
# File 'lib/nutella_lib/app_net.rb', line 216

def self.async_request_to_all_runs(channel, request, callback)
  Nutella.app.app_runs_list.each do |run_id|
    Nutella::Net.async_request_to(channel, request, callback, Nutella.app_id, run_id)
  end
end

.async_request_to_run(run_id, channel, request, callback) ⇒ Object

Allows application-level APIs to make an asynchronous request to a run-level channel within a specific run

Parameters:

  • run_id (String)

    the specific run we are making the request to

  • channel (String)

    the channel we want to make the request to. CANNOT contain wildcard(s)!

  • request (Object)

    the body of request. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.

  • callback (Proc)

    the callback that is fired whenever a response is received. It takes one parameter (response).



132
133
134
# File 'lib/nutella_lib/app_net.rb', line 132

def self.async_request_to_run( run_id, channel, request, callback)
  Nutella::Net.async_request_to(channel, request, callback, Nutella.app_id, run_id)
end

.handle_requests(channel, callback) ⇒ Object

Handles requests on a certain application-level channel

Parameters:

  • channel (String)

    tha application-level channel we want to listen for requests on. Can contain wildcard(s).

  • callback (Proc)

    a lambda expression that is fired whenever a message is received. The passed callback takes the following parameters:

    • String

      the received message (payload). Messages that are not JSON are discarded.

    • Hash

      the sender’s identifiers (run_id, app_id, component_id and optionally resource_id)

    • returns Hash

      The response sent back to the client that performed the request. Whatever is returned by the callback is marshaled into a JSON string and sent via MQTT.



71
72
73
# File 'lib/nutella_lib/app_net.rb', line 71

def self.handle_requests( channel, callback )
  Nutella::Net.handle_requests_on(channel, callback, Nutella.app_id, nil)
end

.handle_requests_on_all_runs(channel, callback) ⇒ Object

Allows application-level APIs to handle requests to a run-level channel *for ALL runs*

Parameters:

  • channel (String)

    tha run-level channel we want to listen for requests on. Can contain wildcard(s).

  • callback (Proc)

    a lambda expression that is fired whenever a message is received. The passed callback takes the following parameters:

    • String

      the received message (request). Messages that are not JSON are discarded.

    • String

      run_id: the run_id of the channel the message was sent on

    • Hash

      the sender’s identifiers (from containing, run_id, app_id, component_id and optionally resource_id)

    • returns Hash

      The response sent back to the client that performed the request. Whatever is returned by the callback is marshaled into a JSON string and sent via MQTT.



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/nutella_lib/app_net.rb', line 232

def self.handle_requests_on_all_runs(channel, callback)
  # Check the passed callback has the right number of arguments
  raise 'You need to pass a callback with 3 parameters (request, run_id, from) when handling requests!' if callback.parameters.length!=3
  # Pad channel
  padded_channel = Nutella::Net.pad_channel(channel, Nutella.app_id, '+')
  mqtt_cb = lambda do |request, mqtt_channel|
    begin
      # Extract nutella fields
      type, from, payload, id = Nutella::Net.extract_fields_from_message request
      run_id = extract_run_id_from_ch(Nutella.app_id, mqtt_channel)
      # Only handle requests that have proper id set
      return if type!='request' || id.nil?
      # Execute callback and send response
      m = Nutella::Net.prepare_message_for_response( callback.call( payload, run_id, from), id )
      Nutella.mqtt.publish( mqtt_channel, m )
    rescue JSON::ParserError
      # Make sure that request contains JSON, if not drop the message
      return
    rescue ArgumentError
      # Check the passed callback has the right number of arguments
      STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'request', 'run_id' and 'from'"
    end
  end
  # Subscribe to the channel
  Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
  # Notify subscription
  Nutella::Net.publish_to('subscriptions', {'type' => 'handle_requests', 'channel' => padded_channel}, Nutella.app_id, nil)
end

.handle_requests_on_run(run_id, channel, callback) ⇒ Object

Allows application-level APIs to handle requests on a run-level channel within a specific run

Parameters:

  • run_id (String)

    the specific run requests are coming from

  • channel (String)

    we want to listen for requests on. Can contain wildcard(s).

  • callback (Proc)

    a lambda expression that is fired whenever a message is received. The passed callback takes the following parameters:

    • String

      the received message (payload). Messages that are not JSON are discarded.

    • Hash

      the sender’s identifiers (run_id, app_id, component_id and optionally resource_id)

    • returns Hash

      The response sent back to the client that performed the request. Whatever is returned by the callback is marshaled into a JSON string and sent via MQTT.



146
147
148
# File 'lib/nutella_lib/app_net.rb', line 146

def self.handle_requests_on_run( run_id, channel, callback )
  Nutella::Net.handle_requests_on(channel, callback, Nutella.app_id, run_id)
end

.listenObject

Listens for incoming messages. All this function does is to put the thread to sleep and wait for something to happen over the network to wake up.



268
269
270
# File 'lib/nutella_lib/app_net.rb', line 268

def self.listen
  Nutella::Net.listen
end

.publish(channel, message = nil) ⇒ Object

Publishes a message to an application-level channel

Parameters:

  • channel (String)

    the application-level channel we want to publish the message to. CANNOT contain wildcard(s)!

  • message (Object) (defaults to: nil)

    the message we are publishing. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.



37
38
39
# File 'lib/nutella_lib/app_net.rb', line 37

def self.publish(channel, message=nil)
  Nutella::Net.publish_to(channel, message, Nutella.app_id, nil)
end

.publish_to_all_runs(channel, message) ⇒ Object

Allows application-level APIs to publish a message to a run-level channel *for ALL runs*

Parameters:

  • channel (String)

    the run-level channel we want to publish the message to. CANNOT contain wildcard(s)!

  • message (Object)

    the message we are publishing. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.



203
204
205
206
207
# File 'lib/nutella_lib/app_net.rb', line 203

def self.publish_to_all_runs( channel, message )
  Nutella.app.app_runs_list.each do |run_id|
    Nutella::Net.publish_to(channel, message, Nutella.app_id, run_id)
  end
end

.publish_to_run(run_id, channel, message) ⇒ Object

Allows application-level APIs to publish to a run-level channel within a specific run

Parameters:

  • run_id (String)

    the specific run we are publishing to

  • channel (String)

    the run-level channel we want to publish the message to. CANNOT contain wildcard(s)!

  • message (String)

    the message we are publishing. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.



109
110
111
# File 'lib/nutella_lib/app_net.rb', line 109

def self.publish_to_run( run_id, channel, message )
  Nutella::Net.publish_to(channel, message, Nutella.app_id, run_id)
end

.subscribe(channel, callback) ⇒ Object

Subscribes to a channel or to a set of channels at the application-level.

Parameters:

  • channel (String)

    the application-level channel or filter we are subscribing to. Can contain wildcard(s)

  • callback (Proc)

    a lambda expression that is fired whenever a message is received. The passed callback takes the following parameters:

    • String

      message: the received message. Messages that are not JSON are discarded.

    • String

      channel: the application-level channel the message was received on (optional, only for wildcard subscriptions)

    • Hash

      from: the sender’s identifiers (run_id, app_id, component_id and optionally resource_id)



19
20
21
# File 'lib/nutella_lib/app_net.rb', line 19

def self.subscribe (channel, callback)
  Nutella::Net.subscribe_to(channel, callback, Nutella.app_id, nil)
end

.subscribe_to_all_runs(channel, callback) ⇒ Object

Allows application-level APIs to subscribe to a run-level channel *for ALL runs*

Parameters:

  • channel (String)

    the run-level channel we are subscribing to. Can be wildcard.

  • callback (Proc)

    the callback that is fired whenever a message is received on the channel. The passed callback takes the following parameters:

    • String

      message: the received message. Messages that are not JSON are discarded.

    • String

      run_id: the run_id of the channel the message was sent on

    • Hash

      from: the sender’s identifiers (run_id, app_id, component_id and optionally resource_id)



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/nutella_lib/app_net.rb', line 163

def self.subscribe_to_all_runs( channel, callback )
  # Check the passed callback has the right number of arguments
  raise 'You need to pass a callback with 3 parameters (payload, run_id, from) when subscribing to all runs!' if callback.parameters.length!=3
  # Pad channel
  padded_channel = Nutella::Net.pad_channel(channel, Nutella.app_id, '+')
  mqtt_cb = lambda do |mqtt_message, mqtt_channel|
    begin
      type, from, payload, _ = Nutella::Net.extract_fields_from_message mqtt_message
      run_id = extract_run_id_from_ch(Nutella.app_id, mqtt_channel)
      callback.call(payload, run_id, from) if type=='publish'
    rescue JSON::ParserError
      # Make sure the message is JSON, if not drop the message
      return
    rescue ArgumentError
      # Check the passed callback has the right number of arguments
      STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload', 'run_id' and 'from'"
    end
  end
  # Add to subscriptions, save mqtt callback and subscribe
  Nutella::Net.subscriptions.push padded_channel
  Nutella::Net.callbacks.push mqtt_cb
  Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
  # Notify subscription
  Nutella::Net.publish_to('subscriptions', {'type' => 'subscribe', 'channel' => padded_channel}, Nutella.app_id, nil)
end

.subscribe_to_run(run_id, channel, callback) ⇒ Object

Allows application-level APIs to subscribe to a run-level channel within a specific run

Parameters:

  • run_id (String)

    the specific run we are subscribing to

  • channel (String)

    the run-level channel we are subscribing to. Can be wildcard.

  • callback (Proc)

    the callback that is fired whenever a message is received on the channel. The passed callback takes the following parameters:

    • String

      message: the received message. Messages that are not JSON are discarded.

    • String

      channel: the application-level channel the message was received on (optional, only for wildcard subscriptions)

    • Hash

      from: the sender’s identifiers (run_id, app_id, component_id and optionally resource_id)



89
90
91
# File 'lib/nutella_lib/app_net.rb', line 89

def self.subscribe_to_run( run_id, channel, callback )
  Nutella::Net.subscribe_to(channel, callback, Nutella.app_id, run_id)
end

.sync_request(channel, message = nil) ⇒ Object

Performs a synchronous request at the application-level

Parameters:

  • channel (String)

    the application-level channel we want to make the request to. CANNOT contain wildcard(s)!

  • message (Object) (defaults to: nil)

    the body of request. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.



47
48
49
# File 'lib/nutella_lib/app_net.rb', line 47

def self.sync_request ( channel, message=nil )
  Nutella::Net.sync_request_to(channel, message, Nutella.app_id, nil)
end

.sync_request_to_run(run_id, channel, request) ⇒ Object

Allows application-level APIs to make a synchronous request to a run-level channel within a specific run

Parameters:

  • run_id (String)

    the specific run we are making the request to

  • channel (String)

    the channel we want to make the request to. CANNOT contain wildcard(s)!

  • request (Object)

    the body of request. This can be, nil/empty (default), a string, a hash and, in general, anything with a .to_json method.



120
121
122
# File 'lib/nutella_lib/app_net.rb', line 120

def self.sync_request_to_run( run_id, channel, request)
  Nutella::Net.sync_request_to(channel, request, Nutella.app_id, run_id)
end

.unsubscribe(channel) ⇒ Object

Un-subscribes from an application-level channel

Parameters:

  • channel (String)

    the application level channel we want to unsubscribe from. Can contain wildcard(s).



27
28
29
# File 'lib/nutella_lib/app_net.rb', line 27

def self.unsubscribe( channel )
  Nutella::Net.unsubscribe_to(channel, Nutella.app_id, nil)
end

.unsubscribe_from_all_runs(channel) ⇒ Object

Allows application-level APIs to unsubscribe from a run-level channel *for ALL runs*

Parameters:

  • channel (String)

    the run-level channel we want to unsubscribe from. Can contain wildcard(s).



193
194
195
# File 'lib/nutella_lib/app_net.rb', line 193

def self.unsubscribe_from_all_runs( channel )
  Nutella::Net.unsubscribe_to(channel, Nutella.app_id, '+')
end

.unsubscribe_to_run(run_id, channel) ⇒ Object

Allows application-level APIs to unsubscribe from a run-level channel within a specific run

Parameters:

  • run_id (String)

    the specific run we are un-subscribing from

  • channel (String)

    the run-level channel we want to unsubscribe from. Can contain wildcard(s).



98
99
100
# File 'lib/nutella_lib/app_net.rb', line 98

def self.unsubscribe_to_run( run_id, channel )
  Nutella::Net.unsubscribe_to(channel, Nutella.app_id, run_id)
end