Class: Appfuel::Service::RpcClient

Inherits:
Sneakers::Publisher
  • Object
show all
Defined in:
lib/appfuel/service/rpc_client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#call_id ⇒ Object

Returns the value of attribute call_id.



4
5
6
# File 'lib/appfuel/service/rpc_client.rb', line 4

# Reader for the identifier of the current RPC call.
#
# #publish assigns a fresh SecureRandom.uuid to this attribute before each
# request and sends it as the message's correlation_id.
#
# @return [Object] the current value of @call_id
def call_id
  @call_id
end

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



5
6
7
# File 'lib/appfuel/service/rpc_client.rb', line 5

# Read-only accessor for @channel.
#
# NOTE(review): presumably the messaging channel opened when the client
# connects — confirm where @channel is assigned.
#
# @return [Object] the current value of @channel
def channel
  @channel
end

#condition ⇒ Object (readonly)

Returns the value of attribute condition.



5
6
7
# File 'lib/appfuel/service/rpc_client.rb', line 5

# Read-only accessor for @condition.
#
# #publish calls condition.wait(lock) on this object after sending a request,
# so it is the object the caller blocks on while awaiting the reply.
# NOTE(review): presumably a ConditionVariable — confirm where it is created.
#
# @return [Object] the current value of @condition
def condition
  @condition
end

#config ⇒ Object (readonly)

Returns the value of attribute config.



5
6
7
# File 'lib/appfuel/service/rpc_client.rb', line 5

# Read-only accessor for @config.
#
# NOTE(review): presumably the client/connection settings — confirm where
# @config is assigned and what shape it has.
#
# @return [Object] the current value of @config
def config
  @config
end

#exchange ⇒ Object (readonly)

Returns the value of attribute exchange.



5
6
7
# File 'lib/appfuel/service/rpc_client.rb', line 5

# Read-only accessor for @exchange.
#
# #publish sends every request through exchange.publish(msg, params).
#
# @return [Object] the current value of @exchange
def exchange
  @exchange
end

#lock ⇒ Object (readonly)

Returns the value of attribute lock.



5
6
7
# File 'lib/appfuel/service/rpc_client.rb', line 5

# Read-only accessor for @lock.
#
# #publish synchronizes on this object and hands it to condition.wait while
# waiting for a reply.
# NOTE(review): presumably a Mutex — confirm where it is created.
#
# @return [Object] the current value of @lock
def lock
  @lock
end

#reply_queue ⇒ Object (readonly)

Returns the value of attribute reply_queue.



5
6
7
# File 'lib/appfuel/service/rpc_client.rb', line 5

# Read-only accessor for @reply_queue.
#
# #publish sends reply_queue.name as the reply_to property of each request.
#
# @return [Object] the current value of @reply_queue
def reply_queue
  @reply_queue
end

#response ⇒ Object

Returns the value of attribute response.



4
5
6
# File 'lib/appfuel/service/rpc_client.rb', line 4

# Reader for the raw reply of the current RPC call.
#
# #publish resets @response to nil before sending a request and feeds it to
# JSON.parse once its wait returns.
# NOTE(review): presumably written by the reply-queue consumer through the
# attribute writer — confirm against the subscription code.
#
# @return [Object] the current value of @response
def response
  @response
end

Instance Method Details

#close ⇒ Object



30
31
32
# File 'lib/appfuel/service/rpc_client.rb', line 30

# Calls close on @bunny, but only while connected? reports true.
#
# @return [Object, nil] whatever @bunny.close returns, or nil when the client
#   is not connected and nothing was done
def close
  return unless connected?

  @bunny.close
end

#publish(to_queue, action_route, msg, headers = {}) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/appfuel/service/rpc_client.rb', line 7

# Sends +msg+ as an RPC request and blocks until the reply is available.
#
# A fresh SecureRandom.uuid is stored in call_id and sent as the message's
# correlation_id; reply_queue.name is sent as reply_to. The call then waits
# on +condition+ until @response has been filled in, parses it as JSON and
# hands the parsed data to Appfuel::ResponseHandler#create_response.
#
# There is no timeout: if no reply ever arrives this call never returns.
#
# @param to_queue [Object] used as the routing_key of the request
# @param action_route [Object] sent in the message headers under :action_route
# @param msg [#to_json] request payload; serialized with to_json
# @param headers [Hash] extra message headers; merged over the default
#   header hash, so a caller-supplied :action_route key takes precedence
# @return [Object] whatever Appfuel::ResponseHandler#create_response returns
def publish(to_queue, action_route, msg, headers = {})
  @mutex.synchronize do
    ensure_connection! unless connected?
  end

  self.call_id = SecureRandom.uuid
  @response    = nil
  payload      = msg.to_json
  params       = {
    routing_key:    to_queue,
    correlation_id: call_id,
    reply_to:       reply_queue.name,
    content_type:   'application/json',
    headers:        {action_route: action_route}.merge(headers)
  }

  exchange.publish(payload, params)

  # Wait on the predicate rather than on the bare signal: the reply can be
  # delivered before this thread reaches the wait (the signal would be lost
  # and an unconditional wait would block forever), and a condition variable
  # may also wake up spuriously before the reply has been stored.
  lock.synchronize { condition.wait(lock) until @response }

  result = JSON.parse(@response)
  Appfuel::ResponseHandler.new.create_response(result)
end