Class: Serf::Client::Connection

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Celluloid::IO, Celluloid::Logger, Commands, Logger
Defined in:
lib/serf/client/connection.rb

Constant Summary

Constants included from Commands

Serf::Client::Commands::COMMANDS

Instance Method Summary collapse

Methods included from Logger

log

Methods included from Commands

#command

Constructor Details

#initialize(address, port) ⇒ Connection

finalizer :shutdown



39
40
41
42
43
44
45
46
47
48
# File 'lib/serf/client/connection.rb', line 39

def initialize address, port
  info "connecting to socket #{address} on #{port}"
  connect address, port
  @io = IO.supervise(@socket, Actor.current).actors.first # avoid self
  @callbacks = Callbacks.supervise.actors.first
  @seqid = 0
  @messages = {}
  @requests = {}
  async.receive_response
end

Instance Method Details

#call(method, param = nil, &block) ⇒ Object



111
112
113
114
115
116
117
118
119
120
# File 'lib/serf/client/connection.rb', line 111

def call(method, param=nil, &block)
  msgid = send_request(method, param)
  @callbacks.add msgid, block if block_given?

  future.wait_for_response msgid
  #::Celluloid::Future.new do
  #  until msg = @messages[msgid]; end
  #  msg
  #end
end

#handshakeObject



50
51
52
53
# File 'lib/serf/client/connection.rb', line 50

def handshake
  debug 'handshake'
  send_request(:handshake, Version: 1)
end

#process_response(header, &block) ⇒ Object

Process the response, yielding retrieves next message



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/serf/client/connection.rb', line 72

def process_response header, &block
  msgid = header["Seq"]

  h = @requests[msgid]
  raise "No request for #{header}" if not h

  cmd = h[:header]['Command']
  parts = command cmd
  debug "Processing #{cmd}"

  raise "No such command #{h}" unless parts

  # This is most likely the ACK
  if not h[:ack?]
    if parts.include? :body
      # ACK comes with a response body
      body = yield
      # Could probably clean up old things like events here, anything not a stream
    end
    h[:ack?] = true
  else
    # Alread ACKed -> should be a stream!
    raise "Cannot handle #{h}" unless ['monitor', 'stream', 'query'].include? cmd
    body = yield
  end

  resp = Response.new(header, body)
  received_response msgid, resp
  resp
end

#receive_responseObject



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/serf/client/connection.rb', line 55

def receive_response
  loop do
    # header
    header = receive
    debug "received: #{header}"

    error header unless header['Seq']
    if header["Error"].empty?
      # Keep the :receive contained here
      process_response(header) { r = receive; debug "received more: #{r}"; r }
    else
      error header["Error"]
    end
  end
end

#received_response(msgid, resp) ⇒ Object



103
104
105
106
107
108
109
# File 'lib/serf/client/connection.rb', line 103

def received_response msgid, resp
  debug 'connection#received_response'
  # Tell the call back actor about our new response
  @callbacks.mailbox << resp
  # Let the future we created know about the response
  @messages[msgid] = resp
end

#send_request(method, param) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/serf/client/connection.rb', line 127

def send_request method, param
  debug 'send_request'

  msgid = seqid
  header = { "Command" => method.to_s, "Seq" => msgid }

  # Keep a reference for our response processing
  @requests[msgid] = { header: header, ack?: false }
  # Send to the writer
  @io.mailbox << [header, param]

  msgid
end

#seqidObject



141
142
143
144
145
# File 'lib/serf/client/connection.rb', line 141

def seqid
  v = @seqid
  @seqid += 1
  v
end

#wait_for_response(msgid) ⇒ Object



122
123
124
125
# File 'lib/serf/client/connection.rb', line 122

def wait_for_response msgid
  until msg = @messages[msgid]; sleep 0.1; end
  msg
end