Class: Bosh::Director::NatsRpc

Inherits:
Object
  • Object
show all
Defined in:
lib/bosh/director/nats_rpc.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ NatsRpc

Returns a new instance of NatsRpc.



6
7
8
9
10
11
12
13
# File 'lib/bosh/director/nats_rpc.rb', line 6

# Builds the RPC helper from the director's Config and immediately starts
# listening for replies.
#
# Reads the NATS client, the logger and the process UUID from Config, derives
# this instance's inbox subject prefix ("director.<process_uuid>"), and then
# calls #subscribe_inbox so replies can be routed to pending requests.
def initialize
  # Pure local state first; these allocations have no external side effects.
  @lock = Mutex.new
  @requests = {}

  @nats = Config.nats
  @logger = Config.logger

  process_uuid = Config.process_uuid
  @inbox_name = "director.#{process_uuid}"

  # Must run last: it relies on @nats, @logger, @lock, @requests and @inbox_name.
  subscribe_inbox
end

Instance Method Details

#cancel_request(request_id) ⇒ Object



45
46
47
# File 'lib/bosh/director/nats_rpc.rb', line 45

# Forgets a pending request so that a later reply for it is ignored.
#
# @param request_id [Object] key under which the callback was stored in @requests
# @return [Object, nil] the callback that was registered for +request_id+,
#   or nil when nothing was registered under that id
def cancel_request(request_id)
  # Hold the lock while touching the shared @requests table.
  @lock.synchronize do
    @requests.delete(request_id)
  end
end

#generate_request_id ⇒ Object



49
50
51
# File 'lib/bosh/director/nats_rpc.rb', line 49

# Produces a fresh identifier for an outgoing request.
#
# @return [String] the value of SecureRandom.uuid
def generate_request_id
  request_id = SecureRandom.uuid
  request_id
end

#send_request(client, request, &block) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/bosh/director/nats_rpc.rb', line 31

# Publishes +request+ to +client+ and registers +block+ as the handler for
# the reply.
#
# A "reply_to" entry of the form "<inbox_name>.<request_id>" is written into
# the given +request+ hash (the caller's object is modified) before it is
# encoded. The publish itself is deferred through EM.next_tick.
#
# @param client [Object] subject handed to @nats.publish
# @param request [Hash] payload to encode with Yajl::Encoder; receives a
#   "reply_to" key
# @yield [response] stored under the request id; #subscribe_inbox calls it
#   with the parsed reply when a message arrives on the reply subject
# @return [Object] the generated request id, usable with #cancel_request
# @raise whatever Yajl::Encoder.encode raises for an unencodable request; in
#   that case no callback is left registered
def send_request(client, request, &block)
  request_id = generate_request_id
  request["reply_to"] = "#{@inbox_name}.#{request_id}"

  # Encode before registering the callback: if encoding fails the caller
  # never receives the request id, so a callback registered earlier could
  # never be cancelled and would stay in @requests forever.
  message = Yajl::Encoder.encode(request)

  @lock.synchronize { @requests[request_id] = block }

  @logger.debug("SENT: #{client} #{message}")
  EM.next_tick { @nats.publish(client, message) }

  request_id
end

#subscribe_inbox ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/bosh/director/nats_rpc.rb', line 15

# Subscribes to "<inbox_name>.>" and routes each incoming message to the
# callback registered for its request.
#
# The request id is taken from the last dot-separated segment of the
# message subject. The matching callback is removed from @requests before
# it runs, so each registered callback is invoked at most once. Messages
# whose id has no registered callback are logged at debug level and
# otherwise ignored.
#
# Errors raised while parsing the message or running the callback are
# logged as warnings and not re-raised. Only StandardError is rescued, so
# process-level exceptions such as Interrupt and SystemExit still propagate.
#
# @return [Object] whatever @nats.subscribe returns
def subscribe_inbox
  @nats.subscribe("#{@inbox_name}.>") do |message, _, subject|
    @logger.debug("RECEIVED: #{subject} #{message}")
    begin
      request_id = subject.split(".").last
      callback = @lock.synchronize { @requests.delete(request_id) }
      # Parse only when someone is waiting for the reply.
      callback.call(Yajl::Parser.new.parse(message)) if callback
    rescue StandardError => e
      @logger.warn(e.message)
    end
  end
end