Class: Fluent::FlumeInput::FluentFlumeHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_flume.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#add_prefix ⇒ Object

Returns the value of attribute add_prefix.



110
111
112
# File 'lib/fluent/plugin/in_flume.rb', line 110

# Reader for the add_prefix attribute.
#
# @return [Object, nil] the current value of @add_prefix (nil when unset)
def add_prefix; @add_prefix; end

#default_tag ⇒ Object

Returns the value of attribute default_tag.



109
110
111
# File 'lib/fluent/plugin/in_flume.rb', line 109

# Reader for the default_tag attribute.
#
# @return [Object, nil] the current value of @default_tag (nil when unset)
def default_tag; @default_tag; end

#message_format ⇒ Object

Returns the value of attribute message_format.



111
112
113
# File 'lib/fluent/plugin/in_flume.rb', line 111

# Reader for the message_format attribute.
#
# @return [Object, nil] the current value of @message_format (nil when unset)
def message_format; @message_format; end

#tag_field ⇒ Object

Returns the value of attribute tag_field.



108
109
110
# File 'lib/fluent/plugin/in_flume.rb', line 108

# Reader for the tag_field attribute.
#
# @return [Object, nil] the current value of @tag_field (nil when unset)
def tag_field; @tag_field; end

Instance Method Details

#ackedAppend(evt) ⇒ Object



140
141
142
143
# File 'lib/fluent/plugin/in_flume.rb', line 140

# Placeholder for the ackedAppend call: it does not process the event.
# It only logs an error stating that ackedAppend is not implemented, then
# reports success to the caller.
#
# @param evt [Object] the incoming event; only interpolated into the log line
# @return [Object] EventStatus::OK
def ackedAppend(evt)
  notice = "ackedAppend is not implemented yet: #{evt}"
  $log.error(notice)
  EventStatus::OK
end

#append(evt) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/fluent/plugin/in_flume.rb', line 113

# Handles an append call by turning the incoming event into a record and
# emitting it through Engine.
#
# Tag selection:
# * when @tag_field is set, the tag is read from evt.fields[@tag_field],
#   falling back to @default_tag; if neither yields a tag the event is
#   silently ignored;
# * otherwise @default_tag is used as-is.
# When @add_prefix is set, it is prepended to the tag, separated by a dot.
#
# Any StandardError raised along the way is logged through $log and not
# re-raised.
#
# @param evt [Object] event responding to +fields+ (hash-like lookup by
#   field name) and +timestamp+; it is also handed to create_record
# @return [Object, nil] whatever Engine.emit returns, nil when the event is
#   ignored, or the logger's return value when an error was rescued
def append(evt)
  record = create_record(evt)
  if @tag_field
    # A per-event tag wins over the configured default.
    tag = evt.fields[@tag_field] || @default_tag
    return unless tag # no tag from either source: ignore the event
  else
    tag = @default_tag
  end
  timestamp = evt.timestamp.to_i
  tag = @add_prefix + '.' + tag if @add_prefix
  Engine.emit(tag, timestamp, record)
rescue => e
  $log.error "unexpected error", :error=>e.to_s
  $log.error_backtrace
end

#close ⇒ Object



145
146
# File 'lib/fluent/plugin/in_flume.rb', line 145

# Does nothing: the method body is empty.
#
# @return [nil]
def close; end

#rawAppend(evt) ⇒ Object



136
137
138
# File 'lib/fluent/plugin/in_flume.rb', line 136

# Placeholder for the rawAppend call: it does not process the event and only
# logs an error stating that rawAppend is not implemented.
#
# @param evt [Object] the incoming event; only interpolated into the log line
# @return [Object] whatever $log.error returns
def rawAppend(evt)
  notice = "rawAppend is not implemented yet: #{evt}"
  $log.error(notice)
end