Class: ZooMQ::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/zoomq/client.rb,
lib/zoomq/client/connection.rb

Defined Under Namespace

Classes: Connection

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Client

Returns a new instance of Client.



19
20
21
22
23
24
25
26
27
28
# File 'lib/zoomq/client.rb', line 19

# Builds the client: empty pending-request table, request-id counter,
# ZooKeeper handle for this service and a ROUTER channel whose connection
# gets a back-reference to this client.
#
# The ZooKeeper watch is installed before the first refresh, in that order.
def initialize
  @requests   = {}
  @request_id = AtomicInteger.new
  @zk         = Zookeeper.new(service_name)
  @channel    = ZMachine.connect(nil, ZMQ::ROUTER, Connection) { |conn| conn.client = self }

  watch
  refresh
end

Instance Attribute Details

#servers ⇒ Object (readonly)

Returns the value of attribute servers.



17
18
19
# File 'lib/zoomq/client.rb', line 17

# Reader for the server list this client currently holds.
#
# @return [Object] the value of @servers, which #refresh assigns from
#   @zk.servers; nil if #refresh has not run yet
def servers
  @servers
end

Instance Method Details

#refresh ⇒ Object



34
35
36
37
38
39
40
41
42
# File 'lib/zoomq/client.rb', line 34

# Re-reads the server list from ZooKeeper, connects the channel to every
# listed server it has not connected to before, and restarts the round-robin
# cycle used by #request.
#
# Endpoints that were already connected by an earlier refresh are skipped, so
# a watch event no longer re-issues a connect (and the 0.1s pause) for every
# known server each time the list changes.
#
# NOTE(review): this assumes the underlying socket keeps an endpoint once it
# has been connected — confirm against ZMachine/ZeroMQ connect semantics.
#
# @return [Enumerator] the new cycle over the current server list
def refresh
  @servers = @zk.servers
  # Endpoints this client has already asked the channel to connect to.
  @connected ||= {}

  @servers.each do |server|
    next if @connected[server]

    $log.debug("#{service_name}:connect", server: server)
    @channel.connect("tcp://#{server}")
    @connected[server] = true
    sleep(0.1)
  end

  @cycle = @servers.cycle
end

#request(obj) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/zoomq/client.rb', line 44

# Sends +obj+ to the next server in the round-robin cycle and returns a
# deferrable that #response completes when the reply arrives.
#
# The outgoing message is built from obj.class.to_s and obj.to_s, then
# wrapped with the request id and the target server.
#
# The deferrable is stored in @requests *before* the message is sent, so a
# reply that is processed immediately can always find its entry. If sending
# raises, the entry is removed again and the error is re-raised.
#
# @param obj [Object] payload to send
# @yield [result] optional callback invoked with the response object
# @return [ZMachine::DefaultDeferrable] deferrable for this request
# @raise [ServerUnavailableError] if the current server list is empty
def request(obj)
  if @servers.empty?
    raise ServerUnavailableError, "no servers are online, please try again later"
  end

  server = @cycle.next

  deferred = ZMachine::DefaultDeferrable.new
  deferred.callback { |result| yield result } if block_given?

  request_id = @request_id.increment_and_get
  msg = ZMsg.new_string_msg(obj.class.to_s, obj.to_s)
  msg.wrap(ZFrame.new(request_id.to_s))
  msg.wrap(ZFrame.new(server))
  $log.debug("#{service_name}:request", id: request_id, obj: obj.inspect)

  # Register first: the response may be handled before send_data returns.
  @requests[request_id] = deferred
  begin
    @channel.send_data(msg)
  rescue StandardError
    # Nothing was sent, so no response will ever clear this entry.
    @requests.delete(request_id)
    raise
  end

  deferred
end

#response(request_id, obj) ⇒ Object



64
65
66
67
# File 'lib/zoomq/client.rb', line 64

# Completes the pending request identified by +request_id+ with +obj+.
#
# The entry is removed from @requests as it is looked up, so finished
# requests do not accumulate in the table. A response whose id has no pending
# entry (already completed, or never registered) is logged and ignored rather
# than raising NoMethodError on nil.
#
# @param request_id [Object] key under which #request stored the deferrable
# @param obj [Object] response payload handed to the deferrable
# @return [Object, nil] the result of succeed, or nil when the id is unknown
def response(request_id, obj)
  $log.debug("#{service_name}:response", id: request_id, obj: obj.inspect)

  deferred = @requests.delete(request_id)
  unless deferred
    $log.debug("#{service_name}:response:unknown", id: request_id)
    return
  end

  deferred.succeed(obj)
end

#watch ⇒ Object



30
31
32
# File 'lib/zoomq/client.rb', line 30

# Registers a block with the ZooKeeper handle that calls #refresh each time
# the handle invokes it.
#
# @return [Object] whatever @zk.watch returns
def watch
  @zk.watch do
    refresh
  end
end