Class: Fluent::BurrowPlugin

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_burrow.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Parse config hash



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/fluent/plugin/out_burrow.rb', line 35

# Validates the plugin settings and prepares derived state.
#
# Checks, in order: that a tag strategy is given ('tag' XOR prefix rewriting),
# that 'action' is one of the supported values, and that the action-specific
# options are consistent. On success the normalised prefix strings are cached
# and a Fluent::TextParser is created and configured from the same conf.
#
# @param conf the plugin configuration, forwarded to the parent class and to
#   the text parser
# @raise [Fluent::ConfigError] when any of the checks above fails
def configure(conf)
  super

  # Exactly one tagging strategy is allowed: a fixed 'tag', or prefix rewriting
  prefix_rewrite = @remove_prefix || @add_prefix
  unless @tag || prefix_rewrite
    raise Fluent::ConfigError, "One of 'tag', 'remove_prefix' or 'add_prefix' must be specified"
  end
  if @tag && prefix_rewrite
    raise Fluent::ConfigError, "Specifying both 'tag' and either 'remove_prefix' or 'add_prefix' is not supported"
  end

  # Cache the prefixes with exactly one trailing dot, whether or not the
  # configured value already ended with one
  if @remove_prefix
    @removed_prefix_string = "#{@remove_prefix.chomp('.')}."
    @removed_length = @removed_prefix_string.length
  end
  @added_prefix_string = "#{@add_prefix.chomp('.')}." if @add_prefix

  # Only these actions are understood
  supported = %w[replace overlay inplace prefix]
  unless supported.include?(@action)
    raise Fluent::ConfigError, "Invalid 'action', must be one of #{supported.join(',')}"
  end

  # Option combinations that make no sense for the chosen action
  if @keep_key && @action == 'inplace'
    raise Fluent::ConfigError, "Specifying 'keep_key' with action 'inplace' is not supported"
  end
  if !@data_prefix && @action == 'prefix'
    raise Fluent::ConfigError, "You must specify 'data_prefix' with action 'prefix'"
  end

  # Fluent's built-in parser reads its own settings from the same conf
  @parser = Fluent::TextParser.new
  @parser.configure(conf)
end

#emit(tag, es, chain) ⇒ Object

This method is called when an event reaches Fluentd.



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/fluent/plugin/out_burrow.rb', line 84

# Re-parses one field of every incoming event and re-emits the result.
#
# For each event the value stored under @key_name is handed to @parser, and
# the parsed values are combined with the original record according to
# @action:
#   'inplace' - parsed values replace the raw value under @key_name
#   'overlay' - parsed values are merged on top of the original record
#   'replace' - parsed values become the whole record
#   'prefix'  - parsed values are stored in the record under @data_prefix
#
# When nothing was parsed (key missing or parser returned no values),
# 'overlay' and 'replace' have no record to build and the event is skipped;
# 'inplace' and 'prefix' still emit the record with nil in the target key.
#
# The emitted event time is the time returned by the parser, else the value
# of the record's @record_time_key, else the incoming event's own time.
#
# @param tag [String] tag of the incoming events
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] output chain, advanced once after the whole stream
# @return whatever chain.next returns
def emit(tag, es, chain)
  # Figure out new event tag (either manually specified, or modified with add_prefix|remove_prefix)
  if @tag
    tag = @tag
  else
    if @remove_prefix &&
        ((tag.start_with?(@removed_prefix_string) && tag.length > @removed_length) || tag == @remove_prefix)
      # When the tag is just the bare prefix this slice leaves nil or an
      # empty string; add_prefix below copes with both
      tag = tag[@removed_length..-1]
    end
    if @add_prefix
      tag = tag && !tag.empty? ? @added_prefix_string + tag : @add_prefix
    end
  end

  # Handle all currently available events in stream
  es.each do |time, record|
    # Raw value to parse, and the original time field of the record (if any)
    raw_value = record[@key_name]
    raw_time = record[@record_time_key]

    # Try to parse it according to 'format'; a missing key parses to nothing
    parsed_time, values = raw_value ? @parser.parse(raw_value) : [nil, nil]

    # Build the outgoing record for the configured action
    r = case @action
        when 'inplace' then record.merge(@key_name => values)
        when 'overlay' then values && record.merge(values) # merge(nil) would raise
        when 'prefix'  then record.merge(@data_prefix => values)
        else values # 'replace'
        end

    # Nothing to emit (and nothing to decorate) without a record
    next if r.nil?

    # Drop the raw key unless asked to keep it ('inplace' always keeps it)
    if %w[overlay replace prefix].include?(@action) && !@keep_key
      r.delete(@key_name)
    end

    # Preserve the original time field of the record?
    r[@record_time_key] = raw_time if @keep_time

    # Emit event back to Fluent, never with a nil timestamp when the
    # incoming event carried one
    router.emit(tag, parsed_time || raw_time || time, r)
  end

  chain.next
end

#shutdown ⇒ Object

This method is called when shutting down.



80
81
# File 'lib/fluent/plugin/out_burrow.rb', line 80

# Called when the plugin is shut down. It does nothing of its own; it
# delegates to the parent class so the inherited shutdown handling still runs,
# mirroring what #start and #configure already do.
def shutdown
  super
end

#start ⇒ Object

This method is called when starting.



75
76
77
# File 'lib/fluent/plugin/out_burrow.rb', line 75

# Called when the plugin is started. It has no start-up work of its own and
# simply hands control to the parent class implementation.
#
# @return whatever the parent implementation returns
def start
  super()
end