Class: Fluent::ForwardOutput::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_forward.rb

Direct Known Subclasses

NoneHeartbeatNode

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(log, conf) ⇒ Node

Returns a new instance of Node.



430
431
432
433
434
435
436
437
438
439
440
441
442
443
# File 'lib/fluent/plugin/out_forward.rb', line 430

# Builds a node from its configuration object and performs an initial
# host resolution.
#
# @param log [Object] logger stored in @log (heartbeat and tick call #warn on it)
# @param conf [Object] per-server config; must respond to name, host, port,
#   weight and failure
def initialize(log, conf)
  @log = log
  @conf = conf

  # Copy the frequently used settings out of the config object.
  @name, @host, @port = conf.name, conf.host, conf.port
  @weight = conf.weight
  @failure = conf.failure

  # A freshly created node starts out as available.
  @available = true

  # DNS cache state must exist before the first lookup below.
  @resolved_time = 0
  @resolved_host = nil
  resolved_host # check dns
end

Instance Attribute Details

#available ⇒ Object (readonly)

for test



448
449
450
# File 'lib/fluent/plugin/out_forward.rb', line 448

# Reader for the availability flag (exposed for tests).
#
# @return [Object] the current value of @available
def available
  return @available
end

#conf ⇒ Object (readonly)

Returns the value of attribute conf.



445
446
447
# File 'lib/fluent/plugin/out_forward.rb', line 445

# Reader for the configuration object this node was built from.
#
# @return [Object] the current value of @conf
def conf
  return @conf
end

#failure ⇒ Object (readonly)

for test



448
449
450
# File 'lib/fluent/plugin/out_forward.rb', line 448

# Reader for the failure-tracking object (exposed for tests).
#
# @return [Object] the current value of @failure
def failure
  return @failure
end

#host ⇒ Object (readonly)

Returns the value of attribute host.



446
447
448
# File 'lib/fluent/plugin/out_forward.rb', line 446

# Reader for the configured host of this node.
#
# @return [Object] the current value of @host
def host
  return @host
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



446
447
448
# File 'lib/fluent/plugin/out_forward.rb', line 446

# Reader for the configured name of this node (used in log messages).
#
# @return [Object] the current value of @name
def name
  return @name
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



446
447
448
# File 'lib/fluent/plugin/out_forward.rb', line 446

# Reader for the configured port of this node.
#
# @return [Object] the current value of @port
def port
  return @port
end

#sockaddr ⇒ Object (readonly)

used by on_heartbeat



447
448
449
# File 'lib/fluent/plugin/out_forward.rb', line 447

# Reader for the node's socket address (used by on_heartbeat).
#
# NOTE(review): @sockaddr is not assigned by any method shown here —
# presumably it is set while resolving DNS; confirm against resolve_dns!.
#
# @return [Object, nil] the current value of @sockaddr
def sockaddr
  return @sockaddr
end

#weight ⇒ Object (readonly)

Returns the value of attribute weight.



446
447
448
# File 'lib/fluent/plugin/out_forward.rb', line 446

# Reader for the configured weight of this node.
#
# @return [Object] the current value of @weight
def weight
  return @weight
end

Instance Method Details

#available? ⇒ Boolean

Returns:

  • (Boolean)


450
451
452
# File 'lib/fluent/plugin/out_forward.rb', line 450

# Predicate form of the availability flag.
#
# @return [Boolean] the current value of @available
def available?
  return @available
end

#disable! ⇒ Object



454
455
456
# File 'lib/fluent/plugin/out_forward.rb', line 454

# Marks this node as unavailable.
#
# @return [false] the value just stored in @available
def disable!
  return @available = false
end

#heartbeat(detect = true) ⇒ Object



522
523
524
525
526
527
528
529
530
531
532
533
# File 'lib/fluent/plugin/out_forward.rb', line 522

