Class: Connection
- Inherits: Object
- Defined in: lib/ulms_client.rb
Constant Summary
- OPTIONS = [:password, :clean_session, :keep_alive]
Instance Method Summary
- #connect ⇒ Object
  Establish the connection.
- #disconnect ⇒ Object
  Disconnect from the broker.
- #initialize(host:, port:, mode:, agent:, **kwargs) ⇒ Connection
  Constructor. A new instance of Connection.
- #make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT) ⇒ Object
  A high-level method that makes a request and waits for the response to it.
- #publish(topic, payload:, properties: {}, retain: false, qos: 0) ⇒ Object
  Publish a message to the `topic`.
- #receive(timeout = DEFAULT_TIMEOUT) ⇒ Object
  Waits for an incoming message.
- #subscribe(topic, qos: 0) ⇒ Object
  Subscribe to the `topic`.
Constructor Details
#initialize(host:, port:, mode:, agent:, **kwargs) ⇒ Connection
Returns a new instance of Connection.
# File 'lib/ulms_client.rb', line 45
def initialize(host:, port:, mode:, agent:, **kwargs)
  @agent = agent

  @mqtt = MQTT::Client.new
  @mqtt.host = host
  @mqtt.port = port
  @mqtt.username = "v2::#{mode}"
  @mqtt.client_id = agent.to_s

  OPTIONS.each do |option|
    @mqtt.send("#{option}=", kwargs[option]) if kwargs[option] != nil
  end
end
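The option handling in the constructor can be tried in isolation. The sketch below is only an illustration: `FakeClient` is a hypothetical stand-in for `MQTT::Client` and `apply_options` is a hypothetical helper; only the `OPTIONS` list and the nil check are taken from the source above.

```ruby
# Same list as the OPTIONS constant documented above.
OPTIONS = [:password, :clean_session, :keep_alive]

# Hypothetical stand-in for MQTT::Client, so the sketch runs without a broker.
FakeClient = Struct.new(:password, :clean_session, :keep_alive)

# Hypothetical helper mirroring the loop in #initialize: each recognised
# option is copied onto the client unless its value is nil.
def apply_options(client, **kwargs)
  OPTIONS.each do |option|
    client.send("#{option}=", kwargs[option]) unless kwargs[option].nil?
  end
  client
end

client = apply_options(FakeClient.new, password: 'secret', clean_session: false, unknown: 1)
```

Because only `nil` values are skipped, an explicit `clean_session: false` is still applied, while keys outside `OPTIONS` (here `unknown`) are silently ignored.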
Instance Method Details
#connect ⇒ Object
Establish the connection.
# File 'lib/ulms_client.rb', line 60
def connect
  @mqtt.connect
  LOG.info("#{@agent} connected")
end
#disconnect ⇒ Object
Disconnect from the broker.
# File 'lib/ulms_client.rb', line 66
def disconnect
  @mqtt.disconnect
  LOG.info("#{@agent} disconnected")
end
#make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT) ⇒ Object
A high-level method that publishes a request and waits for the matching response.
Options:
- `to`: the destination service `Account` (required).
- `payload`: the publish payload (required).
- `api_version`: service API version.
- `properties`: additional MQTT properties hash.
- `qos`: Publish QoS. An integer 0..2.
- `timeout`: How long to wait for the response before raising.
# File 'lib/ulms_client.rb', line 142
def make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT)
  correlation_data = SecureRandom.hex

  properties.merge!({
    type: 'request',
    method: method,
    correlation_data: correlation_data,
    response_topic: "agents/#{@agent}/api/#{api_version}/in/#{to}"
  })

  topic = "agents/#{@agent}/api/#{api_version}/out/#{to}"
  publish(topic, payload: payload, properties: properties, qos: qos)

  receive(timeout) do |msg|
    msg.properties['type'] == 'response' &&
      msg.properties['correlation_data'] == correlation_data
  end
end
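To make the topic layout concrete, here is a minimal sketch that rebuilds only the topic and properties that `#make_request` derives from its arguments. `build_request` is a hypothetical helper, and the agent, service, and method names are made up for illustration. Unlike the source, it uses the non-mutating `merge`, so the caller's hash is left untouched.

```ruby
require 'securerandom'

# Hypothetical helper: computes what #make_request would publish,
# without performing any network I/O.
def build_request(agent:, method:, to:, api_version: 'v1', properties: {})
  correlation_data = SecureRandom.hex
  merged = properties.merge(
    type: 'request',
    method: method,
    correlation_data: correlation_data,
    response_topic: "agents/#{agent}/api/#{api_version}/in/#{to}"
  )
  { topic: "agents/#{agent}/api/#{api_version}/out/#{to}", properties: merged }
end

# Illustrative identifiers only; real agent and service names will differ.
request = build_request(
  agent: 'web.alice.example.org',
  method: 'room.create',
  to: 'conference.example.org'
)
puts request[:topic]
```

The request goes to the agent's `out` topic and the response is expected on the mirrored `in` topic; the random `correlation_data` is what lets the `receive` predicate pick the right response.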
#publish(topic, payload:, properties: {}, retain: false, qos: 0) ⇒ Object
Publish a message to the `topic`.
Options:
- `payload`: An object that will be dumped into JSON as the payload (required).
- `properties`: MQTT publish properties hash.
- `retain`: A boolean indicating whether the message should be retained.
- `qos`: An integer 0..2 that sets the QoS.
# File 'lib/ulms_client.rb', line 78
def publish(topic, payload:, properties: {}, retain: false, qos: 0)
  envelope = {
    payload: JSON.dump(payload),
    properties: properties
  }

  @mqtt.publish(topic, JSON.dump(envelope), retain, qos)

  LOG.info <<~EOF
    #{@agent} published to #{topic} (q#{qos}, r#{retain ? 1 : 0}):
      Payload: #{JSON.pretty_generate(payload)}
      Properties: #{JSON.pretty_generate(properties)}
  EOF
end
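The envelope format is easy to miss: the payload is dumped to a JSON string first and then embedded in a second JSON document. The sketch below reproduces just that step; `build_envelope` is a hypothetical helper and the sample payload and properties are invented.

```ruby
require 'json'

# Hypothetical helper: builds the same envelope string as #publish,
# without sending anything to a broker.
def build_envelope(payload, properties = {})
  JSON.dump({ payload: JSON.dump(payload), properties: properties })
end

wire = build_envelope({ audience: 'example.org' }, { type: 'event' })

# Decoding takes two steps because the payload is a JSON string
# nested inside the JSON envelope.
envelope = JSON.parse(wire)
payload = JSON.parse(envelope['payload'])
```

This mirrors what `#receive` does on the way in, where the envelope and then its `payload` field are each parsed separately.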
#receive(timeout = DEFAULT_TIMEOUT) ⇒ Object
Waits for an incoming message. If a block is given, the received message is passed to the block. If the block returns a falsey value, the method waits for the next message, and so on. Returns the received message. Raises if `timeout` elapses first.
# File 'lib/ulms_client.rb', line 107
def receive(timeout=DEFAULT_TIMEOUT)
  Timeout::timeout(timeout, nil, "Timed out waiting for the message") do
    loop do
      topic, json = @mqtt.get
      envelope = JSON.load(json)
      payload = JSON.load(envelope['payload'])
      message = IncomingMessage.new(topic, payload, envelope['properties'])

      LOG.info <<~EOF
        #{@agent} received a message from topic #{topic}:
          Payload: #{JSON.pretty_generate(message.payload)}
          Properties: #{JSON.pretty_generate(message.properties)}
      EOF

      return message unless block_given?

      if yield(message)
        LOG.info "The message matched the given predicate"
        return message
      else
        LOG.info "The message didn't match the given predicate. Waiting for the next one."
      end
    end
  end
end
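The predicate loop can be exercised without a broker. The following is a simplified sketch, not the library's code: `receive_from` is a hypothetical function that reads plain hashes from a `Queue` instead of calling `@mqtt.get`, and it omits logging and JSON decoding.

```ruby
require 'timeout'

# Hypothetical stand-in for #receive: messages come from a Queue rather than
# from the MQTT client, and are plain hashes rather than IncomingMessage objects.
def receive_from(queue, timeout)
  Timeout.timeout(timeout, nil, 'Timed out waiting for the message') do
    loop do
      message = queue.pop                 # blocks until a message is available
      return message unless block_given?  # no predicate: first message wins
      return message if yield(message)    # predicate matched
      # Otherwise the message is dropped and the loop waits for the next one.
    end
  end
end

queue = Queue.new
queue << { 'type' => 'event' }
queue << { 'type' => 'response', 'correlation_data' => 'abc' }

# Skips the event and returns the response.
matched = receive_from(queue, 1) { |msg| msg['type'] == 'response' }
```

Note that messages rejected by the predicate are consumed and discarded, so a caller that also cares about those messages has to handle them inside the block.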
#subscribe(topic, qos: 0) ⇒ Object
Subscribe to the `topic`.
Options:
- `qos`: Subscription QoS. An integer 0..2.
# File 'lib/ulms_client.rb', line 97
def subscribe(topic, qos: 0)
  @mqtt.subscribe([topic, qos])
  LOG.info("#{@agent} subscribed to #{topic} (q#{qos})")
end