Class: Boltless::Request

Inherits:
Object
Defined in:
lib/boltless/request.rb

Overview

An abstraction class for neo4j HTTP API requests, which holds a single persistent HTTP connection for its whole runtime. This connection is strictly owned by a single request object and is not safe to share.

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(connection, access_mode: :write, database: Boltless.configuration.default_db, raw_results: false) ⇒ Request

Set up a new neo4j request instance with the given connection to use.

Parameters:

  • connection: a ready-to-use persistent connection object

  • access_mode (defaults to: :write)

    the neo4j transaction mode (:read or :write)

  • database (defaults to: Boltless.configuration.default_db)

    the neo4j database to use

  • raw_results (defaults to: false)

    whether to return the plain HTTP API JSON results (as plain Hash{Symbol => Mixed}/Array data) or not (then we return Array<Boltless::Result> structs)



# File 'lib/boltless/request.rb', line 51

def initialize(connection, access_mode: :write,
               database: Boltless.configuration.default_db,
               raw_results: false)
  # Check the given access mode
  @access_mode = mode = access_mode.to_s.upcase
  unless %w[READ WRITE].include? mode
    raise ArgumentError, "Unknown access mode '#{access_mode}'. " \
                         "Use ':read' or ':write'."
  end

  @connection = connection
  @path_prefix = "/db/#{database}"
  @raw_results = raw_results
  @requests_done = 0

  # Make sure the upstream server is ready to rumble
  Boltless.wait_for_server!(connection)
end
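
The access mode check can be looked at in isolation. The following is a minimal sketch, assuming a free-standing helper named normalize_access_mode (a hypothetical name, not part of Boltless). It tests membership against a word array, because a plain string would perform a substring test instead.

```ruby
# Hypothetical helper, a sketch of the access mode check.
VALID_ACCESS_MODES = %w[READ WRITE].freeze

def normalize_access_mode(access_mode)
  mode = access_mode.to_s.upcase
  return mode if VALID_ACCESS_MODES.include?(mode)

  raise ArgumentError, "Unknown access mode '#{access_mode}'. " \
                       "Use ':read' or ':write'."
end

normalize_access_mode(:read)   # => "READ"
normalize_access_mode('write') # => "WRITE"

# A plain string performs a substring test, so fragments of the
# valid modes would slip through:
'READ WRITE'.include?('AD')       # => true
VALID_ACCESS_MODES.include?('AD') # => false
```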

Class Method Details

.statement_payload(cypher, **args) ⇒ Hash{Symbol => Mixed}

Convert a single Cypher query string and Hash arguments into an HTTP API/Cypher transaction API compatible form.

Parameters:

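  • cypher: the Cypher statement to run

  • args: the additional Cypher parameters; the keys :with_stats and :result_as_graph are removed from the parameters and enable includeStats and resultDataContents instead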

Returns:

  • the compatible statement object



# File 'lib/boltless/request.rb', line 27

def statement_payload(cypher, **args)
  { statement: cypher }.tap do |payload|
    # Enable the statement statistics if requested
    payload[:includeStats] = true if args.delete(:with_stats) == true

    # Enable the graphing output if requested
    payload[:resultDataContents] = %w[row graph] \
      if args.delete(:result_as_graph) == true

    payload[:parameters] = args
  end
end
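
To illustrate the resulting shape, the following sketch copies the method as a free function and calls it with one Cypher parameter plus the :with_stats flag. The query and the parameter value are made up for this example.

```ruby
# Standalone copy of the method, for illustration only.
def statement_payload(cypher, **args)
  { statement: cypher }.tap do |payload|
    payload[:includeStats] = true if args.delete(:with_stats) == true
    payload[:resultDataContents] = %w[row graph] \
      if args.delete(:result_as_graph) == true
    payload[:parameters] = args
  end
end

payload = statement_payload('MATCH (n:User { name: $name }) RETURN n',
                            name: 'Klaus', with_stats: true)
# payload => { statement: 'MATCH (n:User { name: $name }) RETURN n',
#              includeStats: true,
#              parameters: { name: 'Klaus' } }
```

The :with_stats flag does not end up in the Cypher parameters, it only toggles includeStats.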

.statement_payloads(*statements) ⇒ Array<Hash{Symbol => Mixed}>

Convert multiple Cypher queries and Hash arguments into multiple HTTP API/Cypher transaction API compatible hashes.

Parameters:

  • statements: the Cypher statements to convert, each given as a [cypher, args] pair (the args Hash is optional)

Returns:

  • the compatible statement objects



# File 'lib/boltless/request.rb', line 15

def statement_payloads(*statements)
  statements.map do |(cypher, args)|
    statement_payload(cypher, **(args || {}))
  end
end
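
Each statement is expected as an Array of the Cypher string and an optional Hash of arguments. The following sketch shows this input shape with standalone copies of both class methods. The statement_payload copy is simplified (the flag handling is left out), and the queries are made up for this example.

```ruby
# Simplified standalone copy, the flag handling is omitted.
def statement_payload(cypher, **args)
  { statement: cypher, parameters: args }
end

# Standalone copy of the method, for illustration only.
def statement_payloads(*statements)
  statements.map do |(cypher, args)|
    statement_payload(cypher, **(args || {}))
  end
end

payloads = statement_payloads(
  ['CREATE (n:User { name: $name })', { name: 'Bernd' }],
  ['MATCH (n:User) RETURN count(n)']
)
# payloads => [
#   { statement: 'CREATE (n:User { name: $name })',
#     parameters: { name: 'Bernd' } },
#   { statement: 'MATCH (n:User) RETURN count(n)', parameters: {} }
# ]
```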

Instance Method Details

#begin_transaction ⇒ Integer

Start a new transaction within our dedicated HTTP connection object at the neo4j server. When everything is fine, we return the transaction identifier from neo4j for further usage.

Returns:

  • the neo4j transaction identifier

Raises:

  • Errors::TransactionBeginError: when we fail to start a new transaction



# File 'lib/boltless/request.rb', line 101

def begin_transaction
  log_query(:begin, Request.statement_payload('BEGIN')) do
    handle_transport_errors do
      path = "#{@path_prefix}/tx"
      res = @connection.headers('Access-Mode' => @access_mode).post(path)

      # When neo4j sends a response code other than 2xx,
      # we stop further processing
      raise Errors::TransactionBeginError, res.to_s \
        unless res.status.success?

      # Try to extract the transaction identifier
      location = res.headers['Location'] || ''
      location.split("#{path}/").last.to_i.tap do |tx_id|
        # Make sure we flush this request from the persistent connection,
        # in order to allow further requests
        res.flush

        # When we failed to parse the transaction identifier,
        # we stop further processing
        raise Errors::TransactionBeginError, res.to_s \
          if tx_id.zero?
      end
    end
  end
end
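
The transaction identifier is taken from the tail of the Location response header. The parsing step can be sketched as follows, assuming a free-standing helper named extract_tx_id (hypothetical, not part of Boltless) and made-up header values.

```ruby
# Hypothetical helper: extract the transaction identifier from a
# Location header value by splitting at the transaction path.
def extract_tx_id(location, path)
  (location || '').split("#{path}/").last.to_i
end

path = '/db/neo4j/tx'
extract_tx_id('http://localhost:7474/db/neo4j/tx/42', path) # => 42
extract_tx_id(nil, path)                                    # => 0
extract_tx_id('http://localhost:7474/somewhere/else', path) # => 0
```

A missing or unexpected header results in 0, which is why a zero identifier is treated as a failed transaction start.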

#commit_transaction(tx_id, *statements) ⇒ Array<Hash{Symbol => Mixed}>

Commit an open transaction, by the given neo4j transaction identifier.

Parameters:

  • tx_id: the neo4j transaction identifier

  • statements: the Cypher statements to run, as transaction finalization

Returns:

  • the raw neo4j results

Raises:

  • Errors::TransactionNotFoundError: when no open transaction was found by the given identifier

  • Errors::TransactionRollbackError: when there was an error while committing the transaction; we assume that any error causes a transaction rollback at the neo4j side



# File 'lib/boltless/request.rb', line 162

def commit_transaction(tx_id, *statements)
  log_query(tx_id, Request.statement_payload('COMMIT')) do
    handle_transaction(tx_id: tx_id) do |path|
      args = {}
      args[:body] = serialize_body(statements: statements) \
        if statements.any?

      @connection.post("#{path}/commit", **args)
    end
  end
end

#generate_log_str(tx_id, duration, *statements) ⇒ String

Generate a logging string for the given details, without actually printing it.

Parameters:

  • tx_id: the neo4j transaction identifier

  • duration: the duration (ms) of the query

  • statements: the Cypher statements to run

Returns:

  • the assembled logging string



# File 'lib/boltless/request.rb', line 348

def generate_log_str(tx_id, duration, *statements)
  dur = "(#{duration}ms)".colorize(color: :magenta, mode: :bold) \
    if duration

  tag = [
    '[',
    "tx:#{@access_mode.downcase}:#{tx_id || 'one-shot'}",
    tx_id ? " rq:#{@requests_done}" : '',
    ']'
  ].join.colorize(:white)

  prefix = ['Boltless'.colorize(:magenta), tag, dur].compact.join(' ')

  statements.map do |stmt|
    cypher = Boltless.resolve_cypher(
      stmt[:statement], **(stmt[:parameters] || {})
    ).lines.map(&:strip).join(' ')
    cypher = cypher.colorize(color: Boltless.cypher_logging_color(cypher),
                             mode: :bold)
    "#{prefix} #{cypher}"
  end.join("\n")
end
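
The tag part of the log line differs between one-shot and regular transactions. The following sketch reproduces only the tag assembly without the colorization, assuming a free-standing helper named log_tag (hypothetical, not part of Boltless).

```ruby
# Hypothetical helper: the uncolored tag part of a log line.
def log_tag(access_mode, tx_id, requests_done)
  [
    '[',
    "tx:#{access_mode.downcase}:#{tx_id || 'one-shot'}",
    tx_id ? " rq:#{requests_done}" : '',
    ']'
  ].join
end

log_tag('WRITE', 42, 3) # => "[tx:write:42 rq:3]"
log_tag('READ', nil, 1) # => "[tx:read:one-shot]"
```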

#handle_response_body(res, tx_id: nil) ⇒ Array<Hash{Symbol => Mixed}>

Handle a neo4j HTTP API response body in a generic way.

Parameters:

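  • res: the raw HTTP response to handle

  • tx_id (defaults to: nil)

    the neo4j transaction identifier

Returns:

  • the neo4j results, as raw data when raw_results is enabled, otherwise wrapped as Boltless::Result structs

Raises:

  • Errors::TransactionRollbackError: when there was at least one error in the response, so we assume the transaction was rolled back by neo4j

  • Errors::InvalidJsonError: when the response body could not be parsed as JSON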



# File 'lib/boltless/request.rb', line 230

def handle_response_body(res, tx_id: nil)
  # Parse the response body as a whole, which is returned by
  # the configured raw response handler
  body = FastJsonparser.parse(
    Boltless.configuration.raw_response_handler.call(res.to_s, res)
  )

  # When we hit some response errors, we handle them and
  # re-raise in a wrapped exception
  if (errors = body.fetch(:errors, [])).any?
    list = errors.map do |error|
      Errors::ResponseError.new(error[:message],
                                code: error[:code],
                                response: res)
    end
    raise Errors::TransactionRollbackError.new(
      "Transaction (#{tx_id}) rolled back due to errors (#{list.count})",
      errors: list,
      response: res
    )
  end

  # Otherwise return the results, either wrapped in a
  # lightweight struct or raw
  return body[:results] if @raw_results

  body.fetch(:results, []).map do |result|
    Boltless::Result.from(result)
  end
rescue FastJsonparser::ParseError => e
  # When we got something we could not parse, we tell so
  raise Errors::InvalidJsonError.new(e.message, response: res)
end
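
The error handling of the response body can be sketched without any gem dependencies. This is a simplified illustration, not the Boltless implementation: it uses JSON from the standard library in place of FastJsonparser, RollbackError is a hypothetical stand-in for Errors::TransactionRollbackError, and the response bodies are made-up samples.

```ruby
require 'json'

# Hypothetical stand-in for Errors::TransactionRollbackError.
class RollbackError < StandardError
  attr_reader :errors

  def initialize(message, errors:)
    super(message)
    @errors = errors
  end
end

# Parse a response body, raise on reported errors, return the results.
def results_from(json_body)
  body = JSON.parse(json_body, symbolize_names: true)
  errors = body.fetch(:errors, [])
  unless errors.empty?
    raise RollbackError.new("Rolled back due to errors (#{errors.count})",
                            errors: errors)
  end

  body.fetch(:results, [])
end

ok = '{"results":[{"columns":["n"],"data":[{"row":[1]}]}],"errors":[]}'
results_from(ok) # => [{ columns: ["n"], data: [{ row: [1] }] }]

failed = '{"results":[],"errors":[{"code":"Neo.ClientError.Statement.' \
         'SyntaxError","message":"Invalid input"}]}'
begin
  results_from(failed)
rescue RollbackError => e
  e.errors.first[:code] # => "Neo.ClientError.Statement.SyntaxError"
end
```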

#handle_transaction(tx_id: nil) ⇒ Array<Hash{Symbol => Mixed}>

Handle a generic transaction interaction.

Parameters:

  • tx_id (defaults to: nil)

    the neo4j transaction identifier

Returns:

  • the raw neo4j results

Raises:

  • Errors::TransactionNotFoundError: when no open transaction was found by the given identifier

  • Errors::TransactionRollbackError: when there was an error while rolling the transaction back



# File 'lib/boltless/request.rb', line 200

def handle_transaction(tx_id: nil)
  handle_transport_errors do
    # Run the user given block, and pass the transaction path to it
    res = yield("#{@path_prefix}/tx/#{tx_id}")

    # When the transaction was not found, we tell so
    raise Errors::TransactionNotFoundError.new(res.to_s, response: res) \
      if res.code == 404

    # When the response was simply not successful, we tell so, too
    raise Errors::TransactionRollbackError.new(res.to_s, response: res) \
      unless res.status.success?

    # Handle the response body in a generic way
    handle_response_body(res, tx_id: tx_id)
  end
end
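
All transaction related requests share the same path scheme. The following sketch collects the HTTP verbs and paths used by the request methods on this page into one lookup, assuming a free-standing helper named endpoint_for (hypothetical, not part of Boltless).

```ruby
# Hypothetical lookup of the HTTP verb and path per operation,
# collected from the request methods on this page.
def endpoint_for(operation, database:, tx_id: nil)
  base = "/db/#{database}/tx"
  case operation
  when :begin    then [:post, base]
  when :run      then [:post, "#{base}/#{tx_id}"]
  when :commit   then [:post, "#{base}/#{tx_id}/commit"]
  when :rollback then [:delete, "#{base}/#{tx_id}"]
  when :one_shot then [:post, "#{base}/commit"]
  else raise ArgumentError, "Unknown operation '#{operation}'"
  end
end

endpoint_for(:begin, database: 'neo4j')
# => [:post, "/db/neo4j/tx"]
endpoint_for(:commit, database: 'neo4j', tx_id: 42)
# => [:post, "/db/neo4j/tx/42/commit"]
endpoint_for(:one_shot, database: 'neo4j')
# => [:post, "/db/neo4j/tx/commit"]
```

The one-shot variant reuses the generic transaction handling by passing 'commit' in place of a transaction identifier.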

#handle_transport_errors { ... } ⇒ Mixed

Handle all the low-level http.rb gem errors transparently.

Yields:

  • the given block

Returns:

  • the result of the given block

Raises:

  • Errors::RequestError: when a low-level error occurred



# File 'lib/boltless/request.rb', line 280

def handle_transport_errors
  yield
rescue HTTP::Error => e
  raise Errors::RequestError, e.message
end
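
The wrap-and-reraise pattern can be tried out without the http.rb gem. In the following sketch LowLevelError and RequestError are hypothetical stand-ins for HTTP::Error and Errors::RequestError.

```ruby
# Hypothetical stand-ins for HTTP::Error and Errors::RequestError.
class LowLevelError < StandardError; end
class RequestError < StandardError; end

# Pass the block result through, translate low-level errors.
def wrap_transport_errors
  yield
rescue LowLevelError => e
  raise RequestError, e.message
end

wrap_transport_errors { :ok } # => :ok

begin
  wrap_transport_errors { raise LowLevelError, 'connection refused' }
rescue RequestError => e
  e.message # => "connection refused"
end
```

Callers only need to rescue a single error class this way, while Ruby still records the original exception as the cause of the new one.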

#log_query(tx_id, *statements) { ... } ⇒ Mixed

Log the query details for the given statements, while benchmarking the given user block (which should contain the full request preparation, request performing and response parsing).

When the query_log_enabled configuration flag is set to false, we effectively do a no-op here, to keep things fast.

Parameters:

  • tx_id: the neo4j transaction identifier

  • statements: the Cypher statements to run

Yields:

  • the given user block

Returns:

  • the result of the user given block



# File 'lib/boltless/request.rb', line 300

def log_query(tx_id, *statements)
  # When no query logging is enabled, we won't do it
  enabled = Boltless.configuration.query_log_enabled
  return yield unless enabled

  # Add a new request to the counter
  @requests_done += 1

  # When the +query_log_enabled+ config flag is set to +:debug+, we
  # produce a logging output before the actual request is sent, in order
  # to help while debugging slow/never-ending Cypher statements
  if enabled == :debug
    Boltless.logger.debug do
      generate_log_str(tx_id == :begin ? 'tbd' : tx_id,
                       nil,
                       *statements)
    end
  end

  # Otherwise measure the runtime of the user given block,
  # and log the related statements
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC,
                                :float_millisecond)
  res = yield
  stop = Process.clock_gettime(Process::CLOCK_MONOTONIC,
                               :float_millisecond)

  # As a fallback to the +query_log_enabled+ config flag, we just log to
  # the debug level with a block, so it won't be executed when the logger
  # is not configured to print debug level
  Boltless.logger.debug do
    generate_log_str(tx_id == :begin ? res : tx_id,
                     (stop - start).truncate(1),
                     *statements)
  end

  # Return the result of the user given block
  res
end
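
The benchmarking part relies on the monotonic clock, which is not affected by system time adjustments. It can be sketched in isolation as follows, assuming a free-standing helper named measure_ms (hypothetical, not part of Boltless).

```ruby
# Hypothetical helper: run the block and return its result together
# with the elapsed time in milliseconds, truncated to one decimal.
def measure_ms
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC,
                                :float_millisecond)
  result = yield
  stop = Process.clock_gettime(Process::CLOCK_MONOTONIC,
                               :float_millisecond)
  [result, (stop - start).truncate(1)]
end

result, duration = measure_ms do
  sleep(0.01)
  :done
end
# result is :done, duration is a Float such as 10.1 (varies per run)
```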

#one_shot_transaction(*statements) ⇒ Array<Hash{Symbol => Mixed}>

Run one or more Cypher statements inside a one-shot transaction. A new transaction is opened, the statements are run and the transaction is committed in a single HTTP request for efficiency.

Parameters:

  • statements: the Cypher statements to run

Returns:

  • the raw neo4j results

Raises:

  • Errors::TransactionNotFoundError: when no open transaction was found by the given identifier

  • Errors::TransactionRollbackError: when there was an error while committing the transaction; we assume that any error causes a transaction rollback at the neo4j side



# File 'lib/boltless/request.rb', line 82

def one_shot_transaction(*statements)
  # We do not allow sending a run-request without Cypher statements
  raise ArgumentError, 'No statements given' if statements.empty?

  log_query(nil, *statements) do
    handle_transaction(tx_id: 'commit') do |path|
      @connection.headers('Access-Mode' => @access_mode)
                 .post(path, body: serialize_body(statements: statements))
    end
  end
end

#rollback_transaction(tx_id) ⇒ Array<Hash{Symbol => Mixed}>

Rollback an open transaction, by the given neo4j transaction identifier.

Parameters:

  • tx_id: the neo4j transaction identifier

Returns:

  • the raw neo4j results

Raises:

  • Errors::TransactionNotFoundError: when no open transaction was found by the given identifier

  • Errors::TransactionRollbackError: when there was an error while rolling the transaction back



# File 'lib/boltless/request.rb', line 183

def rollback_transaction(tx_id)
  log_query(tx_id, Request.statement_payload('ROLLBACK')) do
    handle_transaction(tx_id: tx_id) do |path|
      @connection.delete(path)
    end
  end
end

#run_query(tx_id, *statements) ⇒ Array<Hash{Symbol => Mixed}>

Run one or more Cypher statements inside an open transaction.

Parameters:

  • tx_id: the neo4j transaction identifier

  • statements: the Cypher statements to run

Returns:

  • the raw neo4j results

Raises:

  • Errors::TransactionNotFoundError: when no open transaction was found by the given identifier

  • Errors::TransactionRollbackError: when there was an error while running the statements; we assume that any error causes a transaction rollback at the neo4j side



# File 'lib/boltless/request.rb', line 139

def run_query(tx_id, *statements)
  # We do not allow sending a run-request without Cypher statements
  raise ArgumentError, 'No statements given' if statements.empty?

  log_query(tx_id, *statements) do
    handle_transaction(tx_id: tx_id) do |path|
      @connection.post(path, body: serialize_body(statements: statements))
    end
  end
end

#serialize_body(obj) ⇒ String

Serialize the given object to a JSON string.

Parameters:

  • obj: the object to serialize

Returns:

  • the JSON string representation



# File 'lib/boltless/request.rb', line 269

def serialize_body(obj)
  obj = obj.deep_stringify_keys if obj.is_a? Hash
  Oj.dump(obj)
end
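
The request bodies sent to neo4j wrap the statement payloads in a statements key. The following sketch shows the resulting JSON document. It is an approximation: JSON.generate from the standard library stands in for Oj.dump and converts the Symbol keys to Strings on its own, so deep_stringify_keys is not needed here. The statement is made up for this example.

```ruby
require 'json'

# Stand-in for serialize_body, using the standard library instead
# of Oj (an assumption of this sketch, not the Boltless code).
def serialize_body(obj)
  JSON.generate(obj)
end

body = serialize_body(
  statements: [
    { statement: 'RETURN $value AS value', parameters: { value: 1 } }
  ]
)
# body => '{"statements":[{"statement":"RETURN $value AS value",' \
#         '"parameters":{"value":1}}]}'
```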