Class: Fluent::Plugin::ZookeeperOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_zookeeper.rb

Instance Method Summary collapse

Constructor Details

#initializeZookeeperOutput

Returns a new instance of ZookeeperOutput.



36
37
38
39
# File 'lib/fluent/plugin/out_zookeeper.rb', line 36

def initialize
  super
  @zk = nil
end

Instance Method Details

#configure(conf) ⇒ Object



74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/out_zookeeper.rb', line 74

def configure(conf)
  super

  if @type != 'persistent' && @type != 'ephemeral' && @type != 'sequence'
    log.warn "'type' parameter value is wrong (#@type). Will use default value (persistent))"
    @type = 'persistent'
  end

  @formatter_proc = setup_json_formatter
end

#init_client(raise_exception = true) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/fluent/plugin/out_zookeeper.rb', line 41

def init_client(raise_exception = true)
  log.info "Initializing connection to Zookeeper"
  begin
    if @zk.nil?
      @zk = Zookeeper.new(@servers)
    else
      @zk.reopen
    end

    if @zk.connected?
      case @type
      when 'ephemeral'
        @zk.create({path: @path, ephemeral: true})
      when 'sequence'
        @zk.create({path: @path, sequence: true})
      else
        # persistent (or anything else)
        @zk.create({path: @path})
      end
      log.info "Connection to Zookeeper service [#@servers] has been initialized"
      @con_lost_msg = "Connection to Zookeeper was lost"
    else
      log.warn "Cannot establish connection to Zookeeper"
    end
  rescue Exception => e
    if raise_exception
      raise e
    else
      log.error e
    end
  end
end

#process(tag, es) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/fluent/plugin/out_zookeeper.rb', line 103

def process(tag, es)
  if @zk.connected?
    begin
      es.each do |time, record|
        begin
          data = @formatter_proc.call(record)
        rescue StandardError => e
          log.warn "Failed to format record:", :error => e.to_s, :record => record
          next
        end
        if @ignore_empty_msg && data == "{}"
          log.debug "Skipping empty record"
          next
        end
        @zk.set({path: @path, data: data})
      end
    rescue Exception => e
      log.error "Exception occurred while sending data: #{e}"
      # Connection will be reinitialized on next call
      @zk.close
    end
  elsif !@zk.connecting?
    # We are not connected and not connecting; it's time to reinit the client
    @zk.close if !@zk.closed?
    init_client(false)
  else
    if !@con_lost_msg.nil?
      log.warn "#@con_lost_msg"
      @con_lost_msg = nil
    end
  end
end

#setup_json_formatterObject



85
86
87
88
# File 'lib/fluent/plugin/out_zookeeper.rb', line 85

def setup_json_formatter
  Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
  Proc.new { |record| Oj.dump(record) }
end

#shutdownObject



95
96
97
98
99
100
101
# File 'lib/fluent/plugin/out_zookeeper.rb', line 95

def shutdown
  @zk.delete({path: @path})
  @zk.close
  log.info "Connection to Zookeeper service has been gracefully closed"
  @zk = nil
  super
end

#startObject



90
91
92
93
# File 'lib/fluent/plugin/out_zookeeper.rb', line 90

def start
  super
  init_client
end