Class: YugabyteYSQL::LoadBalanceService

Inherits: Object
Defined in: lib/pg/load_balance_service.rb

Defined Under Namespace

Classes: CloudPlacement, LBProperties, Node

Class Variable Summary

@@mutex = Concurrent::ReentrantReadWriteLock.new
@@last_refresh_time = -1
@@control_connection = nil
@@cluster_info = {}
@@useHostColumn = nil

Class Method Summary

Class Method Details

.connect_to_lb_hosts(lb_props, iopts) ⇒ Object

# File 'lib/pg/load_balance_service.rb', line 42

# Opens a connection to the least-loaded eligible node of the cluster.
#
# When the cached server list is at least +lb_props.refresh_interval+ seconds
# old, it is first refreshed through the shared control connection, which is
# created or replaced as needed. A node is then picked with
# get_least_loaded_server and connected to. Every node that refuses the
# connection is marked down, and the next candidate is tried.
#
# @param lb_props [LBProperties] load-balancing settings parsed from the connection arguments
# @param iopts [Hash] connection options; +:host+ and +:port+ are overwritten in place
# @return [Object, nil] the connection returned by YugabyteYSQL.connect, or nil when
#   no control connection could be established or no node accepted a connection
def self.connect_to_lb_hosts(lb_props, iopts)
  @@mutex.acquire_write_lock
  begin
    if metadata_needs_refresh(lb_props.refresh_interval)
      refresh_done = false
      until refresh_done
        if @@control_connection.nil?
          begin
            @@control_connection = create_control_connection(iopts)
          rescue StandardError
            return nil
          end
        end
        begin
          refresh_yb_servers(lb_props.failed_host_reconnect_delay, @@control_connection)
          refresh_done = true
        rescue StandardError
          if iopts[:host] == @@control_connection.host
            # The control node itself failed: mark it down and retry through
            # the first node still believed to be up.
            failed = @@cluster_info[iopts[:host]]
            if failed
              failed.is_down = true
              failed.down_since = Time.now.to_i
            end
            up_nodes = @@cluster_info.reject { |_, node| node.is_down }
            return nil if up_nodes.empty?

            next_host = up_nodes.keys.first
            iopts[:port] = up_nodes[next_host].port
            iopts[:host] = next_host
          end
          # Discard the failed control connection. It is recreated from iopts at
          # the top of the loop, where a failure returns nil instead of raising
          # and leaving a broken connection cached.
          @@control_connection = nil
        end
      end
    end
  ensure
    @@mutex.release_write_lock
  end

  connection = nil
  new_request = true
  placement_index = 1
  loop do
    @@mutex.acquire_write_lock
    begin
      host_port = get_least_loaded_server(lb_props.placements_info, lb_props.fallback_to_tk_only,
                                          new_request, placement_index)
      new_request = false
    ensure
      @@mutex.release_write_lock
    end
    break if host_port.nil?

    lb_host, lb_port, placement_index = host_port
    # A nil or empty host (e.g. a node whose public_ip is missing) cannot be dialled.
    break if lb_host.to_s.empty?

    iopts[:host] = lb_host
    iopts[:port] = lb_port
    begin
      connection = YugabyteYSQL.connect(iopts)
      break
    rescue StandardError
      @@mutex.acquire_write_lock
      begin
        # When public addresses are in use, lb_host is the node's public_ip,
        # which is not a @@cluster_info key, so fall back to matching on it.
        node = @@cluster_info[lb_host] || @@cluster_info.each_value.find { |n| n.public_ip == lb_host }
        # Without a node to mark down, retrying could keep picking the same host.
        return nil if node.nil?

        node.is_down = true
        node.down_since = Time.now.to_i
        node.count = [node.count - 1, 0].max
      ensure
        @@mutex.release_write_lock
      end
    end
  end
  connection
end

.create_control_connection(iopts) ⇒ Object

# File 'lib/pg/load_balance_service.rb', line 126

