Class: Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/ulms_client.rb

Constant Summary collapse

OPTIONS =
[:password, :clean_session, :keep_alive]

Instance Method Summary collapse

Constructor Details

#initialize(host:, port:, mode:, agent:, **kwargs) ⇒ Connection

Returns a new instance of Connection.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/ulms_client.rb', line 46

# Builds a connection wrapper around a new, not yet connected MQTT client.
#
# @param host the value assigned to the MQTT client's `host`
# @param port the value assigned to the MQTT client's `port`
# @param mode connection mode; stored and embedded in the MQTT username
#   as "v2::<mode>"
# @param agent the agent identity; its `to_s` form becomes the MQTT client id
# @param kwargs optional client settings; only the keys listed in OPTIONS
#   are forwarded to the MQTT client, and only when their value is not nil
#   (so an explicit `false` is still applied)
def initialize(host:, port:, mode:, agent:, **kwargs)
  @agent = agent
  @mode = mode

  @mqtt = MQTT::Client.new.tap do |client|
    client.host = host
    client.port = port
    client.username = "v2::#{mode}"
    client.client_id = agent.to_s
  end

  OPTIONS.each do |name|
    value = kwargs[name]
    next if value.nil?

    @mqtt.send("#{name}=", value)
  end
end

Instance Method Details

#connect ⇒ Object

Establish the connection.



62
63
64
65
# File 'lib/ulms_client.rb', line 62

# Opens the connection through the underlying MQTT client and then logs
# that this agent has connected.
#
# @return whatever `LOG.info` returns
def connect
  @mqtt.connect
  LOG.info "#{@agent} connected"
end

#disconnect ⇒ Object

Disconnect from the broker.



68
69
70
71
# File 'lib/ulms_client.rb', line 68

# Closes the connection through the underlying MQTT client and then logs
# that this agent has disconnected.
#
# @return whatever `LOG.info` returns
def disconnect
  @mqtt.disconnect
  LOG.info "#{@agent} disconnected"
end

#make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT) ⇒ Object

A high-level method that makes a request and waits for the response on it.

Options:

- `to`: the destination service `Account` (required).
- `payload`: the publish message payload (required).
- `api_version`: service API version.
- `properties`: additional MQTT properties hash.
- `qos`: Publish QoS. An integer 0..2.
- `timeout`: How long to wait for the response before timing out.


148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/ulms_client.rb', line 148

# Publishes a request and waits for the response that matches it.
#
# A random correlation id is generated and sent in the request properties;
# the method then keeps receiving messages until one has type 'response'
# and carries the same correlation id.
#
# @param method the request method name, sent as the `method` property
# @param to the destination; interpolated into the request and response topics
# @param payload the message payload handed to `publish`
# @param api_version API version segment used in both topics
# @param properties [Hash] extra properties to send; this hash is NOT
#   modified, the request-specific keys are merged into a copy
# @param qos QoS passed through to `publish`
# @param timeout timeout passed through to `receive`
# @return the first received message accepted by the matching predicate
def make_request(method, to:, payload:, api_version: 'v1', properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT)
  correlation_data = SecureRandom.hex

  # Non-destructive merge: the caller's hash must not pick up request
  # bookkeeping keys (and may legitimately be frozen or reused).
  request_properties = properties.merge(
    type: 'request',
    method: method,
    correlation_data: correlation_data,
    response_topic: "agents/#{@agent}/api/#{api_version}/in/#{to}"
  )

  topic = "agents/#{@agent}/api/#{api_version}/out/#{to}"
  publish(topic, payload: payload, properties: request_properties, qos: qos)

  # Skip everything that is not the response to this very request.
  receive(timeout) do |msg|
    msg.properties['type'] == 'response' &&
      msg.properties['correlation_data'] == correlation_data
  end
end

#publish(topic, payload:, properties: {}, retain: false, qos: 0) ⇒ Object

Publish a message to the `topic`.

Options:

