Class: Presto::Client::StatementClient

Inherits:
Object
  • Object
show all
Defined in:
lib/presto/client/statement_client.rb

Constant Summary collapse

JSON_OPTIONS =

Presto can return deeply nested JSON, so the parser's nesting limit is disabled.

{
    :max_nesting => false
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(faraday, query, options, next_uri = nil) ⇒ StatementClient

Returns a new instance of StatementClient.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/presto/client/statement_client.rb', line 29

# Sets up the client state and obtains the first batch of results.
#
# When +next_uri+ is given, that URI is fetched (with retries) and decoded
# as a QueryResults model. Otherwise the work is delegated to
# +post_query_request!+.
#
# @param faraday [Object] HTTP connection object kept for later requests
# @param query [String] statement text, exposed through #query
# @param options [Hash] client options; this method reads +:retry_timeout+
#   (falls back to 120 when absent or nil) and +:model_version+
# @param next_uri [String, nil] URI of an already-running statement to attach to
def initialize(faraday, query, options, next_uri=nil)
  @faraday = faraday
  @query = query
  @options = options
  @closed = false
  @exception = nil
  @retry_timeout = options[:retry_timeout] || 120

  # A version such as "0.149" selects the constant ModelVersions::V0_149;
  # without a version the default Models namespace is used.
  version = @options[:model_version]
  @models = version ? ModelVersions.const_get("V#{version.gsub(".", "_")}") : Models

  unless next_uri
    post_query_request!
    return
  end

  response = faraday_get_with_retry(next_uri)
  @results = @models::QueryResults.decode(parse_body(response))
end

Instance Attribute Details

#exception ⇒ Object (readonly)

Returns the value of attribute exception.



87
88
89
# File 'lib/presto/client/statement_client.rb', line 87

# Reader for the error recorded by this client.
#
# The value starts out as nil in the constructor and is assigned by
# #faraday_get_with_retry immediately before it raises.
#
# @return [Exception, nil] the recorded error, or nil when none was recorded
def exception
  @exception
end

#query ⇒ Object (readonly)

Returns the value of attribute query.



77
78
79
# File 'lib/presto/client/statement_client.rb', line 77

# Reader for the query that was handed to the constructor.
#
# @return [Object] the +query+ argument stored by #initialize
def query
  @query
end

Instance Method Details

#advance ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
# File 'lib/presto/client/statement_client.rb', line 109

# Fetches the next batch of results and makes it the current one.
#
# Nothing is requested when the client is closed or when the current
# results carry no next URI.
#
# @return [Boolean] true when a new batch was fetched and stored,
#   false when there was nothing to fetch
def advance
  return false if closed? || !has_next?

  next_uri = @results.next_uri
  body = parse_body(faraday_get_with_retry(next_uri))
  @results = decode_model(next_uri, body, @models::QueryResults)
  true
end

#cancel_leaf_stage ⇒ Object



188
189
190
191
192
193
194
195
196
# File 'lib/presto/client/statement_client.rb', line 188

# Sends an HTTP DELETE to the next URI of the current results.
#
# @return [Boolean] true when the DELETE was answered with a 2xx status;
#   false for any other status or when there is no next URI to delete
def cancel_leaf_stage
  target = @results.next_uri
  return false unless target

  response = @faraday.delete { |req| req.url(target) }
  response.status / 100 == 2
end

#close ⇒ Object



198
199
200
201
202
203
204
205
206
207
# File 'lib/presto/client/statement_client.rb', line 198

# Marks the client as closed, cancelling the running statement first.
#
# Calling it again after the client is closed does nothing.
#
# @return [nil]
def close
  unless @closed
    # cancel running statement
    # TODO make async request and ignore response?
    cancel_leaf_stage
    @closed = true
  end

  nil
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


83
84
85
# File 'lib/presto/client/statement_client.rb', line 83

# Tells whether #close has been called on this client.
#
# The flag is false after construction and is set to true by #close.
#
# @return [Boolean]
def closed?
  @closed
end

#current_results ⇒ Object



101
102
103
# File 'lib/presto/client/statement_client.rb', line 101

# Reader for the most recently stored results.
#
# The value is assigned in the constructor when a next URI is supplied and
# replaced on every successful #advance.
# NOTE(review): when no next URI is supplied the constructor calls
# post_query_request!, which presumably assigns it as well — confirm.
#
# @return [Object] the current results object
def current_results
  @results
end

#debug? ⇒ Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/presto/client/statement_client.rb', line 79

# Tells whether the +:debug+ option was given with a truthy value.
#
# @return [Boolean]
def debug?
  @options[:debug] ? true : false
end

#exception? ⇒ Boolean

Returns:

  • (Boolean)


89
90
91
# File 'lib/presto/client/statement_client.rb', line 89

# Tells whether an error has been recorded on this client.
#
# The recorded value is coerced to a strict boolean so the predicate does
# not hand out the exception object itself; use #exception to read it.
#
# @return [Boolean] true when an error was recorded, false otherwise
def exception?
  !!@exception
end

#faraday_get_with_retry(uri, &block) ⇒ Object

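Raises:

  • (PrestoHttpError) — on a non-retryable HTTP status, or when retries end without a successful response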



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/presto/client/statement_client.rb', line 153

# Performs an HTTP GET, retrying transient failures until the retry timeout.
#
# A response counts as successful when its status is 200 and its body is
# non-empty. Faraday timeouts, connection failures and 503 responses are
# retried with a sleep that grows by 0.1 s per attempt. Any other response
# or error is treated as final. Retrying stops once +@retry_timeout+
# seconds have elapsed or the client has been closed.
#
# Elapsed time is measured with the monotonic clock so that adjustments of
# the system clock can neither cut the retry window short nor extend it.
#
# Every error raised from here is also stored in +@exception+ first.
#
# @param uri [String] URI to request
# @param block [Proc] accepted for interface compatibility; not used
# @return [Object] the successful response
# @raise [PrestoHttpError] for a final HTTP status (carrying that status),
#   or with status 408 when retrying stopped without a successful response
# @raise [StandardError] any non-retryable error raised by the GET itself
def faraday_get_with_retry(uri, &block)
  clock = lambda { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  started_at = clock.call
  attempts = 0

  begin
    begin
      response = @faraday.get(uri)
    rescue Faraday::Error::TimeoutError, Faraday::Error::ConnectionFailed
      # temporary error: fall through and retry
      response = nil
    rescue => e
      @exception = e
      raise
    end

    if response
      if response.status == 200 && !response.body.to_s.empty?
        return response
      end

      if response.status != 503  # retry only if 503 Service Unavailable
        # deterministic error
        @exception = PrestoHttpError.new(response.status, "Presto API error at #{uri} returned #{response.status}: #{response.body}")
        raise @exception
      end
    end

    attempts += 1
    sleep attempts * 0.1
  end while (clock.call - started_at) < @retry_timeout && !@closed

  @exception = PrestoHttpError.new(408, "Presto API error due to timeout")
  raise @exception
end

#has_next? ⇒ Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/presto/client/statement_client.rb', line 105

# Tells whether the current results carry a next URI.
#
# @return [Boolean]
def has_next?
  @results.next_uri ? true : false
end

#query_failed? ⇒ Boolean

Returns:

  • (Boolean)


93
94
95
# File 'lib/presto/client/statement_client.rb', line 93

# Tells whether the current results carry an error.
#
# @return [Boolean] true when the +error+ of the current results is not nil
def query_failed?
  !@results.error.nil?
end

#query_info ⇒ Object



121
122
123
124
125
# File 'lib/presto/client/statement_client.rb', line 121

# Fetches "/v1/query/<id>" for the current results' id and decodes the
# response body as a QueryInfo model.
#
# @return [Object] whatever +decode_model+ returns for the QueryInfo model
def query_info
  info_uri = "/v1/query/#{@results.id}"
  body = parse_body(faraday_get_with_retry(info_uri))
  decode_model(info_uri, body, @models::QueryInfo)
end

#query_succeeded? ⇒ Boolean

Returns:

  • (Boolean)


97
98
99
# File 'lib/presto/client/statement_client.rb', line 97

# Tells whether the query finished cleanly: the current results carry no
# error, no exception was recorded, and the client has not been closed.
#
# @return [Boolean]
def query_succeeded?
  @results.error.nil? && !(@exception || @closed)
end