# Opens the connection used to query cluster metadata.
#
# Tries iopts[:host] first. Each failed host is marked down (when it is
# known in @@cluster_info), and iopts is repointed at the first node still
# up. This repeats until a connection succeeds.
#
# @param iopts [Hash] connection options; +:host+ and +:port+ may be overwritten in place
# @return [Object] the connection returned by YugabyteYSQL.connect
# @raise [YugabyteYSQL::Error] when every known node has been marked down
def self.create_control_connection(iopts)
  loop do
    begin
      return YugabyteYSQL.connect(iopts)
    rescue StandardError
      failed = @@cluster_info[iopts[:host]]
      if failed
        failed.is_down = true
        failed.down_since = Time.now.to_i
      end

      candidates = @@cluster_info.reject { |_host, node| node.is_down }
      raise(YugabyteYSQL::Error, "Unable to create a control connection") if candidates.empty?

      next_host = candidates.keys.first
      iopts[:port] = candidates[next_host].port
      iopts[:host] = next_host
    end
  end
end

.decrement_connection_count(host) ⇒ Object

# File 'lib/pg/load_balance_service.rb', line 24

# Lowers the connection count recorded for +host+ by one, never below zero.
#
# @param host [String] key into @@cluster_info
# @return [Boolean] true if +host+ is known, false otherwise
def self.decrement_connection_count(host)
  @@mutex.acquire_write_lock
  begin
    node = @@cluster_info[host]
    return false if node.nil?

    # Clamp at zero: the count can underflow when the connection being closed
    # was created in a non-LB fashion.
    node.count = [node.count - 1, 0].max
    true
  ensure
    @@mutex.release_write_lock
  end
end

.get_least_loaded_server(allowed_placements, fallback_to_tk_only, new_request, placement_index) ⇒ Object

# File 'lib/pg/load_balance_service.rb', line 199

# Picks the node that should receive the next connection, and reserves it by
# incrementing its connection count.
#
# With topology keys (+allowed_placements+ non-nil), preference levels are
# scanned from +placement_index+ up to 10, and the first level with an up node
# in one of its placements wins. Every up node is a candidate when there are no
# topology keys, or when no preferred node is up and +fallback_to_tk_only+ is
# false. Among the candidates, ties on the lowest count are broken at random.
#
# @param allowed_placements [Hash, nil] preference level => collection of cloud placements
# @param fallback_to_tk_only [Boolean] when true, never fall back to nodes outside the topology keys
# @param new_request [Boolean] not used by this method
# @param placement_index [Integer] first preference level to consider
# @return [Array, nil] [host, port, preference level], or nil when no node is eligible;
#   host is the node's public_ip when @@useHostColumn is false
def self.get_least_loaded_server(allowed_placements, fallback_to_tk_only, new_request, placement_index)
  up_nodes = @@cluster_info.reject { |_, node| node.is_down }
  # Hosts sharing the lowest connection count among +nodes+ (a Hash host => node).
  least_loaded = lambda do |nodes|
    min_count = nodes.each_value.map(&:count).min
    nodes.select { |_, node| node.count == min_count }.keys
  end

  current_index = 1
  selected = []
  unless allowed_placements.nil? # topology-aware
    (placement_index..10).each do |idx|
      current_index = idx
      placements = allowed_placements[idx]
      next if placements.nil?

      in_placement = up_nodes.select do |_, node|
        placements.any? do |cp|
          cp[0] == node.cloud && cp[1] == node.region && (cp[2] == node.zone || cp[2] == "*")
        end
      end
      selected = least_loaded.call(in_placement)
      break unless selected.empty?
    end
  end

  # cluster-aware, or topology-aware with fallback to the whole cluster allowed
  if allowed_placements.nil? || (selected.empty? && !fallback_to_tk_only)
    selected = least_loaded.call(up_nodes)
  end
  return nil if selected.empty?

  host = selected.sample
  node = @@cluster_info[host]
  node.count += 1
  # Read the port from the node itself: the public IP substituted below is not
  # a key of @@cluster_info, so looking the port up by it would fail.
  host = node.public_ip if @@useHostColumn == false
  [host, node.port, current_index]
end

.get_load(host) ⇒ Object

# File 'lib/pg/load_balance_service.rb', line 16

# Returns the connection count currently recorded for +host+.
#
# @param host [String] key into @@cluster_info
# @return [Integer] the recorded count, or 0 when +host+ is unknown
def self.get_load(host)
  node = @@cluster_info[host]
  node ? node.count : 0
end

.metadata_needs_refresh(refresh_interval) ⇒ Object

# File 'lib/pg/load_balance_service.rb', line 379

