Class: FFWD::Plugin::JsonLine::Connection

Inherits:
Connection
  • Object
show all
Includes:
EM::Protocols::LineText2, Logging
Defined in:
lib/ffwd/plugin/json_line/connection.rb

Constant Summary collapse

# Pairs of [JSON key, output symbol] describing which fields of a decoded
# line are copied into an event hash. Every output symbol is simply the
# symbolized JSON key, so the table is derived from the list of key names.
EVENT_FIELDS = %w[
  key value host state description ttl tags attributes
].map { |name| [name, name.to_sym] }
# Pairs of [JSON key, output symbol] describing which fields of a decoded
# line are copied into a metric hash. Every output symbol is simply the
# symbolized JSON key, so the table is derived from the list of key names.
METRIC_FIELDS = %w[
  proc key value tags attributes
].map { |name| [name, name.to_sym] }

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

included, #log

Methods inherited from Connection

#datasink=, #send_data

Constructor Details

#initialize(bind, core, buffer_limit) ⇒ Connection

Returns a new instance of Connection.



49
50
51
52
53
# File 'lib/ffwd/plugin/json_line/connection.rb', line 49

# Stores the collaborators this connection works with.
#
# @param bind         object kept in @bind (receive_line calls #increment on it)
# @param core         object kept in @core (receive_line calls #input on it)
# @param buffer_limit value kept in @buffer_limit; not read by the code shown here
def initialize(bind, core, buffer_limit)
  @core = core
  @bind = bind
  @buffer_limit = buffer_limit
end

Class Method Details

.plugin_type ⇒ Object



45
46
47
# File 'lib/ffwd/plugin/json_line/connection.rb', line 45

# Name identifying this plugin type.
#
# @return [String] always "json_line_in"
def self.plugin_type
  'json_line_in'
end

Instance Method Details

#read_event(data) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/ffwd/plugin/json_line/connection.rb', line 104

# Builds the event hash for a decoded JSON line.
#
# The "tags" and "time" entries are first handed to read_tags / read_time,
# then every EVENT_FIELDS entry whose value is not nil is copied under its
# symbol key. Because the table is applied last, a key it lists (such as
# "tags") replaces anything stored earlier under the same symbol.
#
# @param data decoded line, indexed with String keys
# @return [Hash] symbol-keyed event attributes
def read_event(data)
  event = {}

  read_tags event, data["tags"]
  read_time event, data["time"]

  EVENT_FIELDS.each_with_object(event) do |(source_key, target_key), out|
    value = data[source_key]
    out[target_key] = value unless value.nil?
  end
end

#read_metric(data) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/ffwd/plugin/json_line/connection.rb', line 90

# Builds the metric hash for a decoded JSON line.
#
# The "tags" and "time" entries are first handed to read_tags / read_time,
# then every METRIC_FIELDS entry whose value is not nil is copied under its
# symbol key. Because the table is applied last, a key it lists (such as
# "tags") replaces anything stored earlier under the same symbol.
#
# @param data decoded line, indexed with String keys
# @return [Hash] symbol-keyed metric attributes
def read_metric(data)
  metric = {}

  read_tags metric, data["tags"]
  read_time metric, data["time"]

  METRIC_FIELDS.each_with_object(metric) do |(source_key, target_key), out|
    value = data[source_key]
    out[target_key] = value unless value.nil?
  end
end

#read_tags(d, source) ⇒ Object



80
81
82
83
# File 'lib/ffwd/plugin/json_line/connection.rb', line 80

# Stores the given tags in +d+ under :tags, converted to a Set.
#
# The value to convert is taken from +source+. The earlier implementation
# looked up "tags" in +d+ itself (the output hash being built, which has no
# String keys), so it never stored anything.
#
# @param d      [Hash] output hash to write into
# @param source collection of tags responding to #to_set, or nil when absent
# @return the stored Set, or nil when +source+ is nil
def read_tags(d, source)
  return if source.nil?
  d[:tags] = source.to_set
end

#read_time(d, source) ⇒ Object



85
86
87
88
# File 'lib/ffwd/plugin/json_line/connection.rb', line 85

# Stores the given timestamp in +d+ under :time, converted with Time.at.
#
# The value to convert is taken from +source+. The earlier implementation
# looked up "time" in +d+ itself (the output hash being built, which has no
# String keys), so it never stored anything.
#
# @param d      [Hash] output hash to write into
# @param source value accepted by Time.at (presumably seconds since the
#               epoch; confirm against senders), or nil when absent
# @return [Time, nil] the stored Time, or nil when +source+ is nil
def read_time(d, source)
  return if source.nil?
  d[:time] = Time.at(source)
end

#receive_line(data) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/ffwd/plugin/json_line/connection.rb', line 55

# Handles one received line: decodes it as JSON and dispatches on its
# "type" field.
#
# * "metric" - passes read_metric's result to @core.input.metric and
#   increments :received_metric on @bind
# * "event"  - passes read_event's result to @core.input.event and
#   increments :received_event on @bind
# * missing or unknown type - logs an error
#
# Any StandardError raised along the way (including JSON parse failures) is
# logged and swallowed so a bad line does not propagate to the caller.
#
# NOTE(review): JSON.load is documented as intended for trusted input; if
# these lines arrive from untrusted peers, consider JSON.parse instead
# (it differs in NaN/nesting handling, so confirm with senders first).
#
# @param data [String] a single line of text
def receive_line(data)
  payload = JSON.load(data)
  type = payload["type"]

  unless type
    log.error "Field 'type' missing from received line"
    return
  end

  if type == "metric"
    @core.input.metric read_metric(payload)
    @bind.increment :received_metric
    nil
  elsif type == "event"
    @core.input.event read_event(payload)
    @bind.increment :received_event
    nil
  else
    log.error "No such type: #{type}"
  end
rescue => e
  log.error "Failed to receive line", e
end