Class: Fluent::ForwardOutput::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_forward.rb

Direct Known Subclasses

NoneHeartbeatNode

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(log, conf) ⇒ Node

Returns a new instance of Node.



450
451
452
453
454
455
456
457
458
459
460
461
462
463
# File 'lib/fluent/plugin/out_forward.rb', line 450

def initialize(log, conf)
  @log = log
  @conf = conf
  @name = @conf.name
  @host = @conf.host
  @port = @conf.port
  @weight = @conf.weight
  @failure = @conf.failure
  @available = true

  @resolved_host = nil
  @resolved_time = 0
  resolved_host  # check dns
end

Instance Attribute Details

#availableObject (readonly)

for test



468
469
470
# File 'lib/fluent/plugin/out_forward.rb', line 468

def available
  @available
end

#confObject (readonly)

Returns the value of attribute conf.



465
466
467
# File 'lib/fluent/plugin/out_forward.rb', line 465

def conf
  @conf
end

#failureObject (readonly)

for test



468
469
470
# File 'lib/fluent/plugin/out_forward.rb', line 468

def failure
  @failure
end

#hostObject (readonly)

Returns the value of attribute host.



466
467
468
# File 'lib/fluent/plugin/out_forward.rb', line 466

def host
  @host
end

#nameObject (readonly)

Returns the value of attribute name.



466
467
468
# File 'lib/fluent/plugin/out_forward.rb', line 466

def name
  @name
end

#portObject (readonly)

Returns the value of attribute port.



466
467
468
# File 'lib/fluent/plugin/out_forward.rb', line 466

def port
  @port
end

#sockaddrObject (readonly)

used by on_heartbeat



467
468
469
# File 'lib/fluent/plugin/out_forward.rb', line 467

def sockaddr
  @sockaddr
end

#weightObject (readonly)

Returns the value of attribute weight.



466
467
468
# File 'lib/fluent/plugin/out_forward.rb', line 466

def weight
  @weight
end

Instance Method Details

#available?Boolean

Returns:

  • (Boolean)


470
471
472
# File 'lib/fluent/plugin/out_forward.rb', line 470

def available?
  @available
end

#disable!Object



474
475
476
# File 'lib/fluent/plugin/out_forward.rb', line 474

def disable!
  @available = false
end

#heartbeat(detect = true) ⇒ Object



542
543
544
545
546
547
548
549
550
551
552
553
# File 'lib/fluent/plugin/out_forward.rb', line 542

def heartbeat(detect=true)
  now = Time.now.to_f
  @failure.add(now)
  #@log.trace "heartbeat from '#{@name}'", :host=>@host, :port=>@port, :available=>@available, :sample_size=>@failure.sample_size
  if detect && !@available && @failure.sample_size > @conf.recover_sample_size
    @available = true
    @log.warn "recovered forwarding server '#{@name}'", host: @host, port: @port
    return true
  else
    return nil
  end
end

#resolved_hostObject



482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
# File 'lib/fluent/plugin/out_forward.rb', line 482

def resolved_host
  case @conf.expire_dns_cache
  when 0
    # cache is disabled
    return resolve_dns!

  when nil
    # persistent cache
    return @resolved_host ||= resolve_dns!

  else
    now = Engine.now
    rh = @resolved_host
    if !rh || now - @resolved_time >= @conf.expire_dns_cache
      rh = @resolved_host = resolve_dns!
      @resolved_time = now
    end
    return rh
  end
end

#standby?Boolean

Returns:

  • (Boolean)


478
479
480
# File 'lib/fluent/plugin/out_forward.rb', line 478

def standby?
  @conf.standby
end

#tickObject



511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
# File 'lib/fluent/plugin/out_forward.rb', line 511

def tick
  now = Time.now.to_f
  if !@available
    if @failure.hard_timeout?(now)
      @failure.clear
    end
    return nil
  end

  if @failure.hard_timeout?(now)
    @log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, hard_timeout: true
    @available = false
    @resolved_host = nil  # expire cached host
    @failure.clear
    return true
  end

  if @conf.phi_failure_detector
    phi = @failure.phi(now)
    #$log.trace "phi '#{@name}'", :host=>@host, :port=>@port, :phi=>phi
    if phi > @conf.phi_threshold
      @log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, phi: phi
      @available = false
      @resolved_host = nil  # expire cached host
      @failure.clear
      return true
    end
  end
  return false
end

#to_msgpack(out = '') ⇒ Object



555
556
557
# File 'lib/fluent/plugin/out_forward.rb', line 555

def to_msgpack(out = '')
  [@host, @port, @weight, @available].to_msgpack(out)
end