# Reports whether the cached yb_servers() metadata is due for a refresh.
#
# @param refresh_interval [Integer] maximum age, in seconds, of the cached server list
# @return [Boolean] true when at least +refresh_interval+ seconds have passed since
#   @@last_refresh_time was last set. This is always true before the first refresh,
#   because @@last_refresh_time starts at -1.
def self.metadata_needs_refresh(refresh_interval)
  Time.now.to_i - @@last_refresh_time >= refresh_interval
end

.parse_connect_lb_args(hash_arg) ⇒ Object

# File 'lib/pg/load_balance_service.rb', line 304

# Extracts and validates the YugabyteDB load-balancing options in a
# connection-argument hash.
#
# The keys :load_balance, :topology_keys, :yb_servers_refresh_interval,
# :failed_host_reconnect_delay_secs and :fallback_to_topology_keys_only are
# always removed from +hash_arg+, whether or not load balancing ends up enabled.
#
# +topology_keys+ is a comma-separated list of "cloud.region.zone[:preference]"
# entries. The zone may be "*", and the preference must be a base-10 integer
# in 1..10 (default 1). A refresh interval outside 0..600 (default 300) falls
# back to its default, as does a reconnect delay outside 0..60 (default 5).
# Either setting also falls back to its default when it is not an integer.
#
# @param hash_arg [Hash] connection arguments; modified in place
# @return [LBProperties, nil] the parsed settings, or nil unless load_balance is "true"
# @raise [ArgumentError] if topology_keys is malformed
def self.parse_connect_lb_args(hash_arg)
  lb = hash_arg.delete(:load_balance)
  tk = hash_arg.delete(:topology_keys)
  ri = hash_arg.delete(:yb_servers_refresh_interval)
  ttl = hash_arg.delete(:failed_host_reconnect_delay_secs)
  fb = hash_arg.delete(:fallback_to_topology_keys_only)

  return nil unless lb && lb.to_s.downcase == "true"

  # Integer value of an optional setting, or +default+ when it is absent,
  # not an integer, or outside +range+.
  bounded_setting = lambda do |value, range, default|
    return default if value.nil?

    parsed = begin
      Integer(value)
    rescue ArgumentError, TypeError
      default
    end
    range.cover?(parsed) ? parsed : default
  end

  lb_properties = LBProperties.new(nil, 300, false, 5)
  if tk
    lb_properties.placements_info = {}
    tk.split(',', -1).each do |single_tk|
      raise ArgumentError, "Empty value for topology_keys specified" if single_tk.empty?

      single_tk_parts = single_tk.split(':', -1)
      if single_tk_parts.length > 2
        raise ArgumentError, "Invalid preference value '#{single_tk}' specified for topology_keys: " + tk
      end

      cp = single_tk_parts[0].split('.', -1)
      if cp.length != 3
        raise ArgumentError, "Invalid cloud placement value '#{single_tk_parts[0]}' specified for topology_keys: " + tk
      end

      preference_value = 1
      if single_tk_parts.length == 2
        preference = single_tk_parts[1]
        raise ArgumentError, "No preference value specified for topology_keys: " + tk if preference == ""

        begin
          # Base 10, so "08" is 8 rather than an invalid octal literal and "0x5" is rejected.
          preference_value = Integer(preference, 10)
        rescue ArgumentError
          raise ArgumentError, "Invalid preference value '#{preference}' for topology_keys: " + tk
        end
        unless (1..10).cover?(preference_value)
          raise ArgumentError, "Invalid preference value '#{preference_value}' for topology_keys: " + tk
        end
      end
      (lb_properties.placements_info[preference_value] ||= Set.new) << CloudPlacement.new(cp[0], cp[1], cp[2])
    end
  end

  lb_properties.refresh_interval = bounded_setting.call(ri, 0..600, 300)
  lb_properties.failed_host_reconnect_delay = bounded_setting.call(ttl, 0..60, 5)
  lb_properties.fallback_to_tk_only = fb.to_s.downcase == "true" if fb
  lb_properties
end

.parse_lb_args_from_url(conn_string) ⇒ Object

# File 'lib/pg/load_balance_service.rb', line 264