# Records a heartbeat sample and, when allowed, brings a detached node back.
#
# The current time is always added to @failure. The node is flipped back to
# available only if +detect+ is truthy, the node is currently unavailable,
# and the number of collected samples exceeds @conf.recover_sample_size.
#
# @param detect [Boolean] whether recovery detection should run
# @return [true, nil] true when the node has just recovered, nil otherwise
def heartbeat(detect=true)
  @failure.add(Time.now.to_f)
  #@log.trace "heartbeat from '#{@name}'", :host=>@host, :port=>@port, :available=>@available, :sample_size=>@failure.sample_size

  recovered = detect && !@available && @failure.sample_size > @conf.recover_sample_size
  return nil unless recovered

  @available = true
  @log.warn "recovered forwarding server '#{@name}'", host: @host, port: @port
  true
end

#resolved_host ⇒ Object



462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
# File 'lib/fluent/plugin/out_forward.rb', line 462

# Returns the resolved address for this node, honouring the DNS cache
# policy given by @conf.expire_dns_cache:
#
# * 0    - caching disabled: resolve on every call
# * nil  - persistent cache: resolve once and keep the result
# * else - timed cache: re-resolve once the cached entry is at least that
#          many Engine.now units old, or when nothing is cached
#
# @return [Object] whatever resolve_dns! returned (fresh or cached)
def resolved_host
  ttl = @conf.expire_dns_cache

  if ttl == 0
    # cache is disabled
    resolve_dns!
  elsif ttl.nil?
    # persistent cache
    @resolved_host ||= resolve_dns!
  else
    now = Engine.now
    # Work on a local snapshot; tick resets @resolved_host to nil, so the
    # instance variable may change underneath us (presumably from another
    # thread — confirm).
    cached = @resolved_host
    expired = !cached || now - @resolved_time >= ttl
    if expired
      cached = @resolved_host = resolve_dns!
      @resolved_time = now
    end
    cached
  end
end

#standby? ⇒ Boolean

Returns:

  • (Boolean)


458
459
460
# File 'lib/fluent/plugin/out_forward.rb', line 458

# Whether this node is configured as a standby server.
#
# @return [Boolean] the value of @conf.standby
def standby?
  return @conf.standby
end

#tick ⇒ Object



491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
# File 'lib/fluent/plugin/out_forward.rb', line 491

# Periodic failure check for this node.
#
# * Node already unavailable: clears @failure if the hard timeout has been
#   reached, and returns nil.
# * Hard timeout reached: logs a warning, marks the node unavailable, drops
#   the cached resolved host, clears @failure and returns true.
# * Phi detector enabled and phi above @conf.phi_threshold: same detach
#   steps, returns true.
# * Otherwise returns false.
#
# @return [true, false, nil] true when the node was detached by this call,
#   false when it stays available, nil when it was already unavailable
def tick
  now = Time.now.to_f
  timed_out = @failure.hard_timeout?(now)

  unless @available
    @failure.clear if timed_out
    return nil
  end

  if timed_out
    @log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, hard_timeout: true
    @available = false
    @resolved_host = nil  # expire cached host
    @failure.clear
    return true
  end

  return false unless @conf.phi_failure_detector

  phi = @failure.phi(now)
  #$log.trace "phi '#{@name}'", :host=>@host, :port=>@port, :phi=>phi
  return false unless phi > @conf.phi_threshold

  @log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, phi: phi
  @available = false
  @resolved_host = nil  # expire cached host
  @failure.clear
  true
end

#to_msgpack(out = '') ⇒ Object



535
536
537
# File 'lib/fluent/plugin/out_forward.rb', line 535

# Serializes this node as the array [host, port, weight, available] by
# delegating to Array#to_msgpack.
#
# @param out [Object] passed straight through to Array#to_msgpack
# @return [Object] whatever Array#to_msgpack returns
def to_msgpack(out = '')
  fields = [@host, @port, @weight, @available]
  fields.to_msgpack(out)
end