Module: Nutella::App::Net

Defined in:
lib/nutella_lib/app_net.rb

Overview

This module implements the pub/sub and request/response APIs at the application level

Application-level communication APIs collapse

Application-level APIs to communicate at the run-level collapse

Application level APIs to communicate at the run-level (broadcast) collapse

Class Method Summary collapse

Class Method Details

.async_request(channel, message = nil, callback) ⇒ Object

Performs an asynchronous request at the application-level



58
59
60
# File 'lib/nutella_lib/app_net.rb', line 58

def self.async_request ( channel, message=nil, callback )
  Nutella::Net.async_request_to(channel, message, callback, Nutella.app_id, nil)
end

.async_request_to_all_runs(channel, request, callback) ⇒ Object

Allows application-level APIs to send a request to a run-level channel *for ALL runs*



216
217
218
219
220
# File 'lib/nutella_lib/app_net.rb', line 216

def self.async_request_to_all_runs(channel, request, callback)
  Nutella.app.app_runs_list.each do |run_id|
    Nutella::Net.async_request_to(channel, request, callback, Nutella.app_id, run_id)
  end
end

.async_request_to_run(run_id, channel, request, callback) ⇒ Object

Allows application-level APIs to make an asynchronous request to a run-level channel within a specific run



132
133
134
# File 'lib/nutella_lib/app_net.rb', line 132

def self.async_request_to_run( run_id, channel, request, callback)
  Nutella::Net.async_request_to(channel, request, callback, Nutella.app_id, run_id)
end

.handle_requests(channel, callback) ⇒ Object

Handles requests on a certain application-level channel



71
72
73
# File 'lib/nutella_lib/app_net.rb', line 71

def self.handle_requests( channel, callback )
  Nutella::Net.handle_requests_on(channel, callback, Nutella.app_id, nil)
end

.handle_requests_on_all_runs(channel, callback) ⇒ Object

Allows application-level APIs to handle requests to a run-level channel *for ALL runs*



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/nutella_lib/app_net.rb', line 232

def self.handle_requests_on_all_runs(channel, callback)
  # Check the passed callback has the right number of arguments
  raise 'You need to pass a callback with 3 parameters (request, run_id, from) when handling requests!' if callback.parameters.length!=3
  # Pad channel
  padded_channel = Nutella::Net.pad_channel(channel, Nutella.app_id, '+')
  mqtt_cb = lambda do |request, mqtt_channel|
    begin
      # Extract nutella fields
      type, from, payload, id = Nutella::Net.extract_fields_from_message request
      run_id = extract_run_id_from_ch(Nutella.app_id, mqtt_channel)
      # Only handle requests that have proper id set
      return if type!='request' || id.nil?
      # Execute callback and send response
      m = Nutella::Net.prepare_message_for_response( callback.call( payload, run_id, from), id )
      Nutella.mqtt.publish( mqtt_channel, m )
    rescue JSON::ParserError
      # Make sure that request contains JSON, if not drop the message
      return
    rescue ArgumentError
      # Check the passed callback has the right number of arguments
      STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'request', 'run_id' and 'from'"
    end
  end
  # Subscribe to the channel
  Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
  # Notify subscription
  Nutella::Net.publish_to('subscriptions', {'type' => 'handle_requests', 'channel' => padded_channel}, Nutella.app_id, nil)
end

.handle_requests_on_run(run_id, channel, callback) ⇒ Object

Allows application-level APIs to handle requests on a run-level channel within a specific run



146
147
148
# File 'lib/nutella_lib/app_net.rb', line 146

def self.handle_requests_on_run( run_id, channel, callback )
  Nutella::Net.handle_requests_on(channel, callback, Nutella.app_id, run_id)
end

.listenObject

Listens for incoming messages. All this function does is to put the thread to sleep and wait for something to happen over the network to wake up.



268
269
270
# File 'lib/nutella_lib/app_net.rb', line 268

def self.listen
  Nutella::Net.listen
end

.publish(channel, message = nil) ⇒ Object

Publishes a message to an application-level channel



37
38
39
# File 'lib/nutella_lib/app_net.rb', line 37

def self.publish(channel, message=nil)
  Nutella::Net.publish_to(channel, message, Nutella.app_id, nil)
end

.publish_to_all_runs(channel, message) ⇒ Object

Allows application-level APIs to publish a message to a run-level channel *for ALL runs*



203
204
205
206
207
# File 'lib/nutella_lib/app_net.rb', line 203

