Class: Connection
- Inherits: Object
- Object
- Connection
- Defined in:
- lib/ulms_client.rb
Constant Summary
- OPTIONS = [:password, :clean_session, :keep_alive]
Instance Method Summary
- #connect ⇒ Object
  Establish the connection.
- #disconnect ⇒ Object
  Disconnect from the broker.
- #initialize(host:, port:, mode:, agent:, **kwargs) ⇒ Connection
  Constructor. Returns a new instance of Connection.
- #make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT) ⇒ Object
  A high-level method that publishes a request and waits for the matching response.
- #publish(topic, payload:, properties: {}, retain: false, qos: 0) ⇒ Object
  Publish a message to the `topic`.
- #receive(timeout = DEFAULT_TIMEOUT) ⇒ Object
  Waits for an incoming message.
- #subscribe(topic, qos: 0) ⇒ Object
  Subscribe to the `topic`.
Constructor Details
#initialize(host:, port:, mode:, agent:, **kwargs) ⇒ Connection
Returns a new instance of Connection.
    # File 'lib/ulms_client.rb', line 46
    def initialize(host:, port:, mode:, agent:, **kwargs)
      @agent = agent
      @mode = mode

      @mqtt = MQTT::Client.new
      @mqtt.host = host
      @mqtt.port = port
      @mqtt.username = "v2::#{mode}"
      @mqtt.client_id = agent.to_s

      # Forward only the options listed in OPTIONS, skipping the ones not given.
      OPTIONS.each do |option|
        @mqtt.send("#{option}=", kwargs[option]) if kwargs[option] != nil
      end
    end
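The option forwarding in the constructor can be tried in isolation. The following is a minimal sketch, not the library's code: `FakeClient` and `apply_options` are hypothetical names, and the struct only stands in for `MQTT::Client` so the snippet runs without a broker.

```ruby
# FakeClient is a hypothetical stand-in for MQTT::Client (illustration only).
FakeClient = Struct.new(:password, :clean_session, :keep_alive)

OPTIONS = [:password, :clean_session, :keep_alive]

# Copy whitelisted options onto the client through its setters.
# nil values are skipped, so `false` is still forwarded.
def apply_options(client, **kwargs)
  OPTIONS.each do |option|
    client.public_send("#{option}=", kwargs[option]) unless kwargs[option].nil?
  end
  client
end

client = apply_options(FakeClient.new, keep_alive: 30, clean_session: false, unknown: 1)
```

Keys outside `OPTIONS` (here `unknown`) are silently ignored, which matches the whitelist behaviour of the constructor shown above.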
Instance Method Details
#connect ⇒ Object
Establish the connection.
    # File 'lib/ulms_client.rb', line 62
    def connect
      @mqtt.connect
      LOG.info("#{@agent} connected")
    end
#disconnect ⇒ Object
Disconnect from the broker.
    # File 'lib/ulms_client.rb', line 68
    def disconnect
      @mqtt.disconnect
      LOG.info("#{@agent} disconnected")
    end
#make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT) ⇒ Object
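A high-level method that publishes a request and waits for the matching response. A response matches when its `type` property is 'response' and its `correlation_data` equals the value generated for the request.
Options:
- `to`: the destination service `Account` (required).
- `payload`: the publish payload (required).
- `api_version`: service API version.
- `properties`: additional MQTT properties hash. The request properties are merged into this hash in place.
- `qos`: publish QoS. An integer 0..2.
- `timeout`: how long to wait for the response, in seconds.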
    # File 'lib/ulms_client.rb', line 148
    def make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT)
      correlation_data = SecureRandom.hex

      properties.merge!({
        type: 'request',
        method: method,
        correlation_data: correlation_data,
        response_topic: "agents/#{@agent}/api/#{api_version}/in/#{to}"
      })

      topic = "agents/#{@agent}/api/#{api_version}/out/#{to}"
      publish(topic, payload: payload, properties: properties, qos: qos)

      receive(timeout) do |msg|
        msg.properties['type'] == 'response' &&
          msg.properties['correlation_data'] == correlation_data
      end
    end
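To make the topic and property layout concrete, here is a small sketch that derives the same values without touching a broker. `build_request` is a hypothetical helper, and the agent and service names are made up for illustration. Unlike the method above, it uses a non-mutating `merge`, so the caller's hash is left untouched.

```ruby
require 'securerandom'

# Hypothetical helper mirroring how make_request derives its topic and properties.
def build_request(agent, method, to:, api_version: 'v1', properties: {})
  base = "agents/#{agent}/api/#{api_version}"
  request_properties = properties.merge(
    type: 'request',
    method: method,
    correlation_data: SecureRandom.hex, # random token used to pair the response
    response_topic: "#{base}/in/#{to}"
  )
  { topic: "#{base}/out/#{to}", properties: request_properties }
end

# Agent and service identifiers below are invented examples.
request = build_request('web.alice.example.org', 'room.create',
                        to: 'conference.svc.example.org')
puts request[:topic]
puts request[:properties][:response_topic]
```

The request goes out on the `out` topic and the response is expected on the mirrored `in` topic, so the agent presumably needs a subscription covering that topic before calling `make_request`.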
#publish(topic, payload:, properties: {}, retain: false, qos: 0) ⇒ Object
Publish a message to the `topic`.
Options:
- `payload`: An object that will be dumped into JSON as the payload (required).
- `properties`: MQTT publish properties hash.
- `retain`: A boolean indicating whether the message should be retained.
- `qos`: An integer 0..2 that sets the QoS.
    # File 'lib/ulms_client.rb', line 80
    def publish(topic, payload:, properties: {}, retain: false, qos: 0)
      if @mode == 'default' || !properties[:local_timestamp]
        properties = properties.merge(local_timestamp: DateTime.now.strftime('%Q'))
      end

      # The payload is JSON-encoded on its own, then wrapped into the envelope.
      envelope = {
        payload: JSON.dump(payload),
        properties: properties
      }

      @mqtt.publish(topic, JSON.dump(envelope), retain, qos)

      LOG.info <<~EOF
        #{@agent} published to #{topic} (q#{qos}, r#{retain ? 1 : 0}):
        Payload: #{JSON.pretty_generate(payload)}
        Properties: #{JSON.pretty_generate(properties)}
      EOF
    end
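The envelope is double-encoded: the payload is serialized to a JSON string first, and that string becomes a field of the outer JSON object. The sketch below shows the round trip using only the standard library. `build_envelope` and `parse_envelope` are hypothetical helper names, not part of the client.

```ruby
require 'json'

# Wrap a payload the way publish does: inner JSON string inside an outer JSON object.
def build_envelope(payload, properties = {})
  envelope = { payload: JSON.dump(payload), properties: properties }
  JSON.dump(envelope)
end

# Reverse the wrapping: parse the outer object, then the inner payload string.
def parse_envelope(json)
  envelope = JSON.parse(json)
  [JSON.parse(envelope['payload']), envelope['properties']]
end

wire = build_envelope({ 'room' => 'lobby' }, { 'type' => 'event' })
payload, properties = parse_envelope(wire)
```

A consumer therefore has to parse twice, which is what `receive` does with its two `JSON.load` calls.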
#receive(timeout = DEFAULT_TIMEOUT) ⇒ Object
Waits for an incoming message. If a block is given, the received message is passed to the block; if the block returns a falsey value, the method waits for the next message, and so on. Returns the received message. Raises if `timeout` expires.
    # File 'lib/ulms_client.rb', line 113
    def receive(timeout=DEFAULT_TIMEOUT)
      Timeout::timeout(timeout, nil, "Timed out waiting for the message") do
        loop do
          topic, json = @mqtt.get
          envelope = JSON.load(json)
          payload = JSON.load(envelope['payload'])
          message = IncomingMessage.new(topic, payload, envelope['properties'])

          LOG.info <<~EOF
            #{@agent} received a message from topic #{topic}:
            Payload: #{JSON.pretty_generate(message.payload)}
            Properties: #{JSON.pretty_generate(message.properties)}
          EOF

          return message unless block_given?

          if yield(message)
            LOG.info "The message matched the given predicate"
            return message
          else
            LOG.info "The message didn't match the given predicate. Waiting for the next one."
          end
        end
      end
    end
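The wait-until-predicate-matches loop can be exercised without MQTT. In this sketch a `Queue` plays the role of the broker connection. `receive_matching` is a hypothetical name, and messages are plain hashes rather than `IncomingMessage` objects.

```ruby
require 'timeout'

# Pop messages from `source` until the block accepts one, or the timeout expires.
# Without a block, the first message is returned as is.
def receive_matching(source, timeout)
  Timeout.timeout(timeout, Timeout::Error, 'Timed out waiting for the message') do
    loop do
      message = source.pop # blocks until a message is available
      return message unless block_given?
      return message if yield(message)
      # Predicate rejected the message: drop it and wait for the next one.
    end
  end
end

inbox = Queue.new
inbox << { 'type' => 'event' } << { 'type' => 'response', 'correlation_data' => 'abc' }

match = receive_matching(inbox, 1) { |msg| msg['type'] == 'response' }
```

Note that rejected messages are discarded, not queued for later: in this sketch, as in `receive` above, a message that fails the predicate is not seen again.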
#subscribe(topic, qos: 0) ⇒ Object
Subscribe to the `topic`.
Options:
- `qos`: Subscription QoS. An integer 0..2.
    # File 'lib/ulms_client.rb', line 103
    def subscribe(topic, qos: 0)
      @mqtt.subscribe([topic, qos])
      LOG.info("#{@agent} subscribed to #{topic} (q#{qos})")
    end