Class: LogStash::Outputs::Riemann

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/riemann.rb

Overview

Riemann is a network event stream processing system.

While Riemann is conceptually very similar to Logstash, it offers much more as a replacement for a monitoring system.

Riemann is used in Logstash much like statsd or other metric-related outputs.

You can learn about Riemann here:

You can see the author talk about it here:

Instance Method Summary collapse

Instance Method Details

#build_riemann_formatted_event(event) ⇒ Object

def receive



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/logstash/outputs/riemann.rb', line 131

# Translate a Logstash event into the hash that is handed to the Riemann
# client.
#
# event - object responding to +get+, +sprintf+, +timestamp+ and +to_hash+.
#
# Returns a Hash keyed by symbols.
def build_riemann_formatted_event(event)
  # Riemann's "description" always carries the event's "message".
  payload = { :description => event.get("message") }

  # Optionally flatten every other field into the payload. "message" is
  # flattened as well, so it ends up duplicating "description".
  payload.merge!(map_fields(nil, event.to_hash)) if @map_fields == true

  # Entries from the "riemann_event" option are applied after the mapped
  # fields, so they win on key collisions.
  if @riemann_event
    @riemann_event.each do |name, template|
      payload[name.to_sym] = event.sprintf(template)
    end
  end

  # Attributes are otherwise left as they are, but "ttl" and "metric" are
  # sent as floats.
  [:ttl, :metric].each do |numeric_key|
    payload[numeric_key] = payload[numeric_key].to_f if payload[numeric_key]
  end

  # Time is sent as an integer, as expected by Riemann versions earlier
  # than 0.2.13 (newer ones also accept sub-second floats).
  payload[:time] = event.timestamp.to_i

  # Tags are only forwarded when they are an Array.
  tags = event.get("tags")
  payload[:tags] = tags if tags.is_a?(Array)

  # The host is always derived from the sender template.
  payload[:host] = event.sprintf(@sender)

  payload
end

#map_fields(parent, fields) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/logstash/outputs/riemann.rb', line 109

# Flatten a (possibly nested) hash of event fields into a single-level hash
# whose keys are dotted-path symbols, e.g. {"a" => {"b" => 1}} becomes
# {:"a.b" => 1}.
#
# parent - prefix for the keys at this level, or nil at the top level.
# fields - Hash of field name => value; nested Hashes are descended into.
#
# Keys starting with "@" are skipped at every level.
#
# Returns a new Hash.
def map_fields(parent, fields)
  fields.each_with_object({}) do |(name, value), flattened|
    next if name.start_with?("@")

    path = parent.nil? ? name : "#{parent}.#{name}"
    case value
    when Hash then flattened.merge!(map_fields(path, value))
    else flattened[path.to_sym] = value
    end
  end
end

#receive(event) ⇒ Object



124
125
126
127
128
129
# File 'lib/logstash/outputs/riemann.rb', line 124

# Handle one event: convert it to a Riemann-formatted hash, log that hash
# at debug level, then pass it to send_to_riemann.
#
# event - the event to forward.
#
# Returns whatever send_to_riemann returns.
def receive(event)
  formatted = build_riemann_formatted_event(event)
  @logger.debug("Riemann event: ", :riemann_event => formatted)
  send_to_riemann(formatted)
end

#registerObject



103
104
105
106
# File 'lib/logstash/outputs/riemann.rb', line 103

# Load the Riemann client library and create the client used by this output.
#
# Stores a Riemann::Client built from @host and @port in @client.
def register
  # Loaded here, inside the method, rather than at file load time.
  require 'riemann/client'
  @client = Riemann::Client.new(:host => @host, :port => @port)
end

#send_to_riemann(riemann_formatted_event) ⇒ Object



170
171
172
173
174
175
176
177
178
# File 'lib/logstash/outputs/riemann.rb', line 170

# Send an already-formatted event through the client's transport for the
# configured protocol.
#
# riemann_formatted_event - the Hash to push to Riemann.
#
# The transport is looked up as the instance variable on @client whose name
# matches @protocol (e.g. "@tcp" when @protocol is "tcp").
#
# Delivery is best-effort: any StandardError raised while sending is logged
# and swallowed. Only StandardError is rescued, so non-standard exceptions
# such as Interrupt or SystemExit propagate instead of being swallowed
# (the previous `rescue Exception` caught those too).
#
# Returns the result of the push, or of the error log call on failure.
def send_to_riemann(riemann_formatted_event)
  proto_client = @client.instance_variable_get("@#{@protocol}")
  @logger.debug("Riemann client proto: #{proto_client}")
  proto_client << riemann_formatted_event
rescue StandardError => e
  @logger.error("Unhandled exception", :error => e)
end