Class: Aerospike::StreamCommand

Inherits:
BatchCommand show all
Defined in:
lib/aerospike/query/stream_command.rb

Overview

:nodoc:

Direct Known Subclasses

QueryCommand, ScanCommand

Instance Method Summary collapse

Methods inherited from BatchCommand

#initialize, #parse_key, #parse_record, #parse_result, #read_bytes, #stop, #valid?

Methods inherited from Command

#execute, #initialize, #set_batch_exists, #set_batch_get, #set_delete, #set_exists, #set_operate, #set_read, #set_read_for_key_only, #set_read_header, #set_scan, #set_touch, #set_udf, #set_write

Constructor Details

This class inherits a constructor from Aerospike::BatchCommand

Instance Method Details

#parse_record_results(receive_size) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/aerospike/query/stream_command.rb', line 29

# Parses a group of record messages out of the response stream and pushes
# every parsed record onto the recordset's queue.
#
# For each message the header is loaded with +read_bytes+ and the following
# header fields are read from +@data_buffer+: result code (offset 5),
# info3 flags (offset 3), generation (offset 6), expiration (offset 10),
# field count (offset 18) and operation count (offset 20).
#
# The loop runs while +@data_offset+ is below +receive_size+; this method
# only resets +@data_offset+ to 0, so the helpers it calls are presumably
# what advance it (not visible from here).
#
# @param receive_size [Integer] upper bound compared against +@data_offset+
# @return [true] when the loop ran to completion without seeing the end marker
# @return [false] when a message carrying the +INFO3_LAST+ flag was read
# @return [nil] when there is no recordset and the server answered
#   +KEY_NOT_FOUND_ERROR+
# @raise [Aerospike::Exceptions::Aerospike] for any other non-zero result code
# @raise [Exception] +SCAN_TERMINATED_EXCEPTION+ or +QUERY_TERMINATED_EXCEPTION+
#   when the recordset is no longer active
def parse_record_results(receive_size)
  @data_offset = 0

  while @data_offset < receive_size
    read_bytes(MSG_REMAINING_HEADER_SIZE)
    result_code = @data_buffer.read(5).ord & 0xFF

    # Any non-zero result code aborts parsing. The single tolerated case is
    # "key not found" when no recordset is defined: that means this is an
    # Execute UDF On Query command, which is treated as a successful return.
    unless result_code.zero?
      return nil if @recordset.nil? && result_code == Aerospike::ResultCode::KEY_NOT_FOUND_ERROR

      raise Aerospike::Exceptions::Aerospike.new(result_code)
    end

    info3 = @data_buffer.read(3).ord

    # If cmd is the end marker of the response, do not proceed further
    return false if (info3 & INFO3_LAST) == INFO3_LAST

    generation = @data_buffer.read_int32(6)
    expiration = @data_buffer.read_int32(10)
    field_count = @data_buffer.read_int16(18)
    op_count = @data_buffer.read_int16(20)
    key = parse_key(field_count)

    # result_code is guaranteed to be 0 from here on.
    # NOTE(review): @recordset is dereferenced unconditionally; with no
    # recordset a successful, non-final message would raise NoMethodError.
    # Confirm the server never sends such a message for UDF-on-query.
    unless @recordset.active?
      terminated = @recordset.is_scan? ? SCAN_TERMINATED_EXCEPTION : QUERY_TERMINATED_EXCEPTION
      raise terminated
    end

    @recordset.records.enq(parse_record(key, op_count, generation, expiration))
  end # while

  true
end