# Splits a connection URL into the URL passed on for connecting and the
# YugabyteDB load-balancing settings found in its query string.
#
# The load-balancing parameters (load_balance, topology_keys,
# yb_servers_refresh_interval, failed_host_reconnect_delay_secs,
# fallback_to_topology_keys_only) are always removed from the returned URL.
# Every other query parameter is kept verbatim and in order.
#
# @param conn_string [String] connection URL, e.g. "postgresql://host:5433/db?load_balance=true"
# @return [Array] two elements: the URL without the load-balancing parameters, and the
#   result of parse_connect_lb_args (nil unless load_balance=true)
# @raise [ArgumentError] propagated from parse_connect_lb_args for malformed settings
def self.parse_lb_args_from_url(conn_string)
  # Only the first '?' starts the query; RFC 3986 allows '?' inside the query itself.
  base, query = conn_string.split('?', 2)
  return conn_string, nil if query.nil?

  lb_keys = %w[load_balance topology_keys yb_servers_refresh_interval
               failed_host_reconnect_delay_secs fallback_to_topology_keys_only]
  lb_props = {}
  kept = []
  query.split('&').each do |token|
    next if token.empty?

    key, value = token.split('=', 2)
    if lb_keys.include?(key)
      lb_props[key.to_sym] = value
    else
      # Keep the token verbatim: rebuilding it as "key=value" would turn "flag" into "flag=".
      kept << token
    end
  end

  url = kept.empty? ? base : "#{base}?#{kept.join('&')}"
  if lb_props[:load_balance].to_s.downcase == "true"
    [url, parse_connect_lb_args(lb_props)]
  else
    [url, nil]
  end
end

.refresh_yb_servers(failed_host_reconnect_delay_secs, conn) ⇒ Object

# File 'lib/pg/load_balance_service.rb', line 153

# Reloads the cluster node list by running yb_servers() on +conn+.
#
# Unknown nodes are added to @@cluster_info with a zero count. Known nodes
# that have been down for longer than +failed_host_reconnect_delay_secs+
# become eligible again. While @@useHostColumn is undecided, it is set by
# comparing each node's resolved host and public_ip with conn.host.
#
# @param failed_host_reconnect_delay_secs [Integer] seconds a node stays marked down
# @param conn [Object] connection used to run the query
# @return [Integer] the new @@last_refresh_time
def self.refresh_yb_servers(failed_host_reconnect_delay_secs, conn)
  saw_public_ip = false
  servers = conn.exec("select * from yb_servers()")
  servers.each do |server|
    # resolve_host yields [[address, name], ...]; only the first address is used.
    host = resolve_host(server['host']).first.first
    public_ip = server['public_ip']
    public_ip = resolve_host(public_ip).first.first if public_ip
    saw_public_ip = true unless public_ip.nil? || public_ip.empty?

    # A public_ip match overrides a host match on the same row.
    if @@useHostColumn.nil?
      @@useHostColumn = true if host.eql?(conn.host)
      @@useHostColumn = false if !public_ip.nil? && public_ip.eql?(conn.host)
    end

    known = @@cluster_info[host]
    if known.nil?
      @@cluster_info[host] = Node.new(host, server['port'], server['cloud'], server['region'],
                                      server['zone'], public_ip, 0, false, 0)
    elsif known.is_down && Time.now.to_i - known.down_since > failed_host_reconnect_delay_secs
      # Down for longer than the reconnect delay: let it be tried again.
      known.is_down = false
    end
  end

  # No row matched the control connection's address, but some node advertises
  # a public IP: use the public addresses.
  @@useHostColumn = false if @@useHostColumn.nil? && saw_public_ip
  @@last_refresh_time = Time.now.to_i
end

.resolve_host(mhost) ⇒ Object

# File 'lib/pg/load_balance_service.rb', line 387

# Resolves +mhost+ to its IP addresses.
#
# @param mhost [String] host name, IP address, or Unix-socket path
# @return [Array<Array>] one [address, mhost] pair per address. The address is nil
#   for a named pipe, and the result is [['', mhost]] when the lookup raises.
def self.resolve_host(mhost)
  # A Unix-domain socket path has no address to look up.
  return [[nil, mhost]] if YugabyteYSQL::Connection.host_is_named_pipe?(mhost)

  # Address strings for mhost, or [''] when the lookup raises.
  lookup = lambda do
    begin
      Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address)
    rescue StandardError
      ['']
    end
  end

  # `TCPSocket.gethostbyname` isn't fiber aware before ruby-3.1, so under a
  # fiber scheduler the lookup runs on a second thread to avoid blocking it.
  needs_thread = Fiber.respond_to?(:scheduler) && Fiber.scheduler && RUBY_VERSION < '3.1.'
  addresses = needs_thread ? Thread.new(&lookup).value : lookup.call
  addresses.map { |address| [address, mhost] }
end