Class: FFWD::Plugin::Kafka::Output
- Inherits: FFWD::ProducingClient::Producer
  - Object
  - FFWD::ProducingClient::Producer
  - FFWD::Plugin::Kafka::Output
- Includes:
- Logging, Reporter
- Defined in:
- lib/ffwd/plugin/kafka/output.rb
Constant Summary collapse
- MAPPING = [:host, :ttl, :key, :time, :value, :tags, :attributes]
- DEFAULT_PRODUCER = "ffwd"
- DEFAULT_BROKERS = ["localhost:9092"]
Instance Attribute Summary collapse
- #reporter_meta ⇒ Object (readonly)
  Returns the value of attribute reporter_meta.
Class Method Summary collapse
- .prepare(config) ⇒ Object
Instance Method Summary collapse
- #initialize(schema, router, partitioner, config) ⇒ Output (constructor)
  A new instance of Output.
- #make_event_message(e) ⇒ Object
- #make_metric_message(m) ⇒ Object
- #produce(events, metrics) ⇒ Object
- #setup ⇒ Object
- #teardown ⇒ Object
Constructor Details
#initialize(schema, router, partitioner, config) ⇒ Output
Returns a new instance of Output.
41 42 43 44 45 46 47 48 49 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 41
#
# Stores the collaborators and the producer settings read from config.
# No producer instance is created here; @instance starts out as nil.
#
# @param schema [Object] kept in @schema
# @param router [Object] kept in @router
# @param partitioner [Object] kept in @partitioner
# @param config [Hash] read for the :producer and :brokers keys
def initialize(schema, router, partitioner, config)
  @schema, @router, @partitioner = schema, router, partitioner

  @brokers = config[:brokers]
  @producer = config[:producer]

  @reporter_meta = {:producer_type => "kafka", :producer => @producer}
  @instance = nil
end
Instance Attribute Details
#reporter_meta ⇒ Object (readonly)
Returns the value of attribute reporter_meta.
28 29 30 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 28
#
# Read-only accessor for the reporter metadata.
#
# @return [Object] the current value of @reporter_meta
def reporter_meta
  @reporter_meta
end
Class Method Details
.prepare(config) ⇒ Object
35 36 37 38 39 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 35
#
# Fills in the :producer and :brokers entries of config when they are
# missing (nil or false) and hands the same hash back.
#
# @param config [Hash] configuration hash; modified in place
# @return [Hash] the config object that was passed in
def self.prepare(config)
  config[:producer] = DEFAULT_PRODUCER unless config[:producer]
  config[:brokers] = DEFAULT_BROKERS unless config[:brokers]
  config
end
Instance Method Details
#make_event_message(e) ⇒ Object
95 96 97 98 99 100 101 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 95
#
# Builds the message for a single event.
#
# The router is asked for a topic first; when it returns nil the event is
# skipped and neither the schema nor the partitioner is consulted.
#
# @param e [Object] the event to convert
# @return [MessageToSend, nil] a message built from the routed topic, the
#   schema dump and the partition key, or nil when the router gave no topic
def make_event_message(e)
  topic = @router.route_event(e)
  return nil if topic.nil?

  data = @schema.dump_event(e)
  key = @partitioner.partition(e)
  MessageToSend.new(topic, data, key)
end
#make_metric_message(m) ⇒ Object
103 104 105 106 107 108 109 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 103
#
# Builds the message for a single metric.
#
# The router is asked for a topic first; when it returns nil the metric is
# skipped and neither the schema nor the partitioner is consulted.
#
# @param m [Object] the metric to convert
# @return [MessageToSend, nil] a message built from the routed topic, the
#   schema dump and the partition key, or nil when the router gave no topic
def make_metric_message(m)
  topic = @router.route_metric(m)
  return nil if topic.nil?

  data = @schema.dump_metric(m)
  key = @partitioner.partition(m)
  MessageToSend.new(topic, data, key)
end
#produce(events, metrics) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 67
#
# Converts the given events and metrics into messages and passes them to
# the producer instance.
#
# Items for which make_event_message / make_metric_message return nil are
# left out. The number of items left out is reported under
# :kafka_routing_error (only when it is non-zero) and the number of
# messages kept is reported under :kafka_routing_success.
#
# @param events [#size, #each] events to publish
# @param metrics [#size, #each] metrics to publish
# @return [nil] when there is no producer instance
# @return [Object] otherwise, the result of the producer call
def produce(events, metrics)
  return nil unless @instance

  expected_messages = events.size + metrics.size
  messages = []

  events.each do |e|
    message = make_event_message(e)
    next if message.nil?
    messages << message
  end

  metrics.each do |m|
    message = make_metric_message(m)
    next if message.nil?
    messages << message
  end

  if messages.size < expected_messages
    increment :kafka_routing_error, expected_messages - messages.size
  end

  increment :kafka_routing_success, messages.size

  # NOTE(review): the method name after `@instance.` was lost in the
  # extracted listing; `send_messages` is assumed here. Confirm against
  # the Producer class before relying on it.
  @instance.send_messages(messages)
end
#setup ⇒ Object
51 52 53 54 55 56 57 58 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 51
#
# Creates the producer instance from the configured brokers and producer
# name. When the broker list is missing or empty an error is logged and no
# instance is created.
#
# @return [Producer, nil] the new instance, or nil when there are no brokers
def setup
  if @brokers && !@brokers.empty?
    @instance = Producer.new(@brokers, @producer)
  else
    log.error "No usable initial list of brokers"
    nil
  end
end
#teardown ⇒ Object
60 61 62 63 64 65 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 60
#
# Stops the producer instance, if there is one, and forgets it.
#
# @return [nil]
def teardown
  return unless @instance

  @instance.stop
  @instance = nil
end