Class: Murakumo::Cloud

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/srv/murakumo_cloud.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Cloud

Returns a new instance of Cloud.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/srv/murakumo_cloud.rb', line 21

# Builds the node from its parsed options.
#
# Stores the options, registers the host record and its aliases in the
# local database, and prepares (without starting) the health checkers,
# the activity checkers, the optional lookup cache and the balancer.
#
# NOTE: options[:host] is modified in place: its first element (the
# address) is shifted off and ORIGIN, 0, ACTIVE are appended.
#
# @param options [#[]] configuration; the keys read here are :host,
#   :aliases, :logger, :health_check, :activity_check, :notification,
#   :enable_cache and :balancing
def initialize(options)
  # Keep the options around for the other methods
  @options = options

  # Split the host resource record into the address and the record data
  host_data = options[:host]
  @address = host_data.shift
  host_data.concat [ORIGIN, 0, ACTIVE]
  alias_datas = options[:aliases].map {|r|
    # Initial activity of an alias follows the 'init-status' of its health
    # check, if one is configured. The alias name is matched literally and
    # case-insensitively; it is escaped so that '.' in a host name is not
    # treated as a regex wildcard.
    health_check_conf = (@options[:health_check] || {}).find {|k, v| k =~ /\A#{Regexp.escape(r[0].to_s)}\Z/i } || []
    init_status = health_check_conf.fetch(1, {}).fetch('init-status', ACTIVE)
    r + [init_status]
  }
  @logger = options[:logger]

  # Names are stored in lower case
  @initial_datas = datas = ([host_data] + alias_datas).map do |i|
    name, ttl, priority, weight, activity = i
    name = name.downcase
    [name, ttl, priority, weight, activity]
  end

  # Create the database and store the initial records
  create_database
  update(@address, datas)

  # The host name must be set only after update has run
  @hostname = host_data.first

  # Health checks
  @health_checkers = {}

  if options[:health_check]
    health_check = options[:health_check]

    if health_check.kind_of?(Hash)
      health_check.each do |name, conf|
        name = name.downcase

        if options[:notification]
          conf = conf.merge(:notification => options[:notification])
        end

        if datas.any? {|i| i[0] == name }
          checker = HealthChecker.new(@address, name, self, @logger, conf)
          @health_checkers[name] = checker
          # The health check is not started yet
        else
          # Warn when the name is not one of this node's host names
          @logger.warn("host for a health check is not found: #{name}")
        end
      end
    else
      @logger.warn('configuration of a health check is not right')
    end
  end

  # Activity checks
  @activity_checkers = {}

  if options[:activity_check]
    activity_check = options[:activity_check]

    if activity_check.kind_of?(Hash)
      activity_check.each do |name, conf|
        name = name.downcase

        if options[:notification]
          conf = conf.merge(:notification => options[:notification])
        end

        if datas.any? {|i| i[0] == name }
          checker = ActivityChecker.new(@address, name, self, @logger, conf)
          @activity_checkers[name] = checker
          # The activity check is not started yet
        else
          # Warn when the name is not one of this node's host names
          @logger.warn("host for a activity check is not found: #{name}")
        end
      end
    else
      @logger.warn('configuration of a activity check is not right')
    end
  end

  # Lookup cache (only when enabled; otherwise @cache stays nil)
  @cache = {} if options[:enable_cache]

  # Balancer
  @balancer = Balancer.new(@options[:balancing], @address, @db, @logger)
end

Instance Attribute Details

#address ⇒ Object (readonly)

Returns the value of attribute address.



16
17
18
# File 'lib/srv/murakumo_cloud.rb', line 16

# Reader for @address, which the constructor takes from the first element
# of options[:host].
def address
  return @address
end

#db ⇒ Object (readonly)

Returns the value of attribute db.



18
19
20
# File 'lib/srv/murakumo_cloud.rb', line 18

# Reader for @db, the database handle the record queries are executed on.
def db
  return @db
end

#gossip ⇒ Object (readonly)

Returns the value of attribute gossip.



17
18
19
# File 'lib/srv/murakumo_cloud.rb', line 17

# Reader for @gossip, the gossip client assigned by create_gossip
# (nil until that has run).
def gossip
  return @gossip
end

#hostname ⇒ Object (readonly)

Returns the value of attribute hostname.



19
20
21
# File 'lib/srv/murakumo_cloud.rb', line 19

# Reader for @hostname, the origin host name set at the end of the
# constructor's record registration.
def hostname
  return @hostname
end

Instance Method Details

#add_nodes(nodes) ⇒ Object



379
380
381
382
383
384
385
386
387
# File 'lib/srv/murakumo_cloud.rb', line 379

# Registers every given node with the gossip client.
#
# @param nodes [Enumerable] nodes handed one by one to @gossip.add_node
# @return [Array] always [true, nil] (success flag, error message)
def add_nodes(nodes)
  nodes.each {|node| @gossip.add_node(node) }

  # No failure is detected here, so the error message is always nil
  [true, nil]
end

#add_or_rplace_records(records) ⇒ Object



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# File 'lib/srv/murakumo_cloud.rb', line 300

# Adds records for this node, replacing existing ones with the same name.
#
# The gossip data and the local database are both updated, and a configured
# health checker for an affected name is started if it is not running.
#
# @param records [Array<Array>] entries of [name, ttl, priority, weight]
# @return [Array] always [true, nil] (success flag, error message)
def add_or_rplace_records(records)
  # Names are stored in lower case; anything past the weight is dropped
  incoming = records.map do |name, ttl, priority, weight|
    [name.downcase, ttl, priority, weight]
  end

  @gossip.transaction do
    # Drop the existing entries that are about to be replaced
    @gossip.data.reject! do |existing|
      replacements = incoming.select {|r| r[0] == existing[0] }
      next false if replacements.empty?

      # The priority of the origin record cannot be changed
      replacements.each {|r| r[2] = ORIGIN } if existing[2] == ORIGIN

      true
    end

    # New records always start out as active
    incoming = incoming.map {|r| r + [ACTIVE] }
    @gossip.data.concat(incoming)
  end

  # Store the records in the database without pruning the other names
  update(@address, incoming, true)

  # Start the matching health checkers that are not running yet
  incoming.each do |record|
    checker = @health_checkers[record.first]
    checker.start if checker and not checker.alive?
  end

  [true, nil]
end

#address_exist?(name, resource_class) ⇒ Boolean

Search of records

Returns:

  • (Boolean)


484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
# File 'lib/srv/murakumo_cloud.rb', line 484

# Looks up the active address records for a name.
#
# The result set is kept in @address_records, which lookup_addresses reads
# afterwards. When the cache is enabled (@cache is not nil) non-empty
# results are cached until the smallest TTL among them has elapsed.
#
# @param name [String] queried name
# @param resource_class [Object] not used by this method
# @return [Integer, nil, false] number of records found, nil when there is
#   none, false when the name is rejected by the include/exclude filters
def address_exist?(name, resource_class)
  # The exclude / include filters see the name exactly as it was given
  excludes = @options[:name_excludes]
  return false if excludes and excludes.any? {|pattern| pattern =~ name }

  includes = @options[:name_includes]
  return false if includes and includes.none? {|pattern| pattern =~ name }

  # Lower-case the name and strip the configured domain suffix, if any
  domain = @options[:domain]
  name = name.downcase
  name = name.sub(/\.#{Regexp.escape(domain)}\Z/i, '') if domain

  if @cache.nil?
    # No cache configured: always query the database
    @address_records = @db.execute(<<-EOS, name, ACTIVE)
      SELECT ip_address, ttl, priority, weight FROM records
      WHERE name = ? AND activity = ?
    EOS
  else
    @address_records, expires_at = @cache[name]
    now = Time.now

    # Cache miss, or the cached entry has expired
    if @address_records.nil? or now > expires_at
      @cache.delete(name)

      @address_records = @db.execute(<<-EOS, name, ACTIVE)
        SELECT ip_address, ttl, priority, weight FROM records
        WHERE name = ? AND activity = ?
      EOS

      # Only non-empty results are cached (no negative caching)
      unless @address_records.empty?
        columns = %w(ip_address ttl priority weight)

        # Cached rows are plain hashes holding just the selected columns
        cache_records = @address_records.map do |row|
          Hash[columns.zip(row.values_at(*columns))]
        end

        # The entry expires once the smallest TTL has elapsed
        min_ttl = cache_records.inject(nil) do |lowest, rec|
          (lowest.nil? or rec['ttl'] < lowest) ? rec['ttl'] : lowest
        end

        @cache[name] = [cache_records, now + min_ttl]
      end
    end
  end

  @address_records.length.nonzero?
end

#close ⇒ Object



431
432
433
434
# File 'lib/srv/murakumo_cloud.rb', line 431

# Closes the database handle held in @db and returns whatever its
# close method returns.
def close
  database = @db
  database.close
end

#create_gossip ⇒ Object

ゴシップオブジェクトを生成



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/srv/murakumo_cloud.rb', line 118

# Creates the gossip client (@gossip) from the stored options and hooks
# node changes so that they are mirrored into the local records.
def create_gossip
  client_options = {
    :initial_nodes   => @options[:initial_nodes],
    :address         => @address,
    :data            => @initial_datas,
    :auth_key        => @options[:auth_key],
    :port            => @options[:gossip_port],
    :node_lifetime   => @options[:gossip_node_lifetime],
    :gossip_interval => @options[:gossip_send_interval],
    :receive_timeout => @options[:gossip_receive_timeout],
    :logger          => @logger,
    :ping_init_nodes => @options[:ping_init_nodes]
  }

  @gossip = RGossip2.client(client_options)

  # Callback invoked with (action, address, timestamp, data, old data)
  on_node_change = lambda do |act, addr, ts, dt, old_dt|
    if act == :delete
      delete(addr)
    elsif act == :add or act == :comeback
      # A new or returning node is always stored
      update(addr, dt)
    elsif act == :update
      # An updated node is stored only when its data really changed
      update(addr, dt) if dt != old_dt
    end
  end

  @gossip.context.callback_handler = on_node_change
end

#delete(address) ⇒ Object



469
470
471
# File 'lib/srv/murakumo_cloud.rb', line 469

# Removes every record stored for the given address.
#
# @param address [Object] value bound to the ip_address column
def delete(address)
  sql = 'DELETE FROM records WHERE ip_address = ?'
  @db.execute(sql, address)
end

#delete_by_names(address, names) ⇒ Object



473
474
475
476
477
478
479
480
# File 'lib/srv/murakumo_cloud.rb', line 473

# Removes the records of one address whose names are in +names+.
#
# The names are lower-cased and passed as bound parameters, so a name
# containing a quote can neither break nor alter the statement.
#
# @param address [Object] value bound to the ip_address column
# @param names [Array<String>] names to delete; an empty list produces
#   "IN ()", which matches no row
def delete_by_names(address, names)
  names = names.map {|i| i.downcase }
  placeholders = (['?'] * names.length).join(',')

  @db.execute(<<-EOS, address, *names)
    DELETE FROM records
    WHERE ip_address = ? AND name IN (#{placeholders})
  EOS
end

#delete_nodes(nodes) ⇒ Object



389
390
391
392
393
394
395
396
397
# File 'lib/srv/murakumo_cloud.rb', line 389

# Removes every given node from the gossip client.
#
# @param nodes [Enumerable] nodes handed one by one to @gossip.delete_node
# @return [Array] always [true, nil] (success flag, error message)
def delete_nodes(nodes)
  nodes.each {|node| @gossip.delete_node(node) }

  # No failure is detected here, so the error message is always nil
  [true, nil]
end

#delete_records(names) ⇒ Object



345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# File 'lib/srv/murakumo_cloud.rb', line 345

# Deletes this node's records with the given names.
#
# The origin record is never deleted: when it is requested it is skipped
# and reported through the returned error message, while the remaining
# names are still processed.
#
# @param names [Array<String>] names to delete (case-insensitive)
# @return [Array] [success flag, error message or nil]
def delete_records(names)
  errmsg = nil

  # Names are stored in lower case
  targets = names.map {|n| n.downcase }

  @gossip.transaction do
    @gossip.data.reject! do |record|
      next false unless targets.include?(record[0])

      if record[2] == ORIGIN
        # The origin cannot be deleted: report it and drop it from the targets
        errmsg = 'original host name cannot be deleted'
        targets.delete(record[0])
        false
      else
        true
      end
    end
  end

  # Remove the remaining targets from the database
  delete_by_names(@address, targets)

  # Stop the health checkers of the deleted names
  targets.each do |name|
    checker = @health_checkers[name]
    checker and checker.stop
  end

  [errmsg.nil?, errmsg]
end

#exec_start_script(script) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/srv/murakumo_cloud.rb', line 178

# Runs the start-up hook script with this node's address and host name as
# its two arguments, forwarding its output to the logger (stdout at info
# level, stderr at error level).
#
# Failures are logged and never propagated to the caller.
#
# @param script [String] command line of the hook script
def exec_start_script(script)
  @logger.info("starting script is performed: #{script}")

  Open3.popen3("#{script} '#{@address}' '#{@hostname}'") do |stdin, stdout, stderr|
    # The script gets no input
    stdin.close

    # Drain stderr concurrently: reading stdout to EOF first would block
    # forever once the script fills the stderr pipe.
    err_reader = Thread.new { stderr.read }

    out = stdout.read.strip
    @logger.info(out) unless out.empty?

    err = err_reader.value.strip
    @logger.error(err) unless err.empty?
  end
rescue StandardError => e
  # Only ordinary errors are swallowed; signals and exit requests propagate
  message = (["#{e.class}: #{e.message}"] + (e.backtrace || [])).join("\n\tfrom ")
  @logger.error("#{script}: #{message}")
end

#get_attr(name) ⇒ Object



399
400
401
402
403
404
405
406
407
408
409
410
411
412
# File 'lib/srv/murakumo_cloud.rb', line 399

# Reads a runtime attribute of the gossip client.
#
# @param name [Symbol] attribute name; must be a key of ATTRIBUTES
# @return [String, nil] nil for an unknown name; for :log_level the level
#   name (nil when the gossip client has no logger); otherwise the gossip
#   context attribute converted with to_s
def get_attr(name)
  return unless ATTRIBUTES.has_key?(name)

  unless name == :log_level
    # Only the accessor name of the ATTRIBUTES entry is needed for reading
    attr, _conv = ATTRIBUTES[name]
    return @gossip.context.send(attr).to_s
  end

  if @gossip.logger
    level_names = %w(debug info warn error fatal)
    level_names[@gossip.logger.level]
  else
    nil
  end
end

#list_records ⇒ Object



292
293
294
295
296
297
298
# File 'lib/srv/murakumo_cloud.rb', line 292

# Lists every stored record, ordered by address and name.
#
# @return [Array<Array>] rows of
#   [ip_address, name, ttl, priority, weight, activity]
def list_records
  columns = %w(ip_address name ttl priority weight activity)

  rows = @db.execute(<<-EOS)
    SELECT #{columns.join(', ')} FROM records ORDER BY ip_address, name
  EOS

  # Turn each row into a plain array in column order
  rows.map {|row| row.values_at(*columns) }
end

#lookup_addresses(name) ⇒ Object



549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
# File 'lib/srv/murakumo_cloud.rb', line 549

# Picks the addresses to answer with from the records that the preceding
# address_exist? call left in @address_records.
#
# Master records come first, followed by secondary ones; backup records are
# used only when neither exists. @address_records is reset to nil on the
# way out, so a lookup without a fresh address_exist? call fails.
#
# @param name [String] queried name
# @return [Array<Array>] [ip_address, ttl] pairs
def lookup_addresses(name)
  to_pairs = lambda {|rows| rows.map {|row| row.values_at('ip_address', 'ttl') } }

  # A single record needs no selection
  return to_pairs.call(@address_records) if @address_records.length == 1

  # Lower-case the name and strip the configured domain suffix, if any
  name = name.downcase
  domain = @options[:domain]
  name = name.sub(/\.#{Regexp.escape(domain)}\Z/i, '') if domain

  shuffled = lambda do |priority|
    shuffle_records(@address_records.select {|row| row['priority'] == priority }, name)
  end

  # Highest priority first, then the next one
  chosen = shuffled.call(MASTER)
  chosen.concat(shuffled.call(SECONDARY))

  # Fall back to the backup records when nothing was found
  chosen = shuffled.call(BACKUP) if chosen.empty?

  # Last resort: every record (the original notes this path is never taken)
  chosen = @address_records if chosen.empty?

  to_pairs.call(chosen)
ensure
  # Cleared so that a stale result set is noticed as an error
  @address_records = nil
end

#lookup_name(address) ⇒ Object



606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
# File 'lib/srv/murakumo_cloud.rb', line 606

# Picks the host name to answer with from the records that the preceding
# name_exist? call left in @name_records.
#
# The origin record is preferred, then a master-priority record, then
# whichever record comes first. The priority is compared against MASTER;
# the previous comparison against ACTIVE mixed an activity value into a
# priority test. @name_records is reset to nil on the way out.
#
# @param address [Object] not used by this method
# @return [Array] [name, ttl] of the chosen record
def lookup_name(address)
  # With a single record every step below yields that record
  record = @name_records.find {|i| i['priority'] == ORIGIN } ||
           @name_records.find {|i| i['priority'] == MASTER } ||
           @name_records.first

  # Host name and TTL
  record.values_at('name', 'ttl')
ensure
  # Cleared so that a stale result set is noticed as an error
  @name_records = nil
end

#name_exist?(address, resource_class) ⇒ Boolean

Returns:

  • (Boolean)


585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
# File 'lib/srv/murakumo_cloud.rb', line 585

# Looks up the active records stored for an address.
#
# The result set is kept in @name_records, which lookup_name reads
# afterwards.
#
# @param address [Object] queried address; converted with x_ip_addr first
# @param resource_class [Object] not used by this method
# @return [Integer, nil, false] number of records found, nil when there is
#   none, false when the address is rejected by the include/exclude filters
def name_exist?(address, resource_class)
  address = x_ip_addr(address)

  # The exclude / include filters see the converted address
  excludes = @options[:addr_excludes]
  return false if excludes and excludes.any? {|pattern| pattern =~ address }

  includes = @options[:addr_includes]
  return false if includes and includes.none? {|pattern| pattern =~ address }

  # NOTE(review): handing the rows over through an instance variable
  # presumably relies on single-threaded use (the original comment here is
  # truncated) -- confirm before calling this concurrently.
  @name_records = @db.execute(<<-EOS, address, ACTIVE)
    SELECT name, ttl, priority FROM records
    WHERE ip_address = ? AND activity = ?
  EOS

  found = @name_records.length
  found.zero? ? nil : found
end

#set_attr(name, value) ⇒ Object



414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# File 'lib/srv/murakumo_cloud.rb', line 414

# Changes a runtime attribute of the gossip client.
#
# For :log_level the value must name one of debug, info, warn, error or
# fatal (case-insensitive). An unknown level is rejected with an error
# message instead of assigning nil to the logger's level.
#
# @param name [Symbol] attribute name; must be a key of ATTRIBUTES
# @param value [Object] new value; for attributes other than :log_level it
#   is converted with the method named in the ATTRIBUTES entry
# @return [Array, nil] [success flag, error message or nil]; nil when
#   +name+ is not a known attribute
def set_attr(name, value)
  return unless ATTRIBUTES.has_key?(name)

  errmsg = nil

  if name == :log_level
    level = %w(debug info warn error fatal).index(value.to_s.downcase)

    if level.nil?
      errmsg = "invalid log level: #{value}"
    elsif @gossip.logger
      @gossip.logger.level = level
    end
  else
    attr, conv = ATTRIBUTES[name]
    @gossip.context.send("#{attr}=", value.send(conv))
  end

  return [!errmsg, errmsg]
end

#start ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/srv/murakumo_cloud.rb', line 147

# Starts the node in a background thread: after the configured delay the
# gossip client is created, the health and activity checkers are started,
# the optional start-up hook is run and finally the gossip client is
# started.
#
# @return [Thread] the thread doing the work
def start
  @logger.info("Delay of a gossip start: #{@options[:gossip_start_delay]}")

  Thread.start do
    # Hold back the start of the gossip protocol
    sleep @options[:gossip_start_delay]

    @logger.info('Gossip is starting')
    create_gossip

    # Child processes die right away once the process is daemonized, so the
    # health checks are launched only at this point
    @health_checkers.each_value {|checker| checker.start }

    # Launch the activity checks
    @activity_checkers.each_value {|checker| checker.start }

    # Run the start-up hook script, if one is configured
    hook = @options[:on_start]
    exec_start_script(hook) if hook

    @gossip.start
  end
end

#to_hash ⇒ Object



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/srv/murakumo_cloud.rb', line 193

# Builds a configuration-style hash describing the current state: the
# options, this node's host and alias records, and the sections that exist
# only in the configuration file.
#
# The health-check section is returned as a copy whose 'init-status' values
# are converted to 'active' / 'inactive'. The configuration held by
# @options is left untouched, so the method can be called repeatedly.
#
# @return [Hash] keys are configuration item names such as 'auth-key',
#   'host' or 'alias'
def to_hash
  # Option key => output name, or a lambda returning [output name, value]
  keys = {
    :auth_key      => 'auth-key',
    :dns_address   => 'address',
    :dns_port      => 'port',
    :initial_nodes => lambda {|v| ['initial-nodes', v.join(',')] },
    :resolver      => lambda {|v| [
      'resolver',
      v.instance_variable_get(:@config).instance_variable_get(:@config_info)[:nameserver].join(',')
    ]},
    :socket        => 'socket',
    :max_ip_num    => 'max-ip-num',
    :domain        => 'domain',
    :enable_cache  => lambda {|v| ['enable-cache', !!v] },
    :log_path      => 'log-path',
    :log_level     => 'log-level',
    :gossip_port   => 'gossip-port',
    # The gossip timings are reported from the gossip context rather than
    # from the options
    :gossip_node_lifetime => lambda {|v| [
      'gossip-node-lifetime',
      @gossip.context.node_lifetime
    ]},
    :gossip_send_interval => lambda {|v| [
      'gossip-send-interval',
      @gossip.context.gossip_interval
    ]},
    :gossip_receive_timeout => lambda {|v| [
      'gossip-receive-timeout',
      @gossip.context.receive_timeout
    ]},
    :ping_init_nodes    => 'ping-init-nodes',
    :gossip_start_delay => 'gossip-start-delay',
  }

  hash = {}

  keys.each do |k, name|
    value = @options[k]

    if value and name.respond_to?(:call)
      name, value = name.call(value)
    end

    # Unset options and empty strings are left out
    if value and (not value.kind_of?(String) or not value.empty?)
      hash[name] = value
    end
  end

  # Rows are [ip_address, name, ttl, priority, weight, activity]
  records = list_records.select {|r| r[0] == @address }

  hash['host'] = records.find {|r| r[3] == ORIGIN }[0..2].join(',')

  aliases = records.select {|r| r[3] != ORIGIN }.map do |r|
    priority_name = case r[3]
                    when MASTER    then 'master'
                    when SECONDARY then 'secondary'
                    else                'backup'
                    end

    [r[1], r[2], priority_name, r[4]].join(',')
  end

  hash['alias'] = aliases unless aliases.empty?

  # Items that exist only in the configuration file
  config_file = @options.config_file

  if config_file
    if config_file['health-check']
      status_names = {ACTIVE => 'active', INACTIVE => 'inactive'}
      health_check = {}

      # Convert on a copy of each entry: rewriting the configuration in
      # place would make the next call fail on the already converted value
      config_file['health-check'].each do |checked_name, conf|
        conf = conf.dup

        if conf['init-status']
          conf['init-status'] = status_names.fetch(conf['init-status'])
        end

        health_check[checked_name] = conf
      end

      hash['health-check'] = health_check
    end

    %w(
      activity-check notification
      name-includes name-excludes addr-includes addr-excludes
      balancing on-start
    ).each do |key|
      hash[key] = config_file[key] if config_file[key]
    end
  end

  return hash
end

#update(address, datas, update_only = false) ⇒ Object

Operation of storage



438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# File 'lib/srv/murakumo_cloud.rb', line 438

# Stores the records of one address in the database.
#
# Each entry replaces the stored record with the same address and name.
# Unless +update_only+ is set, records of that address whose names are not
# in +datas+ are removed afterwards. The names are passed as bound
# parameters, so a name containing a quote can neither break nor alter the
# statement.
#
# @param address [Object] value bound to the ip_address column
# @param datas [Array<Array>, nil] entries of
#   [name, ttl, priority, weight, activity]; nothing is done for nil
# @param update_only [Boolean] when true, other records are kept
def update(address, datas, update_only = false)
  return unless datas

  datas.each do |i|
    name, ttl, priority, weight, activity = i

    # Names are stored in lower case
    name = name.downcase

    # Warn when a record carries the same name as this node's host name
    if name == @hostname
      @logger.warn('same hostname as origin was found')
    end

    @db.execute(<<-EOS, address, name, ttl, priority, weight, activity)
      REPLACE INTO records (ip_address, name, ttl, priority, weight, activity)
      VALUES (?, ?, ?, ?, ?, ?)
    EOS
  end

  # Remove the records that are no longer part of the data
  unless update_only
    names = datas.map {|i| i.first.downcase }
    placeholders = (['?'] * names.length).join(',')

    @db.execute(<<-EOS, address, *names)
      DELETE FROM records
      WHERE ip_address = ? AND name NOT IN (#{placeholders})
    EOS
  end
end