Class: Hatetepe::Client

Inherits:
Connection
  • Object
show all
Defined in:
lib/hatetepe/client.rb,
lib/hatetepe/client.rb,
lib/hatetepe/client/pipeline.rb,
lib/hatetepe/client/keep_alive.rb

Defined Under Namespace

Classes: KeepAlive, Pipeline

Instance Attribute Summary collapse

Attributes inherited from Connection

#processing_enabled

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Connection

#close_connection, #closed?, #closed_by_remote?, #closed_by_self?, #remote_address, #remote_port, #sockaddr

Constructor Details

#initialize(config) ⇒ Client

Returns a new instance of Client.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/hatetepe/client.rb', line 25

# Prepares the client's state and its request-processing stack.
#
# Stores +config+, creates a fresh parser and builder, and starts with no
# queued requests and no pending transmission/response bookkeeping. The Rack
# application kept in @app runs every request through the KeepAlive and
# Pipeline middleware before handing it to #send_request.
#
# @param config the connection configuration; #<< reads its :host and :port
def initialize(config)
  @config = config
  @parser = Hatetepe::Parser.new
  @builder = Hatetepe::Builder.new

  @requests = []
  @pending_transmission = {}
  @pending_response = {}

  stack = Rack::Builder.new
  stack.use KeepAlive
  stack.use Pipeline
  stack.run method(:send_request)
  @app = stack.to_app

  # Bare super forwards +config+ unchanged to the parent initializer.
  super
end

Instance Attribute Details

#app ⇒ Object (readonly)

Returns the value of attribute app.



21
22
23
# File 'lib/hatetepe/client.rb', line 21

# Reader for @app. #initialize assigns it the Rack application built from
# the KeepAlive and Pipeline middleware with #send_request as the endpoint;
# #<< invokes it once per request.
def app
  @app
end

#builder ⇒ Object (readonly)

Returns the value of attribute builder.



22
23
24
# File 'lib/hatetepe/client.rb', line 22

# Reader for @builder, the Hatetepe::Builder created in #initialize.
# #post_init routes its on_write output to #send_data and #send_request
# feeds it the request to serialize.
def builder
  @builder
end

#config ⇒ Object (readonly)

Returns the value of attribute config.



21
22
23
# File 'lib/hatetepe/client.rb', line 21

# Reader for @config, the object passed to #initialize. #<< reads its :host
# and :port entries to build a default Host header.
def config
  @config
end

#parser ⇒ Object (readonly)

Returns the value of attribute parser.



22
23
24
# File 'lib/hatetepe/client.rb', line 22

# Reader for @parser, the Hatetepe::Parser created in #initialize.
# #receive_data appends incoming data to it and #post_init subscribes
# #receive_response to its on_response hook.
def parser
  @parser
end

#pending_response ⇒ Object (readonly)

Returns the value of attribute pending_response.



23
24
25
# File 'lib/hatetepe/client.rb', line 23

# Reader for @pending_response, a Hash initialized empty in #initialize.
# #send_request stores an EM::DefaultDeferrable under the request's
# object_id while it waits for a response and removes it afterwards.
def pending_response
  @pending_response
end

#pending_transmission ⇒ Object (readonly)

Returns the value of attribute pending_transmission.



23
24
25
# File 'lib/hatetepe/client.rb', line 23

# Reader for @pending_transmission, a Hash initialized empty in #initialize.
# #<< stores an EM::DefaultDeferrable under the request's object_id for the
# duration of processing; #send_request succeeds it once the request has
# been handed to the builder.
def pending_transmission
  @pending_transmission
end

#requests ⇒ Object (readonly)

Returns the value of attribute requests.



23
24
25
# File 'lib/hatetepe/client.rb', line 23

# Reader for @requests, an Array initialized empty in #initialize. #<<
# appends each accepted request and deletes it again once the request has
# been succeeded or failed.
def requests
  @requests
end

Class Method Details

.request(verb, uri, headers = {}, body = nil) ⇒ Object



201
202
203
204
205
206
207
208
209
# File 'lib/hatetepe/client.rb', line 201

# Performs a single request on a fresh connection and stops the client
# afterwards.
#
# The connection target is taken from the host and port of +uri+; only the
# request URI (path plus query) is forwarded to the client's #request.
#
# @param verb the HTTP verb, passed through to the client
# @param uri [String, URI] absolute URI of the resource
# @param headers [Hash] request headers; never modified by this method
# @param body request body, passed through to the client
# @return whatever the client's #request returns
def request(verb, uri, headers = {}, body = nil)
  uri = URI(uri)
  client = start(:host => uri.host, :port => uri.port)

  # Add the marker header to a copy: writing into +headers+ would leak the
  # marker back to the caller and would raise on a frozen Hash.
  single_headers = headers.merge("X-Hatetepe-Single" => true)

  response = client.request(verb, uri.request_uri, single_headers, body)
  client.stop
  response
