Class: Aerospike::PartitionTokenizerNew

Inherits:
Object
  • Object
show all
Defined in:
lib/aerospike/cluster/partition_tokenizer_new.rb

Overview

:nodoc:

Instance Method Summary collapse

Constructor Details

#initialize(conn) ⇒ PartitionTokenizerNew

Returns a new instance of PartitionTokenizerNew.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/aerospike/cluster/partition_tokenizer_new.rb', line 27

def initialize(conn)
  # Use low-level info methods and parse byte array directly for maximum performance.
  # Send format:    replicas-master\n
  # Receive format: replicas-master\t<ns1>:<base 64 encoded bitmap>;<ns2>:<base 64 encoded bitmap>... \n
  info_map = Info.request(conn, REPLICAS_NAME)

  info = info_map[REPLICAS_NAME]

  @length = info ? info.length : 0

  if !info || @length == 0
    raise Aerospike::Exceptions::Connection.new("#{replicas_name} is empty")
  end

  @buffer = info
  @offset = 0

  self
end

Instance Method Details

#update_partition(nmap, node) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/aerospike/cluster/partition_tokenizer_new.rb', line 47

def update_partition(nmap, node)
  amap = nil

  beginning = @offset
  copied = false

  while @offset < @length
    if @buffer[@offset] == ':'
      # Parse namespace.
      namespace = @buffer[beginning...@offset].strip

      if namespace.length <= 0 || namespace.length >= 32
        response = get_truncated_response
        raise Aerospike::Exceptions::Parse.new(
          "Invalid partition namespace #{namespace}. Response=#{response}"
        )
      end

      @offset+=1
      beginning = @offset

      # Parse partition id.
      while @offset < @length
        b = @buffer[@offset]

        break if b == ';' || b == "\n"
        @offset+=1
      end

      if @offset == beginning
        response = get_truncated_response

        raise Aerospike::Exceptions::Parse.new(
          "Empty partition id for namespace #{namespace}. Response=#{response}"
        )
      end

      node_array = nmap[namespace]

      if !node_array
        if !copied
          # Make shallow copy of map.
          amap = {}
          nmap.each {|k, v| amap[k] = Atomic.new(v)}
          copied = true
        end

        node_array = Atomic.new(Array.new(Aerospike::Node::PARTITIONS))
        amap[namespace] = node_array
      end

      bit_map_length = @offset - beginning
      restore_buffer = Base64.strict_decode64(@buffer[beginning, bit_map_length])
      i = 0
      while i < Aerospike::Node::PARTITIONS
        if (restore_buffer[i>>3].ord & (0x80 >> (i & 7))) != 0
          # Logger.Info("Map: `" + namespace + "`," + strconv.Itoa(i) + "," + node.String)
          node_array.update{|v| v[i] = node; v}
        end
        i = i.succ
      end

      @offset+=1
      beginning = @offset
    else
      @offset+=1
    end
  end

  copied ? amap : nil
end