Module: Nutella::Net
Defined in: lib/nutella_lib/net.rb
Overview
This module implements the pub/sub and request/response APIs at the run level.
Class Method Summary
- .assemble_from ⇒ Object
- .async_request(channel, message = nil, callback) ⇒ Object
  Performs an asynchronous request, using the current app_id and run_id.
- .async_request_to(channel, message = nil, callback, app_id, run_id) ⇒ Object
  Performs an asynchronous request on a channel of the given app_id and run_id.
- .callbacks ⇒ Object
  Provides access to the callbacks.
- .extract_fields_from_message(message) ⇒ Object
- .handle_requests(channel, callback) ⇒ Object
  Handles requests on a certain channel, using the current app_id and run_id.
- .handle_requests_on(channel, callback, app_id, run_id) ⇒ Object
  Handles requests on a certain channel of the given app_id and run_id.
- .listen ⇒ Object
  Listens for incoming messages.
- .pad_channel(channel, app_id, run_id) ⇒ Object
- .prepare_message_for_publish(message) ⇒ Object
- .prepare_message_for_request(message) ⇒ Object
- .prepare_message_for_response(message, id) ⇒ Object
- .publish(channel, message = nil) ⇒ Object
  Publishes a message to a channel, using the current app_id and run_id.
- .publish_to(channel, message = nil, app_id, run_id) ⇒ Object
  Publishes a message to a channel of the given app_id and run_id.
- .start_pinging ⇒ Object
  Sends a ping every 5 seconds to the pings channel of the current level (framework, app, or run).
- .subscribe(channel, callback) ⇒ Object
  Subscribes to a channel or to a set of channels, using the current app_id and run_id.
- .subscribe_to(channel, callback, app_id, run_id) ⇒ Object
  Subscribes to a channel or to a set of channels of the given app_id and run_id.
- .subscriptions ⇒ Object
  Provides access to the subscriptions.
- .sync_request(channel, message = nil) ⇒ Object
  Performs a synchronous request, using the current app_id and run_id.
- .sync_request_to(channel, message = nil, app_id, run_id) ⇒ Object
  Performs a synchronous request on a channel of the given app_id and run_id.
- .un_pad_channel(channel, app_id, run_id) ⇒ Object
- .unsubscribe(channel) ⇒ Object
  Unsubscribes from a channel, using the current app_id and run_id.
- .unsubscribe_to(channel, app_id, run_id) ⇒ Object
  Unsubscribes from a channel of the given app_id and run_id.
Class Method Details
.assemble_from ⇒ Object
    # File 'lib/nutella_lib/net.rb', line 130
    def self.assemble_from
      from = Hash.new
      if Nutella.run_id.nil?
        if Nutella.app_id.nil?
          from['type'] = 'framework'
        else
          from['type'] = 'app'
          from['app_id'] = Nutella.app_id
        end
      else
        from['type'] = 'run'
        from['run_id'] = Nutella.run_id
      end
      from['component_id'] = Nutella.component_id
      from['resource_id'] = Nutella.resource_id unless Nutella.resource_id.nil?
      from
    end
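To make the three sender levels concrete, here is a minimal stand-alone sketch of the same branching. `sketch_from` and its keyword arguments are hypothetical: the real method reads these values from the `Nutella` module rather than taking parameters, and the ids below are made up.

```ruby
# Hypothetical stand-alone version of the `from` logic shown above.
def sketch_from(app_id:, run_id:, component_id:, resource_id: nil)
  from = {}
  if run_id.nil?
    if app_id.nil?
      from['type'] = 'framework'
    else
      from['type'] = 'app'
      from['app_id'] = app_id
    end
  else
    from['type'] = 'run'
    from['run_id'] = run_id
  end
  from['component_id'] = component_id
  from['resource_id'] = resource_id unless resource_id.nil?
  from
end

FRAMEWORK_FROM = sketch_from(app_id: nil, run_id: nil, component_id: 'monitor')
APP_FROM = sketch_from(app_id: 'my_app', run_id: nil, component_id: 'bot')
RUN_FROM = sketch_from(app_id: 'my_app', run_id: 'run_1',
                       component_id: 'ui', resource_id: 'tablet')
```

Note that, following the branching in the listing, a run-level `from` carries `run_id` but no `app_id` key.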
.async_request(channel, message = nil, callback) ⇒ Object
Performs an asynchronous request.
    # File 'lib/nutella_lib/net.rb', line 64
    def self.async_request( channel, message=nil, callback )
      async_request_to(channel, message, callback, Nutella.app_id, Nutella.run_id)
    end
.async_request_to(channel, message = nil, callback, app_id, run_id) ⇒ Object
Performs an asynchronous request.
    # File 'lib/nutella_lib/net.rb', line 311
    def self.async_request_to( channel, message=nil, callback, app_id, run_id )
      # Check the passed callback has the right number of arguments
      raise 'You need to pass a callback with 1 parameter (response) when making an asynchronous request!' if callback.parameters.length!=1
      # Pad channel
      padded_channel = pad_channel(channel, app_id, run_id)
      # Prepare message
      m, id = prepare_message_for_request message
      # Prepare callback
      mqtt_cb = lambda do |mqtt_message|
        type, _, payload, m_id = extract_fields_from_message mqtt_message
        if m_id==id && type=='response'
          callback.call(payload)
          Nutella.mqtt.unsubscribe( padded_channel, mqtt_cb )
        end
      end
      # Subscribe
      Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
      # Publish message
      Nutella.mqtt.publish( padded_channel, m )
    end
.callbacks ⇒ Object
Provides access to the callbacks.
    # File 'lib/nutella_lib/net.rb', line 14
    def self.callbacks; @callbacks end
.extract_fields_from_message(message) ⇒ Object
    # File 'lib/nutella_lib/net.rb', line 111
    def self.extract_fields_from_message(message)
      mh = JSON.parse message
      return mh['type'], mh['from'], mh['payload'], mh['id']
    end
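As an illustration of what this method returns, the sketch below parses a hand-written envelope. `sketch_extract_fields` is a hypothetical stand-alone copy of the logic, and the envelope contents are made-up values.

```ruby
require 'json'

# Hypothetical stand-alone copy of the field extraction shown above.
def sketch_extract_fields(message)
  mh = JSON.parse(message)
  return mh['type'], mh['from'], mh['payload'], mh['id']
end

RAW_REQUEST = {
  id: 42,
  type: 'request',
  from: { 'type' => 'run', 'run_id' => 'run_1', 'component_id' => 'ui' },
  payload: { 'x' => 1 }
}.to_json

type, from, payload, id = sketch_extract_fields(RAW_REQUEST)
puts "#{type} ##{id} from #{from['component_id']}: #{payload}"

# A body that is not JSON raises JSON::ParserError, which the subscribe and
# handle_requests callbacks rescue in order to drop the message.
begin
  sketch_extract_fields('not json')
rescue JSON::ParserError
  puts 'dropped a non-JSON message'
end
```

Fields that are absent from the envelope (for example `id` on a publish message) come back as `nil`.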
.handle_requests(channel, callback) ⇒ Object
Handles requests on a certain channel.
    # File 'lib/nutella_lib/net.rb', line 78
    def self.handle_requests( channel, callback )
      handle_requests_on(channel, callback, Nutella.app_id, Nutella.run_id)
    end
.handle_requests_on(channel, callback, app_id, run_id) ⇒ Object
Handles requests on a certain channel.
    # File 'lib/nutella_lib/net.rb', line 343
    def self.handle_requests_on( channel, callback, app_id, run_id )
      # Check the passed callback has the right number of arguments
      raise 'You need to pass a callback with 2 parameter (request, from) when handling requests!' if callback.parameters.length!=2
      # Pad channel
      padded_channel = pad_channel(channel, app_id, run_id)
      mqtt_cb = lambda do |request|
        begin
          # Extract nutella fields
          type, from, payload, id = extract_fields_from_message request
          # Only handle requests that have proper id set
          return if type!='request' || id.nil?
          # Execute callback and send response
          m = self.prepare_message_for_response( callback.call( payload, from), id )
          Nutella.mqtt.publish( padded_channel, m )
        rescue JSON::ParserError
          # Make sure that request contains JSON, if not drop the message
          return
        rescue ArgumentError
          # Check that the passed callback has the right number of arguments
          STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload' and 'from'"
        end
      end
      # Subscribe to the channel
      Nutella.mqtt.subscribe(padded_channel, mqtt_cb)
      # Notify subscription
      publish_to('subscriptions', {'type' => 'handle_requests', 'channel' => padded_channel}, app_id, run_id)
    end
.listen ⇒ Object
Listens for incoming messages. This method simply puts the calling thread to sleep, keeping the process alive while messages arrive over the network, and returns once it is interrupted.
    # File 'lib/nutella_lib/net.rb', line 99
    def self.listen
      begin
        sleep
      rescue Interrupt
        # Simply returns once interrupted
      end
    end
.pad_channel(channel, app_id, run_id) ⇒ Object
    # File 'lib/nutella_lib/net.rb', line 116
    def self.pad_channel( channel, app_id, run_id )
      raise 'If the run_id is specified, app_id needs to also be specified' if (!run_id.nil? && app_id.nil?)
      return "/nutella/#{channel}" if (app_id.nil? && run_id.nil?)
      return "/nutella/apps/#{app_id}/#{channel}" if (!app_id.nil? && run_id.nil?)
      "/nutella/apps/#{app_id}/runs/#{run_id}/#{channel}"
    end
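The three scoping levels are easiest to see with concrete inputs. The sketch below re-implements the padding rule as a stand-alone function so it can run without a broker; `sketch_pad_channel` is a hypothetical name, not part of the library, and the app and run ids are made-up values.

```ruby
# Hypothetical stand-alone copy of the padding rule shown above.
def sketch_pad_channel(channel, app_id, run_id)
  raise 'If the run_id is specified, app_id needs to also be specified' if !run_id.nil? && app_id.nil?
  return "/nutella/#{channel}" if app_id.nil? && run_id.nil?
  return "/nutella/apps/#{app_id}/#{channel}" if run_id.nil?
  "/nutella/apps/#{app_id}/runs/#{run_id}/#{channel}"
end

puts sketch_pad_channel('pings', nil, nil)        # framework level: /nutella/pings
puts sketch_pad_channel('pings', 'my_app', nil)   # app level: /nutella/apps/my_app/pings
puts sketch_pad_channel('pings', 'my_app', 'run_1')
# run level: /nutella/apps/my_app/runs/run_1/pings
```

Passing a run id without an app id raises, since a run is always nested under an app in this scheme.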
.prepare_message_for_publish(message) ⇒ Object
    # File 'lib/nutella_lib/net.rb', line 148
    def self.prepare_message_for_publish( message )
      if message.nil?
        return {type: 'publish', from: assemble_from}.to_json
      end
      {type: 'publish', from: assemble_from, payload: message}.to_json
    end
.prepare_message_for_request(message) ⇒ Object
    # File 'lib/nutella_lib/net.rb', line 155
    def self.prepare_message_for_request( message )
      id = message.hash + Random.rand(100)
      if message.nil?
        return {id: id, type: 'request', from: assemble_from}.to_json, id
      end
      return {id: id, type: 'request', from: assemble_from, payload: message}.to_json, id
    end
.prepare_message_for_response(message, id) ⇒ Object
    # File 'lib/nutella_lib/net.rb', line 163
    def self.prepare_message_for_response( message, id )
      if message.nil?
        return {id: id, type: 'response', from: assemble_from}.to_json
      end
      {id: id, type: 'response', from: assemble_from, payload: message}.to_json
    end
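Taken together, the three `prepare_message_for_*` helpers produce JSON envelopes that differ only in `type` and in whether an `id` is present. The condensed sketch below assumes a single hypothetical helper, `sketch_envelope`, in place of the three library methods, and a hand-written `from` hash instead of `assemble_from`.

```ruby
require 'json'

# Hypothetical helper that builds the same envelope shapes as the three
# prepare_message_for_* methods above. Nil id and nil payload are omitted.
def sketch_envelope(type, from, payload = nil, id = nil)
  envelope = {}
  envelope[:id] = id unless id.nil?
  envelope[:type] = type
  envelope[:from] = from
  envelope[:payload] = payload unless payload.nil?
  envelope.to_json
end

SKETCH_FROM = { 'type' => 'run', 'run_id' => 'run_1', 'component_id' => 'ui' }.freeze

puts sketch_envelope('publish', SKETCH_FROM)                   # no id, no payload
puts sketch_envelope('request', SKETCH_FROM, { 'q' => 1 }, 7)  # id and payload
puts sketch_envelope('response', SKETCH_FROM, 'ok', 7)         # echoes the request id
```

The response reuses the id of the request it answers, which is what lets the requester match the two.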
.publish(channel, message = nil) ⇒ Object
Publishes a message to a channel.
    # File 'lib/nutella_lib/net.rb', line 43
    def self.publish( channel, message=nil )
      publish_to( channel, message, Nutella.app_id, Nutella.run_id)
    end
.publish_to(channel, message = nil, app_id, run_id) ⇒ Object
Publishes a message to a channel.
    # File 'lib/nutella_lib/net.rb', line 257
    def self.publish_to( channel, message=nil, app_id, run_id )
      # Pad channel
      padded_channel = pad_channel(channel, app_id, run_id)
      # Throw exception if trying to publish something that is not JSON
      begin
        m = self.prepare_message_for_publish message
        Nutella.mqtt.publish( padded_channel, m )
      rescue
        STDERR.puts 'Error: you are trying to publish something that is not JSON'
        STDERR.puts $!
      end
    end
.start_pinging ⇒ Object
Sends a ping every 5 seconds to the pings channel of the current level (framework, app, or run).
    # File 'lib/nutella_lib/net.rb', line 86
    def self.start_pinging
      Nutella.ping_thread = Thread.new do
        loop do
          publish_to('pings', 'ping', Nutella.app_id, Nutella.run_id)
          sleep(5)
        end
      end
    end
.subscribe(channel, callback) ⇒ Object
Subscribes to a channel or to a set of channels.
    # File 'lib/nutella_lib/net.rb', line 25
    def self.subscribe( channel, callback )
      subscribe_to( channel, callback, Nutella.app_id, Nutella.run_id)
    end
.subscribe_to(channel, callback, app_id, run_id) ⇒ Object
Subscribes to a channel or to a set of channels.
    # File 'lib/nutella_lib/net.rb', line 181
    def self.subscribe_to( channel, callback, app_id, run_id )
      # Check the passed callback has the right number of arguments
      if Nutella.mqtt.is_channel_wildcard?(channel)
        raise 'You need to pass a callback with 3 parameters (message, channel, from) when subscribing to a wildcard channel!' if callback.parameters.length!=3
      else
        raise 'You need to pass a callback with 2 parameters (message, from) when subscribing to a channel!' if callback.parameters.length!=2
      end
      # Pad channel
      padded_channel = pad_channel(channel, app_id, run_id)
      # Maintain unique subscriptions
      raise 'You can`t subscribe twice to the same channel`' if @subscriptions.include? padded_channel
      # Depending on what type of channel we are subscribing to (wildcard or simple)
      # register a different kind of callback
      if Nutella.mqtt.is_channel_wildcard?(padded_channel)
        mqtt_cb = lambda do |mqtt_message, mqtt_channel|
          begin
            type, from, payload, _ = extract_fields_from_message mqtt_message
            callback.call(payload, un_pad_channel(mqtt_channel, app_id, run_id), from) if type=='publish'
          rescue JSON::ParserError
            # Make sure the message is JSON, if not drop the message
            return
          rescue ArgumentError
            # Check the passed callback has the right number of arguments
            STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload', 'channel' and 'from'"
          end
        end
      else
        mqtt_cb = lambda do |mqtt_message|
          begin
            type, from, payload, _ = extract_fields_from_message mqtt_message
            callback.call(payload, from) if type=='publish'
          rescue JSON::ParserError
            # Make sure the message is JSON, if not drop the message
            return
          rescue ArgumentError
            # Check the passed callback has the right number of arguments
            STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload' and 'from'"
          end
        end
      end
      # Add to subscriptions, save mqtt callback and subscribe
      @subscriptions.push padded_channel
      @callbacks.push mqtt_cb
      Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
      # Notify subscription
      publish_to('subscriptions', {'type' => 'subscribe', 'channel' => padded_channel}, app_id, run_id)
    end
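The callback check at the top of this method relies on `Proc#parameters`. The sketch below shows the idea in isolation. `sketch_wildcard?` is an assumption standing in for `Nutella.mqtt.is_channel_wildcard?`, whose implementation is not shown on this page; it simply looks for the MQTT wildcard characters `#` and `+`. The other names are hypothetical as well.

```ruby
# Assumption: a channel is a wildcard if it contains an MQTT wildcard character.
def sketch_wildcard?(channel)
  channel.include?('#') || channel.include?('+')
end

# Wildcard subscriptions also receive the concrete channel, hence 3 parameters.
def sketch_check_callback(channel, callback)
  expected = sketch_wildcard?(channel) ? 3 : 2
  if callback.parameters.length != expected
    raise ArgumentError, "expected a callback with #{expected} parameters"
  end
  true
end

TWO_ARGS = lambda { |message, from| [message, from] }
THREE_ARGS = lambda { |message, channel, from| [message, channel, from] }

sketch_check_callback('location', TWO_ARGS)       # passes
sketch_check_callback('location/#', THREE_ARGS)   # passes

begin
  sketch_check_callback('location/#', TWO_ARGS)
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```

Checking the parameter count up front surfaces a mismatched callback at subscription time rather than when the first message arrives.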
.subscriptions ⇒ Object
Provides access to the subscriptions.
    # File 'lib/nutella_lib/net.rb', line 11
    def self.subscriptions; @subscriptions end
.sync_request(channel, message = nil) ⇒ Object
Performs a synchronous request.
    # File 'lib/nutella_lib/net.rb', line 53
    def self.sync_request( channel, message=nil )
      sync_request_to(channel, message, Nutella.app_id, Nutella.run_id)
    end
.sync_request_to(channel, message = nil, app_id, run_id) ⇒ Object
Performs a synchronous request.
    # File 'lib/nutella_lib/net.rb', line 278
    def self.sync_request_to( channel, message=nil, app_id, run_id )
      # Pad channel
      padded_channel = pad_channel(channel, app_id, run_id)
      # Prepare message
      m, id = prepare_message_for_request message
      # Initialize response
      response = nil
      # Prepare callback
      mqtt_cb = lambda do |mqtt_message|
        type, _, payload, m_id = extract_fields_from_message mqtt_message
        if m_id==id && type=='response'
          response = payload
          Nutella.mqtt.unsubscribe( padded_channel, mqtt_cb )
        end
      end
      # Subscribe
      Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
      # Publish message
      Nutella.mqtt.publish( padded_channel, m )
      # Wait for the response to come back
      sleep(0.1) while response.nil?
      response
    end
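Requests and responses travel on the same channel, so both sides filter by `type` and the requester matches on `id`. The sketch below illustrates that correlation with a hypothetical in-memory bus (`SketchBus`) standing in for the MQTT client. Unlike a real broker it delivers messages synchronously, so no polling loop is needed here; all names are assumptions for illustration.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the MQTT client: every published
# message is delivered synchronously to all callbacks on that channel.
class SketchBus
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(channel, callback)
    @subscribers[channel] << callback
  end

  def unsubscribe(channel, callback)
    @subscribers[channel].delete(callback)
  end

  def publish(channel, message)
    # Iterate over a copy so callbacks may unsubscribe during delivery
    @subscribers[channel].dup.each { |cb| cb.call(message) }
  end
end

# Responder: ignores anything that is not a request with an id.
def sketch_handle_requests(bus, channel, &handler)
  cb = lambda do |raw|
    msg = JSON.parse(raw)
    next if msg['type'] != 'request' || msg['id'].nil?
    reply = { id: msg['id'], type: 'response', payload: handler.call(msg['payload']) }
    bus.publish(channel, reply.to_json)
  end
  bus.subscribe(channel, cb)
end

# Requester: accepts only the response carrying its own id, then unsubscribes.
def sketch_sync_request(bus, channel, payload, id)
  response = nil
  cb = lambda do |raw|
    msg = JSON.parse(raw)
    if msg['id'] == id && msg['type'] == 'response'
      response = msg['payload']
      bus.unsubscribe(channel, cb)
    end
  end
  bus.subscribe(channel, cb)
  bus.publish(channel, { id: id, type: 'request', payload: payload }.to_json)
  response
end

bus = SketchBus.new
sketch_handle_requests(bus, 'sum') { |p| p['a'] + p['b'] }
puts sketch_sync_request(bus, 'sum', { 'a' => 1, 'b' => 2 }, 7)
```

In the listing above, the wait is a polling loop on `response.nil?` with no timeout, so a request that is never answered, or that is answered with a `nil` payload, would appear to block the caller indefinitely.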
.un_pad_channel(channel, app_id, run_id) ⇒ Object
    # File 'lib/nutella_lib/net.rb', line 123
    def self.un_pad_channel( channel, app_id, run_id )
      raise 'If the run_id is specified, app_id needs to also be specified' if (!run_id.nil? && app_id.nil?)
      return channel.gsub('/nutella/', '') if (app_id.nil? && run_id.nil?)
      return channel.gsub("/nutella/apps/#{app_id}/", '') if (!app_id.nil? && run_id.nil?)
      channel.gsub("/nutella/apps/#{app_id}/runs/#{run_id}/", '')
    end
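This method reverses the padding so that wildcard callbacks receive the channel name as the component originally wrote it. A stand-alone sketch follows, with `sketch_un_pad_channel` as a hypothetical copy of the logic and made-up ids.

```ruby
# Hypothetical stand-alone copy of the un-padding rule shown above.
def sketch_un_pad_channel(channel, app_id, run_id)
  raise 'If the run_id is specified, app_id needs to also be specified' if !run_id.nil? && app_id.nil?
  return channel.gsub('/nutella/', '') if app_id.nil? && run_id.nil?
  return channel.gsub("/nutella/apps/#{app_id}/", '') if run_id.nil?
  channel.gsub("/nutella/apps/#{app_id}/runs/#{run_id}/", '')
end

padded = '/nutella/apps/my_app/runs/run_1/location/kitchen'
puts sketch_un_pad_channel(padded, 'my_app', 'run_1')   # location/kitchen

# gsub removes every occurrence of the prefix, not only the leading one,
# so a channel that happens to contain the prefix again is altered too.
puts sketch_un_pad_channel('/nutella/a/nutella/b', nil, nil)   # ab
```

The second call shows an edge case of using `gsub` here: channel names that contain the prefix string themselves do not round-trip.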
.unsubscribe(channel) ⇒ Object
Unsubscribes from a channel.
    # File 'lib/nutella_lib/net.rb', line 33
    def self.unsubscribe( channel )
      unsubscribe_to( channel, Nutella.app_id, Nutella.run_id)
    end
.unsubscribe_to(channel, app_id, run_id) ⇒ Object
Unsubscribes from a channel.
    # File 'lib/nutella_lib/net.rb', line 235
    def self.unsubscribe_to( channel, app_id, run_id )
      # Pad channel
      padded_channel = pad_channel(channel, app_id, run_id)
      idx = @subscriptions.index padded_channel
      # If we are not subscribed to this channel, return (no error is given)
      return if idx.nil?
      # Fetch the mqtt_callback associated with this channel/subscription
      mqtt_cb = @callbacks[idx]
      # Remove from subscriptions, callbacks and unsubscribe
      @subscriptions.delete_at idx
      @callbacks.delete_at idx
      Nutella.mqtt.unsubscribe( padded_channel, mqtt_cb )
    end
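The bookkeeping here depends on `@subscriptions` and `@callbacks` being parallel arrays: the callback for a channel sits at the same index as the channel itself. A minimal sketch of that pattern, using a hypothetical `SketchRegistry` class rather than the module's own state:

```ruby
# Hypothetical registry mirroring the parallel-array bookkeeping used by
# subscribe_to and unsubscribe_to.
class SketchRegistry
  attr_reader :subscriptions, :callbacks

  def initialize
    @subscriptions = []
    @callbacks = []
  end

  def add(channel, callback)
    raise 'already subscribed' if @subscriptions.include?(channel)
    @subscriptions.push(channel)
    @callbacks.push(callback)
  end

  # Returns the removed callback, or nil if the channel was not registered.
  def remove(channel)
    idx = @subscriptions.index(channel)
    return nil if idx.nil?
    @subscriptions.delete_at(idx)
    @callbacks.delete_at(idx)
  end
end

registry = SketchRegistry.new
on_a = lambda { |message, from| }
on_b = lambda { |message, from| }
registry.add('/nutella/a', on_a)
registry.add('/nutella/b', on_b)
registry.remove('/nutella/a')
puts registry.subscriptions.inspect
```

Both arrays must always be modified together; removing from one without the other would pair the remaining channels with the wrong callbacks.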