end

.start(config) ⇒ Object



197
198
199
# File 'lib/hatetepe/client.rb', line 197

# Opens a connection through EM.connect, using this class as the handler.
#
# @param config must answer [] for :host and :port; it is also passed on
#   as the trailing argument to EM.connect
# @return whatever EM.connect returns
def start(config)
  host = config[:host]
  port = config[:port]
  EM.connect(host, port, self, config)
end

Instance Method Details

#<<(request) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/hatetepe/client.rb', line 78

# Queues +request+ and processes it through #app inside a new Fiber.
#
# A Host header derived from #config is filled in unless one is present,
# and the request is bound to this connection. If processing is disabled
# the request is failed immediately and nothing is queued.
#
# Once #app returns, the response is attached to the request, the request
# is removed from #requests and then succeeded (truthy response that
# reports success?) or failed (anything else), with the response as
# argument.
#
# @param request the request to send; must expose headers, connection=,
#   response= and the succeed/fail callbacks
# @return the value of Fiber#resume, or nil when processing is disabled
def <<(request)
  request.headers["Host"] ||= "#{config[:host]}:#{config[:port]}"
  request.connection = self

  if !processing_enabled?
    request.fail
    return
  end

  requests << request

  worker = Fiber.new do
    key = request.object_id
    begin
      pending_transmission[key] = EM::DefaultDeferrable.new

      response = app.call(request)
      request.response = response
      # XXX check for response.nil?
      outcome = (response && response.success?) ? :succeed : :fail
      requests.delete(request).send(outcome, response)
      response
    ensure
      # Always drop the transmission marker, even if processing blew up.
      pending_transmission.delete(key)
    end
  end
  worker.resume
end

#encode_body(headers, body) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/hatetepe/client.rb', line 165

# Encodes a Hash body as form data and sets the matching Content-Type.
#
# Bodies that are not a Hash are returned untouched. For a Hash, the values
# are searched (descending into nested Arrays and Hashes) for a
# Rack::Multipart::UploadedFile: if one is found the body is encoded as
# multipart/form-data, otherwise as application/x-www-form-urlencoded.
#
# @param headers [Hash] header Hash; its "Content-Type" entry is overwritten
#   in place when the body is encoded
# @param body the request body
# @return [Array] +headers+ and the (possibly encoded) body
def encode_body(headers, body)
  return [headers, body] unless Hash === body

  # Walk every nested value with an explicit work list.
  has_upload = false
  stack = body.values
  until stack.empty?
    item = stack.pop
    case item
    when Array
      stack.concat(item)
    when Hash
      stack.concat(item.values)
    when Rack::Multipart::UploadedFile
      has_upload = true
    end
  end

  if has_upload
    boundary = Rack::Multipart::MULTIPART_BOUNDARY
    headers["Content-Type"] = "multipart/form-data; boundary=#{boundary}"
    encoded = Rack::Multipart.build_multipart(body)
  else
    headers["Content-Type"] = "application/x-www-form-urlencoded"
    encoded = Rack::Utils.build_nested_query(body)
  end

  [headers, [encoded]]
end

#post_init ⇒ Object



41
42
43
44
45
46
47
48
# File 'lib/hatetepe/client.rb', line 41

# Wires the parser and builder to this connection and enables processing.
#
# Parsed responses are delivered to #receive_response and everything the
# builder writes goes to #send_data.
def post_init
  response_hook = method(:receive_response)
  parser.on_response << response_hook

  # XXX check if the connection is still present
  write_hook = method(:send_data)
  builder.on_write << write_hook

  self.processing_enabled = true
end

#receive_data(data) ⇒ Object



50
51
52
53
54
55
56
# File 'lib/hatetepe/client.rb', line 50

# Feeds received data into the parser.
#
# If feeding the parser raises a StandardError, the connection is closed
# and the same error is raised again.
#
# @param data the chunk to parse
# @return the result of appending +data+ to the parser
def receive_data(data)
  begin
    parser << data
  rescue => error
    close_connection
    raise error
  end
end

#receive_response(response) ⇒ Object



71
72
73
74
75
76
# File 'lib/hatetepe/client.rb', line 71

# Hands a parsed response to the oldest request still lacking one.
#
# The response is attached to that request and the matching entry in
# #pending_response is succeeded with it.
#
# NOTE(review): raises NoMethodError when no request is waiting, or when
# the waiting request has no entry in #pending_response.
#
# @param response the parsed response
# @return the request that received the response
def receive_response(response)
  awaiting = requests.find {|candidate| !candidate.response }
  awaiting.response = response
  pending_response[awaiting.object_id].succeed(response)
  awaiting
