Class: Trino::Client::StatementClient

Inherits:
Object
  • Object
show all
Defined in:
lib/trino/client/statement_client.rb

Constant Summary collapse

JSON_OPTIONS =

Trino can return too deep nested JSON

{
    :max_nesting => false
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(faraday, query, options, next_uri = nil) ⇒ StatementClient

Returns a new instance of StatementClient.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/trino/client/statement_client.rb', line 29

def initialize(faraday, query, options, next_uri=nil)
  @faraday = faraday

  @options = options
  @query = query
  @state = :running
  @retry_timeout = options[:retry_timeout] || 120
  if model_version = @options[:model_version]
    @models = ModelVersions.const_get("V#{model_version.gsub(".", "_")}")
  else
    @models = Models
  end

  @plan_timeout = options[:plan_timeout]
  @query_timeout = options[:query_timeout]

  if @plan_timeout || @query_timeout
    # this is set before the first call of faraday_get_with_retry so that
    # resuming StatementClient with next_uri is also under timeout control.
    @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  if next_uri
    response = faraday_get_with_retry(next_uri)
    @results_headers = response.headers
    @results = @models::QueryResults.decode(parse_body(response))
  else
    post_query_request!
  end
end

Instance Attribute Details

#queryObject (readonly)

Returns the value of attribute query.



87
88
89
# File 'lib/trino/client/statement_client.rb', line 87

def query
  @query
end

Instance Method Details

#advanceObject



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/trino/client/statement_client.rb', line 138

def advance
  return false unless running?

  unless has_next?
    @state = :finished
    return false
  end

  uri = @results.next_uri

  response = faraday_get_with_retry(uri)
  @results_headers = response.headers
  @results = decode_model(uri, parse_body(response), @models::QueryResults)

  raise_if_timeout!

  return true
end

#cancel_leaf_stageObject



253
254
255
256
257
258
259
# File 'lib/trino/client/statement_client.rb', line 253

def cancel_leaf_stage
  if uri = @results.partial_cancel_uri
    @faraday.delete do |req|
      req.url uri
    end
  end
end

#client_aborted?Boolean

Returns:

  • (Boolean)


97
98
99
# File 'lib/trino/client/statement_client.rb', line 97

def client_aborted?
  @state == :client_aborted
end

#client_error?Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/trino/client/statement_client.rb', line 101

def client_error?
  @state == :client_error
end

#closeObject



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/trino/client/statement_client.rb', line 261

def close
  return unless running?

  @state = :client_aborted

  begin
    if uri = @results.next_uri
      @faraday.delete do |req|
        req.url uri
      end
    end
  rescue => e
  end

  nil
end

#current_resultsObject



117
118
119
# File 'lib/trino/client/statement_client.rb', line 117

def current_results
  @results
end

#current_results_headersObject



121
122
123
# File 'lib/trino/client/statement_client.rb', line 121

def current_results_headers
  @results_headers
end

#debug?Boolean

Returns:

  • (Boolean)


89
90
91
# File 'lib/trino/client/statement_client.rb', line 89

def debug?
  !!@options[:debug]
end

#exception!(e) ⇒ Object



133
134
135
136
# File 'lib/trino/client/statement_client.rb', line 133

def exception!(e)
  @state = :client_error
  raise e
end

#faraday_get_with_retry(uri, &block) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/trino/client/statement_client.rb', line 192

def faraday_get_with_retry(uri, &block)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  attempts = 0

  begin
    begin
      response = @faraday.get(uri)
    rescue Faraday::TimeoutError, Faraday::ConnectionFailed
      # temporally error to retry
      response = nil
    rescue => e
      exception! e
    end

    if response
      if response.status == 200 && !response.body.to_s.empty?
        return response
      end

      if response.status != 503  # retry only if 503 Service Unavailable
        # deterministic error
        exception! TrinoHttpError.new(response.status, "Trino API error at #{uri} returned #{response.status}: #{response.body}")
      end
    end

    raise_if_timeout!

    attempts += 1
    sleep attempts * 0.1
  end while (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) < @retry_timeout && !client_aborted?

  exception! TrinoHttpError.new(408, "Trino API error due to timeout")
end

#finished?Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/trino/client/statement_client.rb', line 105

def finished?
  @state == :finished
end

#has_next?Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/trino/client/statement_client.rb', line 129

def has_next?
  !!@results.next_uri
end

#query_failed?Boolean

Returns:

  • (Boolean)


109
110
111
# File 'lib/trino/client/statement_client.rb', line 109

def query_failed?
  @results.error != nil
end

#query_idObject



125
126
127
# File 'lib/trino/client/statement_client.rb', line 125

def query_id
  @results.id
end

#query_infoObject



157
158
159
160
161
# File 'lib/trino/client/statement_client.rb', line 157

def query_info
  uri = "/v1/query/#{@results.id}"
  response = faraday_get_with_retry(uri)
  decode_model(uri, parse_body(response), @models::QueryInfo)
end

#query_succeeded?Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/trino/client/statement_client.rb', line 113

def query_succeeded?
  @results.error == nil && finished?
end

#raise_if_timeout!Object



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/trino/client/statement_client.rb', line 226

def raise_if_timeout!
  if @started_at
    return if finished?

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started_at

    if @query_timeout && elapsed > @query_timeout
      raise_timeout_error!
    end

    if @plan_timeout && (@results == nil || @results.columns == nil) &&
        elapsed > @plan_timeout
      # @results is not set (even first faraday_get_with_retry isn't called yet) or
      # result from Trino doesn't include result schema. Query planning isn't done yet.
      raise_timeout_error!
    end
  end
end

#raise_timeout_error!Object



245
246
247
248
249
250
251
# File 'lib/trino/client/statement_client.rb', line 245

def raise_timeout_error!
  if query_id = @results && @results.id
    exception! TrinoQueryTimeoutError.new("Query #{query_id} timed out")
  else
    exception! TrinoQueryTimeoutError.new("Query timed out")
  end
end

#running?Boolean

Returns:

  • (Boolean)


93
94
95
# File 'lib/trino/client/statement_client.rb', line 93

def running?
  @state == :running
end