- `payload`: An object that will be dumped into JSON as the message payload (required).
- `properties`: MQTT publish properties hash.
- `retain`: A boolean indicating whether the messages should be retained.
- `qos`: An integer 0..2 that sets the QoS.


80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/ulms_client.rb', line 80

# Publishes a message to `topic`.
#
# The payload is dumped to JSON and wrapped, together with the properties,
# into an envelope hash that is itself dumped to JSON before being handed
# to the MQTT client.
#
# A `local_timestamp` property (DateTime's '%Q' format, i.e. milliseconds
# since the Unix epoch) is stamped on every message in 'default' mode, and
# in any other mode only when the caller did not supply one. The caller's
# hash is not modified.
#
# @param topic the topic to publish to
# @param payload an object dumped into JSON as the message payload
# @param properties [Hash] publish properties
# @param retain whether the message should be retained
# @param qos publish QoS
# @return whatever `LOG.info` returns
def publish(topic, payload:, properties: {}, retain: false, qos: 0)
  if @mode == 'default' || !properties[:local_timestamp]
    properties = properties.merge(local_timestamp: DateTime.now.strftime('%Q'))
  end

  envelope = {
    payload: JSON.dump(payload),
    properties: properties
  }

  @mqtt.publish(topic, JSON.dump(envelope), retain, qos)

  LOG.info <<~EOF
    #{@agent} published to #{topic} (q#{qos}, r#{retain ? 1 : 0}):
    Payload: #{JSON.pretty_generate(payload)}
    Properties: #{JSON.pretty_generate(properties)}
  EOF
end

#receive(timeout = DEFAULT_TIMEOUT) ⇒ Object

Waits for an incoming message. If a block is given, the received message is passed to the block. If the block returns a falsey value, it waits for the next message, and so on. Returns the received message. Raises if `timeout` is over.



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/ulms_client.rb', line 113

# Waits for an incoming message and returns it.
#
# Each message read from the MQTT client is a JSON envelope whose 'payload'
# entry is itself a JSON string; both are decoded and wrapped into an
# IncomingMessage together with the envelope's 'properties'.
#
# Without a block the first message is returned. With a block, each message
# is yielded to it and the first one for which the block returns a truthy
# value is returned; the others are skipped.
#
# @param timeout upper bound for the whole wait, passed to Timeout.timeout
# @yield [message] optional predicate deciding whether the message is the
#   one being waited for
# @return [IncomingMessage] the accepted message
# @raise [Timeout::Error] if no acceptable message arrives within `timeout`
def receive(timeout = DEFAULT_TIMEOUT)
  Timeout.timeout(timeout, nil, "Timed out waiting for the message") do
    loop do
      topic, json = @mqtt.get
      # NOTE(review): JSON.load is applied to data coming from the broker;
      # JSON.parse is the safer choice for untrusted input, but it treats
      # nil/blank input differently, so it is left unchanged here.
      envelope = JSON.load(json)
      payload = JSON.load(envelope['payload'])
      message = IncomingMessage.new(topic, payload, envelope['properties'])

      LOG.info <<~EOF
        #{@agent} received a message from topic #{topic}:
        Payload: #{JSON.pretty_generate(message.payload)}
        Properties: #{JSON.pretty_generate(message.properties)}
      EOF

      return message unless block_given?

      if yield(message)
        LOG.info "The message matched the given predicate"
        return message
      else
        LOG.info "The message didn't match the given predicate. Waiting for the next one."
      end
    end
  end
end

#subscribe(topic, qos: 0) ⇒ Object

Subscribe to the `topic`.

Options:

- `qos`: Subscription QoS. An integer 0..2.


103
104
105
106
# File 'lib/ulms_client.rb', line 103

# Subscribes the underlying MQTT client to `topic` with the given QoS and
# logs the subscription.
#
# @param topic the topic to subscribe to
# @param qos subscription QoS, passed to the client paired with the topic
# @return whatever `LOG.info` returns
def subscribe(topic, qos: 0)
  subscription = [topic, qos]
  @mqtt.subscribe(subscription)
  LOG.info "#{@agent} subscribed to #{topic} (q#{qos})"
end