end

#request(verb, uri, headers = {}, body = nil, http_version = "1.1") ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/hatetepe/client.rb', line 105

# Builds a request, sends it on this connection and waits for it to
# complete.
#
# A default User-Agent is added unless one is given. The body is normalized
# with #wrap_body and form-encoded with #encode_body. After the request has
# been queued, processing is disabled on this client. For HEAD requests the
# response body is closed for writing once the request completes.
#
# @param verb the HTTP verb
# @param uri the request URI
# @param headers [Hash] request headers; never modified by this method
# @param body the request body; see #wrap_body for accepted forms
# @param http_version [String] the HTTP version to use
# @return the request's response, or nil if none was ever attached
def request(verb, uri, headers = {}, body = nil, http_version = "1.1")
  # Work on a copy so that neither the default User-Agent nor the
  # Content-Type set by #encode_body leaks into the caller's Hash.
  headers = headers.dup
  headers["User-Agent"] ||= "hatetepe/#{Hatetepe::VERSION}"

  body = wrap_body(body)
  headers, body = encode_body(headers, body)

  request = Hatetepe::Request.new(verb, uri, headers, body, http_version)
  self << request

  # XXX shouldn't this happen in ::request ?
  self.processing_enabled = false

  EM::Synchrony.sync request

  # A failed request may never have received a response; only touch the
  # body when there is one.
  response = request.response
  response.body.close_write if response && request.verb == "HEAD"

  response
end

#send_request(request) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/hatetepe/client.rb', line 58

# Writes +request+ through the builder and waits for its response.
#
# The "X-Hatetepe-Single" header is stripped before the request is
# serialized. After the request has been passed to the builder, its
# #pending_transmission entry is succeeded; a fresh deferrable is then
# registered in #pending_response and awaited. That entry is removed again
# on the way out, whether or not an error occurred.
#
# @param request the request to transmit
# @return the result of waiting on the response deferrable
def send_request(request)
  key = request.object_id

  request.headers.delete("X-Hatetepe-Single")
  builder.request(request.to_a)
  pending_transmission[key].succeed

  pending_response[key] = EM::DefaultDeferrable.new
  EM::Synchrony.sync(pending_response[key])
ensure
  pending_response.delete(key)
end

#stop ⇒ Object



124
125
126
127
128
129
130
# File 'lib/hatetepe/client.rb', line 124

# Waits for the most recently queued request and its response body, then
# closes the connection.
#
# If the last request completes without a response (for example because it
# was failed), there is no body to wait for and the connection is closed
# right away.
#
# @return the result of #close_connection
def stop
  unless requests.empty?
    last_response = EM::Synchrony.sync(requests.last)
    # The request may have completed without any response; guard against
    # nil so the connection is still closed below.
    body = last_response && last_response.body
    EM::Synchrony.sync body if body
  end
  close_connection
end

#unbind ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/hatetepe/client.rb', line 132

# Fails everything that is still outstanding once the connection is gone.
#
# After calling the parent implementation, a block is scheduled via
# EM.next_tick that, for every request still listed in #requests, fails its
# pending transmission/response deferrables, closes the write side of an
# already started response body, and finally fails the request itself with
# whatever response it has.
def unbind
  super

  EM.next_tick do
    # Iterate over a snapshot: #<< deletes a request from #requests once
    # processing of it finishes, and failing the deferrables below may
    # cause exactly that to happen mid-iteration. Walking the live Array
    # while it shrinks would skip requests.
    requests.dup.each do |req|
      # fail state triggers
      id = req.object_id
      pending_transmission[id].fail if pending_transmission[id]
      pending_response[id].fail if pending_response[id]

      # fail response body if the response has already been started
      if req.response
        req.response.body.tap {|b| b.close_write unless b.closed_write? }
      end
      # XXX FiberError: dead fiber called because req already succeeded
      #     or failed, see github.com/eventmachine/eventmachine/issues/287
      req.fail req.response
    end
  end
end

#wrap_body(body) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
# File 'lib/hatetepe/client.rb', line 153

# Normalizes a request body into something that responds to #each.
#
# * objects that already respond to #each are returned as they are
# * objects that respond to #read are read once and the result is wrapped
#   in an Array
# * any other truthy value is wrapped in a one-element Array
# * nil and false yield an empty Array
#
# @param body the raw body
# @return the body itself or an Array wrapping it
def wrap_body(body)
  return body if body.respond_to?(:each)
  return [body.read] if body.respond_to?(:read)

  body ? [body] : []
end