fluent-plugin-zmq-pub
Overview
Fluentd plugin to publish records to ZeroMQ.
Why was this plugin created?
Sometimes I want to ‘sniff’ a fluentd stream, that is, run my own programs against the stream without changing the fluentd configuration or restarting fluentd. With this plugin, fluentd records are always published to ZeroMQ regardless of whether any subscriber exists, so I can start and stop my subscriber programs at any time.
Configuration
<match zmq.**>
type zmq_pub
pubkey ${tag}:${key1}
bindaddr tcp://*:5556
flush_interval 1s
</match>
- ‘pubkey’ specifies the publish key to ZeroMQ.
  - ‘${tag}’ is replaced by the fluentd tag. ‘${name}’ is replaced by the value of the field ‘name’ in the fluentd record.
  - The actual message published is ‘<pubkey> <record.to_msgpack>’.
  - Subscribers can subscribe by ‘<pubkey>’.
- ‘bindaddr’ is the address to which the ZeroMQ publisher socket is bound.
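The key expansion and message layout described above can be sketched in plain Ruby. This is only an illustration, not the plugin's actual code: `expand_pubkey` and `frame` are hypothetical helper names, the handling of a missing record field is an assumption, and the payload is an opaque placeholder string standing in for the MessagePack bytes.

```ruby
# Hypothetical helpers illustrating the pubkey expansion described above.
# They are NOT part of fluent-plugin-zmq-pub.

# Replace ${tag} with the fluentd tag and ${<name>} with record[<name>].
# Assumption: a field missing from the record expands to an empty string.
def expand_pubkey(template, tag, record)
  template.gsub(/\$\{(\w+)\}/) do
    name = Regexp.last_match(1)
    name == 'tag' ? tag : record.fetch(name, '').to_s
  end
end

# Join the publish key and the serialized payload with a single space.
def frame(pubkey, payload)
  "#{pubkey} #{payload}"
end

record = { 'key1' => 'aaa', 'key2' => 'foo' }
key = expand_pubkey('${tag}:${key1}', 'zmq.test.tag', record)
puts key # zmq.test.tag:aaa

# '<msgpack bytes>' is a placeholder for the serialized record.
message = frame(key, '<msgpack bytes>')

# A subscriber recovers both parts by splitting on the first space only.
pubkey, payload = message.split(' ', 2)
puts pubkey
puts payload
```

Because the key and the payload are separated by the first space, this layout assumes that the expanded publish key itself contains no spaces.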
Example usage
Put the configuration above into fluentd.conf, and save the following sample code as ‘sample_sub.rb’.
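#!/usr/bin/env ruby
require 'zmq'
require 'msgpack'

# Connect a SUB socket to the address given by 'bindaddr'.
context = ZMQ::Context.new(1)
subscriber = context.socket(ZMQ::SUB)
subscriber.connect("tcp://localhost:5556")

# Subscribe to each key given on the command line, or to everything if none.
if ARGV.length > 0
  ARGV.each{|s|
    subscriber.setsockopt(ZMQ::SUBSCRIBE,s)
  }
else
  subscriber.setsockopt(ZMQ::SUBSCRIBE,"")
end

# Poll for messages without blocking.
while true
  if msg = subscriber.recv(ZMQ::NOBLOCK)
    # A message is '<pubkey> <msgpack payload>'; drop the key, unpack the rest.
    (tag, time, record) = MessagePack.unpack(msg.split(" ",2)[1])
    puts "tag: #{tag}"
    puts "time: #{time}"
    puts "record: #{record.to_s}"
  end
  sleep(0.1)
end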
Run sample_sub.rb. Each argument is a key to subscribe to (corresponding to ‘pubkey’ in the zmq_pub configuration). If you give no arguments, all keys are subscribed.
% ./sample_sub.rb zmq.test.tag:aaa
Submit records to fluentd.
% echo '{"key1": "aaa", "key2":"foo"}' | fluent-cat zmq.test.tag
% echo '{"key1": "bbb", "key2":"foo"}' | fluent-cat zmq.test.tag
Then you will get the following output from sample_sub.rb:
tag: zmq.test.tag
time: 1376033265
record: {"key1"=>"aaa", "key2"=>"foo"}
(You should not get the second record (“key1”: “bbb”), because its publish key to ZeroMQ was “zmq.test.tag:bbb” while the subscribed key was “zmq.test.tag:aaa”.)
The nice thing is that once you add this plugin to your fluentd.conf and start fluentd, you can start and stop any subscriber program without changing the fluentd configuration.
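The filtering in this example follows from how ZeroMQ SUB sockets work: a subscription is a prefix, and a message is delivered only if it starts with one of the subscribed prefixes. The sketch below models that rule in plain Ruby without a ZeroMQ socket. `delivered?` is a hypothetical helper and `<payload>` is a placeholder for the MessagePack bytes.

```ruby
# Minimal model of SUB-side prefix filtering (no ZeroMQ involved).
# A message is delivered when it starts with at least one subscribed prefix;
# the empty prefix matches every message.
def delivered?(message, subscriptions)
  subscriptions.any? { |prefix| message.start_with?(prefix) }
end

# '<payload>' is a placeholder for the serialized record.
messages = [
  'zmq.test.tag:aaa <payload>',
  'zmq.test.tag:bbb <payload>'
]

[['zmq.test.tag:aaa'], ['zmq.test.tag'], ['']].each do |subs|
  matched = messages.select { |m| delivered?(m, subs) }
  puts "#{subs.inspect} -> #{matched.length} message(s)"
end
```

Since matching is by prefix, subscribing to “zmq.test.tag” receives both records, and a key such as “zmq.test.tag:aaa” would also match a publish key like “zmq.test.tag:aaab”.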
Copyright
- Copyright © 2013- OGIBAYASHI Hironori (@angostura11)
- License: Apache License, Version 2.0