def self.publish_to_all_runs( channel, message )
  Nutella.app.app_runs_list.each do |run_id|
    Nutella::Net.publish_to(channel, message, Nutella.app_id, run_id)
  end
end

.publish_to_run(run_id, channel, message) ⇒ Object

Allows application-level APIs to publish to a run-level channel within a specific run



109
110
111
# File 'lib/nutella_lib/app_net.rb', line 109

def self.publish_to_run( run_id, channel, message )
  Nutella::Net.publish_to(channel, message, Nutella.app_id, run_id)
end

.subscribe(channel, callback) ⇒ Object

Subscribes to a channel or to a set of channels at the application-level.



19
20
21
# File 'lib/nutella_lib/app_net.rb', line 19

def self.subscribe (channel, callback)
  Nutella::Net.subscribe_to(channel, callback, Nutella.app_id, nil)
end

.subscribe_to_all_runs(channel, callback) ⇒ Object

Allows application-level APIs to subscribe to a run-level channel *for ALL runs*



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/nutella_lib/app_net.rb', line 163

def self.subscribe_to_all_runs( channel, callback )
  # Check the passed callback has the right number of arguments
  raise 'You need to pass a callback with 3 parameters (payload, run_id, from) when subscribing to all runs!' if callback.parameters.length!=3
  # Pad channel
  padded_channel = Nutella::Net.pad_channel(channel, Nutella.app_id, '+')
  mqtt_cb = lambda do |mqtt_message, mqtt_channel|
    begin
      type, from, payload, _ = Nutella::Net.extract_fields_from_message mqtt_message
      run_id = extract_run_id_from_ch(Nutella.app_id, mqtt_channel)
      callback.call(payload, run_id, from) if type=='publish'
    rescue JSON::ParserError
      # Make sure the message is JSON, if not drop the message
      return
    rescue ArgumentError
      # Check the passed callback has the right number of arguments
      STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload', 'run_id' and 'from'"
    end
  end
  # Add to subscriptions, save mqtt callback and subscribe
  Nutella::Net.subscriptions.push padded_channel
  Nutella::Net.callbacks.push mqtt_cb
  Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
  # Notify subscription
  Nutella::Net.publish_to('subscriptions', {'type' => 'subscribe', 'channel' => padded_channel}, Nutella.app_id, nil)
end

.subscribe_to_run(run_id, channel, callback) ⇒ Object

Allows application-level APIs to subscribe to a run-level channel within a specific run



89
90
91
# File 'lib/nutella_lib/app_net.rb', line 89

def self.subscribe_to_run( run_id, channel, callback )
  Nutella::Net.subscribe_to(channel, callback, Nutella.app_id, run_id)
end

.sync_request(channel, message = nil) ⇒ Object

Performs a synchronous request at the application-level



47
48
49
# File 'lib/nutella_lib/app_net.rb', line 47

def self.sync_request ( channel, message=nil )
  Nutella::Net.sync_request_to(channel, message, Nutella.app_id, nil)
end

.sync_request_to_run(run_id, channel, request) ⇒ Object

Allows application-level APIs to make a synchronous request to a run-level channel within a specific run



120
121
122
# File 'lib/nutella_lib/app_net.rb', line 120

def self.sync_request_to_run( run_id, channel, request)
  Nutella::Net.sync_request_to(channel, request, Nutella.app_id, run_id)
end

.unsubscribe(channel) ⇒ Object

Un-subscribes from an application-level channel



27
28
29
# File 'lib/nutella_lib/app_net.rb', line 27

def self.unsubscribe( channel )
  Nutella::Net.unsubscribe_to(channel, Nutella.app_id, nil)
end

.unsubscribe_from_all_runs(channel) ⇒ Object

Allows application-level APIs to unsubscribe from a run-level channel *for ALL runs*



193
194
195
# File 'lib/nutella_lib/app_net.rb', line 193

def self.unsubscribe_from_all_runs( channel )
  Nutella::Net.unsubscribe_to(channel, Nutella.app_id, '+')
end

.unsubscribe_to_run(run_id, channel) ⇒ Object

Allows application-level APIs to unsubscribe from a run-level channel within a specific run



98
99
100
# File 'lib/nutella_lib/app_net.rb', line 98

def self.unsubscribe_to_run( run_id, channel )
  Nutella::Net.unsubscribe_to(channel, Nutella.app_id, run_id)
end