Class: Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/ulms_client.rb

Constant Summary collapse

OPTIONS =
[:password, :clean_session, :keep_alive]

Instance Method Summary collapse

Constructor Details

#initialize(host:, port:, mode:, agent:, **kwargs) ⇒ Connection

Returns a new instance of Connection.



45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/ulms_client.rb', line 45

def initialize(host:, port:, mode:, agent:, **kwargs)
  @agent = agent

  @mqtt = MQTT::Client.new
  @mqtt.host = host
  @mqtt.port = port
  @mqtt.username = "v2::#{mode}"
  @mqtt.client_id = agent.to_s

  OPTIONS.each do |option|
      @mqtt.send("#{option}=", kwargs[option]) if kwargs[option] != nil
  end
end

Instance Method Details

#connectObject

Establish the connection.



60
61
62
63
# File 'lib/ulms_client.rb', line 60

def connect
  @mqtt.connect
  LOG.info("#{@agent} connected")
end

#disconnectObject

Disconnect from the broker.



66
67
68
69
# File 'lib/ulms_client.rb', line 66

def disconnect
  @mqtt.disconnect
  LOG.info("#{@agent} disconnected")
end

#make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT) ⇒ Object

A high-level method that makes a request and waits for the response on it.

Options:

- `to`: the destination service `Account` (required).
- `payload`: the publish message payload (required).
- `api_version`: service API version.
- `properties`: additional MQTT properties hash.
- `qos`: Publish QoS. An integer 0..2.
- `timeout`: Timeout for the response awaiting.


142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/ulms_client.rb', line 142

def make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT)
  correlation_data = SecureRandom.hex

  properties.merge!({
    type: 'request',
    method: method,
    correlation_data: correlation_data,
    response_topic: "agents/#{@agent}/api/#{api_version}/in/#{to}"
  })

  topic = "agents/#{@agent}/api/#{api_version}/out/#{to}"
  publish(topic, payload: payload, properties: properties, qos: qos)

  receive(timeout) do |msg|
    msg.properties['type'] == 'response' &&
      msg.properties['correlation_data'] == correlation_data
  end
end

#publish(topic, payload:, properties: {}, retain: false, qos: 0) ⇒ Object

Publish a message to the ‘topic`.

Options:

- `payload`: An object that will be dumped into JSON as the message payload (required).
- `properties`: MQTT publish properties hash.
- `retain`: A boolean indicating whether the messages should be retained.
- `qos`: An integer 0..2 that sets the QoS.


78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/ulms_client.rb', line 78

def publish(topic, payload:, properties: {}, retain: false, qos: 0)
  envelope = {
    payload: JSON.dump(payload),
    properties: properties
  }

  @mqtt.publish(topic, JSON.dump(envelope), retain, qos)

  LOG.info "    \#{@agent} published to \#{topic} (q\#{qos}, r\#{retain ? 1 : 0}):\n    Payload: \#{JSON.pretty_generate(payload)}\n    Properties: \#{JSON.pretty_generate(properties)}\n  EOF\nend\n"

#receive(timeout = DEFAULT_TIMEOUT) ⇒ Object

Waits for an incoming message. If a block is given it passes the received message to the block. If the block returns falsey value it waits for the next one and so on. Returns the received message. Raises if ‘timeout` is over.



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/ulms_client.rb', line 107

def receive(timeout=DEFAULT_TIMEOUT)
  Timeout::timeout(timeout, nil, "Timed out waiting for the message") do
    loop do
      topic, json = @mqtt.get
      envelope = JSON.load(json)
      payload = JSON.load(envelope['payload'])
      message = IncomingMessage.new(topic, payload, envelope['properties'])

      LOG.info "        \#{@agent} received a message from topic \#{topic}:\n        Payload: \#{JSON.pretty_generate(message.payload)}\n        Properties: \#{JSON.pretty_generate(message.properties)}\n      EOF\n\n      return message unless block_given?\n\n      if yield(message)\n        LOG.info \"The message matched the given predicate\"\n        return message\n      else\n        LOG.info \"The message didn't match the given predicate. Waiting for the next one.\"\n      end\n    end\n  end\nend\n"

#subscribe(topic, qos: 0) ⇒ Object

Subscribe to the ‘topic`.

Options:

- `qos`: Subscriptions QoS. An interger 0..2.


97
98
99
100
# File 'lib/ulms_client.rb', line 97

def subscribe(topic, qos: 0)
  @mqtt.subscribe([topic, qos])
  LOG.info("#{@agent} subscribed to #{topic} (q#{qos})")
end