Class: FFWD::Plugin::Kafka::Output

Inherits:
FFWD::ProducingClient::Producer
  • Object
show all
Includes:
Logging, Reporter
Defined in:
lib/ffwd/plugin/kafka/output.rb

Constant Summary collapse

MAPPING =
[:host, :ttl, :key, :time, :value, :tags, :attributes]
DEFAULT_PRODUCER =
"ffwd"
DEFAULT_BROKERS =
["localhost:9092"]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(schema, router, partitioner, config) ⇒ Output

Returns a new instance of Output.



41
42
43
44
45
46
47
48
49
# File 'lib/ffwd/plugin/kafka/output.rb', line 41

def initialize schema, router, partitioner, config
  @schema = schema
  @router = router
  @partitioner = partitioner
  @producer = config[:producer]
  @brokers = config[:brokers]
  @reporter_meta = {:producer_type => "kafka", :producer => @producer}
  @instance = nil
end

Instance Attribute Details

#reporter_metaObject (readonly)

Returns the value of attribute reporter_meta.



28
29
30
# File 'lib/ffwd/plugin/kafka/output.rb', line 28

def reporter_meta
  @reporter_meta
end

Class Method Details

.prepare(config) ⇒ Object



35
36
37
38
39
# File 'lib/ffwd/plugin/kafka/output.rb', line 35

def self.prepare config
  config[:producer] ||= DEFAULT_PRODUCER
  config[:brokers] ||= DEFAULT_BROKERS
  config
end

Instance Method Details

#make_event_message(e) ⇒ Object



95
96
97
98
99
100
101
# File 'lib/ffwd/plugin/kafka/output.rb', line 95

def make_event_message e
  topic = @router.route_event e
  return nil if topic.nil?
  data = @schema.dump_event e
  key = @partitioner.partition e
  MessageToSend.new topic, data, key
end

#make_metric_message(m) ⇒ Object



103
104
105
106
107
108
109
# File 'lib/ffwd/plugin/kafka/output.rb', line 103

def make_metric_message m
  topic = @router.route_metric m
  return nil if topic.nil?
  data = @schema.dump_metric m
  key = @partitioner.partition m
  MessageToSend.new topic, data, key
end

#produce(events, metrics) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/ffwd/plugin/kafka/output.rb', line 67

def produce events, metrics
  unless @instance
    return nil
  end

  expected_messages = events.size + metrics.size
  messages = []

  events.each do |e|
    message = make_event_message e
    next if message.nil?
    messages << message
  end

  metrics.each do |e|
    message = make_metric_message e
    next if message.nil?
    messages << message
  end

  if messages.size < expected_messages
    increment :kafka_routing_error, expected_messages - messages.size
  end

  increment :kafka_routing_success, messages.size
  @instance.send_messages messages
end

#setupObject



51
52
53
54
55
56
57
58
# File 'lib/ffwd/plugin/kafka/output.rb', line 51

def setup
  if not @brokers or @brokers.empty?
    log.error "No usable initial list of brokers"
    return
  end

  @instance = Producer.new @brokers, @producer
end

#teardownObject



60
61
62
63
64
65
# File 'lib/ffwd/plugin/kafka/output.rb', line 60

def teardown
  if @instance
    @instance.stop
    @instance = nil
  end
end