Class: ZooMQ::Client
- Inherits:
- Object
- Inheritance hierarchy: Object → ZooMQ::Client
- Defined in:
- lib/zoomq/client.rb, lib/zoomq/client/connection.rb
Defined Under Namespace
Classes: Connection
Instance Attribute Summary collapse
-
#servers ⇒ Object
readonly
Returns the value of attribute servers.
Instance Method Summary collapse
-
#initialize ⇒ Client
constructor
A new instance of Client.
- #refresh ⇒ Object
- #request(obj) ⇒ Object
- #response(request_id, obj) ⇒ Object
- #watch ⇒ Object
Constructor Details
#initialize ⇒ Client
Returns a new instance of Client.
19 20 21 22 23 24 25 26 27 28 |
# File 'lib/zoomq/client.rb', line 19
#
# Builds a client: creates the request bookkeeping, the Zookeeper handle for
# this service and a ROUTER channel handled by Connection, then installs the
# watch and performs the first refresh.
#
# @return [Client] a new instance of Client
def initialize
  # Deferrables stored by #request, keyed by request id.
  @requests = {}
  @request_id = AtomicInteger.new
  @zk = Zookeeper.new(service_name)

  # Give the connection a back-reference to this client.
  # NOTE(review): presumably so Connection can call #response — confirm.
  @channel = ZMachine.connect(nil, ZMQ::ROUTER, Connection) { |conn| conn.client = self }

  # The watch is installed before the initial refresh.
  watch
  refresh
end
Instance Attribute Details
#servers ⇒ Object (readonly)
Returns the value of attribute servers.
17 18 19 |
# File 'lib/zoomq/client.rb', line 17
#
# Read-only accessor for the server list most recently stored by #refresh.
#
# @return [Object] the current value of @servers
def servers
  @servers
end
Instance Method Details
#refresh ⇒ Object
34 35 36 37 38 39 40 41 42 |
# File 'lib/zoomq/client.rb', line 34
#
# Reloads the server list from the Zookeeper handle, connects the channel to
# every listed server, and rebuilds the cycling enumerator that #request
# draws its next server from.
#
# @return [Object] the result of +@servers.cycle+
def refresh
  @servers = @zk.servers

  @servers.each do |address|
    $log.debug("#{service_name}:connect", server: address)
    @channel.connect("tcp://#{address}")
    # NOTE(review): presumably gives the connection time to establish — confirm.
    sleep 0.1
  end

  @cycle = @servers.cycle
end
#request(obj) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/zoomq/client.rb', line 44
#
# Sends +obj+ to the next server in the rotation and returns a deferrable
# that #response resolves when the matching reply arrives.
#
# The deferrable is registered in @requests *before* the message is sent, so
# a reply can never be dispatched to #response ahead of the bookkeeping entry.
# If sending raises, the entry is removed again and the error propagates.
#
# @param obj [Object] payload; its class name and +to_s+ form the message body
# @yield [result] optional callback invoked with the response object
# @return [ZMachine::DefaultDeferrable] the deferrable tracking this request
# @raise [ServerUnavailableError] when the current server list is empty
def request(obj, &block)
  if @servers.empty?
    raise ServerUnavailableError, "no servers are online, please try again later"
  end

  server = @cycle.next
  deferred = ZMachine::DefaultDeferrable.new
  deferred.callback(&block) if block

  request_id = @request_id.increment_and_get
  msg = ZMsg.new_string_msg(obj.class.to_s, obj.to_s)
  # Frames are wrapped innermost-first: request id, then the target server.
  msg.wrap(ZFrame.new(request_id.to_s))
  msg.wrap(ZFrame.new(server))
  $log.debug("#{service_name}:request", id: request_id, obj: obj.inspect)

  # Register first, then send; roll the registration back on any failure.
  # A flag with +ensure+ is used instead of +rescue+ so the cleanup runs
  # whatever kind of exception the channel raises.
  @requests[request_id] = deferred
  sent = false
  begin
    @channel.send_data(msg)
    sent = true
  ensure
    @requests.delete(request_id) unless sent
  end

  deferred
end
#response(request_id, obj) ⇒ Object
64 65 66 67 |
# File 'lib/zoomq/client.rb', line 64
#
# Resolves the pending request identified by +request_id+ with +obj+.
#
# The deferrable is removed from @requests as it is resolved, so completed
# requests no longer accumulate there. A response whose id has no pending
# entry is logged and ignored instead of raising NoMethodError on nil.
#
# @param request_id [Object] key under which #request stored the deferrable
# @param obj [Object] payload passed to the deferrable's +succeed+
# @return [Object, nil] result of +succeed+, or nil when nothing was pending
def response(request_id, obj)
  $log.debug("#{service_name}:response", id: request_id, obj: obj.inspect)

  # delete both looks up and forgets the entry in one step.
  deferred = @requests.delete(request_id)
  unless deferred
    $log.debug("#{service_name}:response:unknown", id: request_id)
    return
  end

  deferred.succeed(obj)
end
#watch ⇒ Object
30 31 32 |
# File 'lib/zoomq/client.rb', line 30
#
# Registers a block with the Zookeeper handle that calls #refresh.
# NOTE(review): presumably the block fires when the server set changes —
# confirm against Zookeeper#watch.
#
# @return [Object] whatever +@zk.watch+ returns
def watch
  @zk.watch do
    refresh
  end
end