Class: Bosh::Director::NatsRpc

Inherits:
Object
  • Object
show all
Defined in:
lib/bosh/director/nats_rpc.rb

Overview

Remote procedure call client wrapping NATS

Instance Method Summary collapse

Constructor Details

#initialize(nats_uri) ⇒ NatsRpc

Returns a new instance of NatsRpc.



5
6
7
8
9
10
11
# File 'lib/bosh/director/nats_rpc.rb', line 5

# Prepares the client's in-memory state. Only instance variables are
# assigned here; `connect` is not called from this method.
#
# @param nats_uri stored unchanged in @nats_uri
#   (presumably the NATS server URI -- confirm against `connect`)
def initialize(nats_uri)
  @lock = Mutex.new # guards @requests
  @nats_uri = nats_uri
  @logger = Config.logger

  # Reply subjects for this instance are built under this prefix.
  process_uuid = Config.process_uuid
  @inbox_name = "director.#{process_uuid}"

  # request_id => callback for requests that have not been cancelled
  @requests = {}
end

Instance Method Details

#cancel_request(request_id) ⇒ Object

Stops listening for a response



44
45
46
# File 'lib/bosh/director/nats_rpc.rb', line 44

# Stops listening for a response by dropping the callback registered
# under +request_id+. The removal happens while holding @lock.
#
# @param request_id the id previously returned by send_request
# @return the removed callback, or nil when no entry existed for the id
def cancel_request(request_id)
  @lock.synchronize do
    @requests.delete(request_id)
  end
end

#generate_request_id ⇒ Object



48
49
50
# File 'lib/bosh/director/nats_rpc.rb', line 48

# Produces a fresh identifier for an outgoing request.
#
# @return [String] a random UUID obtained from SecureRandom
def generate_request_id
  SecureRandom.uuid
end

#nats ⇒ Object

Returns a lazily connected NATS client



14
15
16
# File 'lib/bosh/director/nats_rpc.rb', line 14

# Returns the NATS client, calling `connect` the first time it is needed
# and reusing the cached result in @nats afterwards.
#
# @return whatever `connect` returned
def nats
  return @nats if @nats

  @nats = connect
end

#send_message(client, payload) ⇒ Object

Publishes a payload (encoded as JSON) without expecting a response



19
20
21
22
23
24
25
# File 'lib/bosh/director/nats_rpc.rb', line 19

# Publishes a payload (encoded as JSON) without expecting a response.
#
# The payload is serialized and logged immediately; the publish itself
# is handed to EM.schedule.
#
# @param client  subject the message is published to
# @param payload object serialized with JSON.generate
# @return the result of EM.schedule
def send_message(client, payload)
  encoded = JSON.generate(payload)
  @logger.debug("SENT: #{client} #{encoded}")
  EM.schedule { nats.publish(client, encoded) }
end

#send_request(client, request, &callback) ⇒ Object

Sends a request (encoded as JSON) and listens for the response



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/bosh/director/nats_rpc.rb', line 28

# Sends a request (encoded as JSON) and registers a callback for the
# response.
#
# NOTE: +request+ is modified in place -- a "reply_to" key of the form
# "<@inbox_name>.<request_id>" is written into it before serialization.
#
# @param client   subject the request is published to
# @param request  hash-like object; receives the "reply_to" entry
# @param callback block stored in @requests under the new request id
# @return the generated request id (pass it to cancel_request to
#   remove the stored callback)
def send_request(client, request, &callback)
  request_id = generate_request_id
  reply_subject = "#{@inbox_name}.#{request_id}"
  request["reply_to"] = reply_subject

  # Register the callback before anything is scheduled for publishing.
  @lock.synchronize { @requests[request_id] = callback }

  encoded = JSON.generate(request)
  @logger.debug("SENT: #{client} #{encoded}")

  EM.schedule do
    subscribe_inbox
    nats.publish(client, encoded)
  end

  request_id
end