Class: Connection
- Inherits: Object
- Defined in: lib/ulms_client.rb
Constant Summary
- OPTIONS = [:username, :password, :clean_session, :keep_alive]
Instance Method Summary
-
#connect ⇒ Object
Establish the connection.
-
#disconnect ⇒ Object
Disconnect from the broker.
-
#initialize(host:, port:, client:, **kwargs) ⇒ Connection
constructor
A new instance of Connection.
-
#make_request(method, to:, payload:, properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT) ⇒ Object
A high-level method that publishes a request and waits for the matching response.
-
#publish(topic, payload:, properties: {}, retain: false, qos: 0) ⇒ Object
Publish a message to the topic.
-
#receive(timeout = DEFAULT_TIMEOUT) ⇒ Object
Waits for an incoming message.
-
#subscribe(topic, qos: 0) ⇒ Object
Subscribe to the topic.
Constructor Details
#initialize(host:, port:, client:, **kwargs) ⇒ Connection
# File 'lib/ulms_client.rb', line 59
def initialize(host:, port:, client:, **kwargs)
  @client = client

  @mqtt = MQTT::Client.new
  @mqtt.host = host
  @mqtt.port = port
  @mqtt.client_id = client.to_s

  OPTIONS.each do |option|
    @mqtt.send("#{option}=", kwargs[option]) if kwargs[option] != nil
  end
end
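The option forwarding in the constructor can be tried in isolation. The following is a minimal sketch, not the library's code: `FakeMqttClient` and `apply_options` are hypothetical names, and the struct only stands in for `MQTT::Client` so that the example runs without a broker or the MQTT gem. It shows that keys outside `OPTIONS` are ignored and that only `nil` values are skipped, so an explicit `false` is still forwarded.

```ruby
# Hypothetical stand-in for MQTT::Client with only the accessors used here.
FakeMqttClient = Struct.new(:username, :password, :clean_session, :keep_alive)

OPTIONS = [:username, :password, :clean_session, :keep_alive]

# Copy recognised options onto the client, as the constructor does.
# Unknown keys are never read and nil values are skipped.
def apply_options(mqtt, **kwargs)
  OPTIONS.each do |option|
    mqtt.send("#{option}=", kwargs[option]) if kwargs[option] != nil
  end
  mqtt
end

mqtt = apply_options(FakeMqttClient.new, username: 'alice', clean_session: false, unknown: 1)

mqtt.username      # "alice"
mqtt.clean_session # false (forwarded, because only nil is skipped)
mqtt.password      # nil (not supplied)
```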
Instance Method Details
#connect ⇒ Object
Establish the connection.
# File 'lib/ulms_client.rb', line 73
def connect
  @mqtt.connect
  LOG.info("#{@client} connected")
end
#disconnect ⇒ Object
Disconnect from the broker.
# File 'lib/ulms_client.rb', line 79
def disconnect
  @mqtt.disconnect
  LOG.info("#{@client} disconnected")
end
#make_request(method, to:, payload:, properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT) ⇒ Object
A high-level method that publishes a request and waits for the matching response.
Options:
- `to`: the destination service `Account` (required).
- `payload`: the publish payload (required).
- `properties`: additional MQTT properties hash.
- `qos`: Publish QoS. An integer 0..2.
- `timeout`: how long to wait for the response, in seconds.
# File 'lib/ulms_client.rb', line 154
def make_request(method, to:, payload:, properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT)
  correlation_data = SecureRandom.hex

  properties.merge!({
    type: 'request',
    method: method,
    correlation_data: correlation_data,
    response_topic: "agents/#{@client.agent}/api/v1/in/#{to}"
  })

  topic = "agents/#{@client.agent}/api/v1/out/#{to}"
  publish(topic, payload: payload, properties: properties, qos: qos)

  receive(timeout) do |msg|
    msg.properties['type'] == 'response' &&
      msg.properties['correlation_data'] == correlation_data
  end
end
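The topic layout and correlation matching used by `make_request` can be sketched without a broker. `build_request` and `response_for?` below are hypothetical helpers, and the agent, method, and service names are made-up values. Unlike the method above, the sketch uses a non-mutating `merge` rather than `merge!`, so the caller's hash is left untouched.

```ruby
require 'securerandom'

# Hypothetical helper mirroring the topics and properties built by make_request.
# `agent` stands in for @client.agent.
def build_request(agent, method, to:, properties: {})
  correlation_data = SecureRandom.hex

  request_properties = properties.merge(
    type: 'request',
    method: method,
    correlation_data: correlation_data,
    response_topic: "agents/#{agent}/api/v1/in/#{to}"
  )

  { topic: "agents/#{agent}/api/v1/out/#{to}", properties: request_properties }
end

# Same check as the block passed to receive. Incoming properties come from
# parsed JSON, so their keys are strings rather than symbols.
def response_for?(response_properties, correlation_data)
  response_properties['type'] == 'response' &&
    response_properties['correlation_data'] == correlation_data
end

request = build_request('web.john.example.org', 'room.create', to: 'conference.example.org')
correlation = request[:properties][:correlation_data]

matching = { 'type' => 'response', 'correlation_data' => correlation }
stale    = { 'type' => 'response', 'correlation_data' => 'something-else' }

response_for?(matching, correlation) # true
response_for?(stale, correlation)    # false
```

The request goes to the `out` topic and the response is expected on the `in` topic of the same agent, which suggests the connection should already be subscribed to that `in` topic before `make_request` is called.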
#publish(topic, payload:, properties: {}, retain: false, qos: 0) ⇒ Object
Publish a message to the topic.
Options:
- `payload`: An object that will be dumped into JSON as the payload (required).
- `properties`: MQTT publish properties hash.
- `retain`: A boolean indicating whether the message should be retained.
- `qos`: An integer 0..2 that sets the QoS.
# File 'lib/ulms_client.rb', line 91
def publish(topic, payload:, properties: {}, retain: false, qos: 0)
  envelope = {
    payload: JSON.dump(payload),
    properties: properties
  }

  @mqtt.publish(topic, JSON.dump(envelope), retain, qos)

  LOG.info <<~EOF
    #{@client.agent} published to #{topic} (q#{qos}, r#{retain ? 1 : 0}):
      Payload: #{JSON.pretty_generate(payload)}
      Properties: #{JSON.pretty_generate(properties)}
  EOF
end
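Note that `publish` encodes the payload twice: once on its own, and again as part of the envelope. The sketch below illustrates the resulting wire format using only the standard `json` library; the payload and property values are made up for illustration.

```ruby
require 'json'

payload    = { 'audience' => 'example.org' }
properties = { 'type' => 'event' }

# Build the envelope the same way publish does: the payload is serialized
# first, so it travels as a JSON string nested inside the envelope JSON.
envelope = {
  payload: JSON.dump(payload),
  properties: properties
}
envelope_json = JSON.dump(envelope)

# A consumer therefore has to decode twice to get the payload back.
decoded_envelope = JSON.parse(envelope_json)
decoded_payload  = JSON.parse(decoded_envelope['payload'])

decoded_envelope['payload'].class # String
decoded_payload == payload        # true
```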
#receive(timeout = DEFAULT_TIMEOUT) ⇒ Object
Waits for an incoming message. If a block is given, the received message is passed to it; if the block returns a falsey value, the method keeps waiting for the next message, and so on. Returns the received message. Raises Timeout::Error if no suitable message arrives before the timeout expires.
# File 'lib/ulms_client.rb', line 120
def receive(timeout=DEFAULT_TIMEOUT)
  Timeout::timeout(timeout, nil, "Timed out waiting for the message") do
    loop do
      topic, json = @mqtt.get
      envelope = JSON.load(json)
      payload = JSON.load(envelope['payload'])
      message = IncomingMessage.new(topic, payload, envelope['properties'])

      LOG.info <<~EOF
        #{@client.agent} received a message from topic #{topic}:
          Payload: #{JSON.pretty_generate(message.payload)}
          Properties: #{JSON.pretty_generate(message.properties)}
      EOF

      return message unless block_given?

      if yield(message)
        LOG.info "The message matched the given predicate"
        return message
      else
        LOG.info "The message didn't match the given predicate. Waiting for the next one."
      end
    end
  end
end
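The predicate loop and timeout behaviour of `receive` can be exercised without a broker. In this minimal sketch, `receive_matching` is a hypothetical function and a `Queue` stands in for `@mqtt.get`; messages are plain hashes rather than `IncomingMessage` objects, and decoding and logging are left out.

```ruby
require 'timeout'

# Hypothetical stand-in for the receive loop. `source` is any object whose
# #pop blocks until a message is available (a Queue here, not @mqtt.get).
def receive_matching(source, timeout)
  Timeout.timeout(timeout, nil, 'Timed out waiting for the message') do
    loop do
      message = source.pop
      return message unless block_given?
      return message if yield(message)
      # Falsey block result: drop this message and wait for the next one.
    end
  end
end

queue = Queue.new
queue << { 'type' => 'event' }
queue << { 'type' => 'response' }

# The first message is skipped because the block returns false for it.
matched = receive_matching(queue, 1) { |msg| msg['type'] == 'response' }

# With nothing to read, the call raises once the timeout expires.
timed_out = begin
  receive_matching(Queue.new, 0.1)
  false
rescue Timeout::Error
  true
end
```

Messages rejected by the block are consumed and not requeued, so a predicate that is too strict silently discards everything that arrives until the timeout.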
#subscribe(topic, qos: 0) ⇒ Object
Subscribe to the topic.
Options:
- `qos`: Subscription QoS. An integer 0..2.
# File 'lib/ulms_client.rb', line 110
def subscribe(topic, qos: 0)
  @mqtt.subscribe([topic, qos])
  LOG.info("#{@client.agent} subscribed to #{topic} (q#{qos})")
end