Class: LogStash::Outputs::ZeroMQ
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::ZeroMQ
- Defined in:
- lib/logstash/outputs/zeromq.rb
Overview
Write events to a 0MQ PUB socket.
You need to have the 0mq 2.1.x library installed to be able to use this output plugin.
The default settings will create a publisher connecting to a subscriber bound to tcp://127.0.0.1:2120
Instance Method Summary collapse
- #close ⇒ Object
-
#publish(event, payload) ⇒ Object
def receive.
- #receive(event) ⇒ Object
- #register ⇒ Object
Instance Method Details
#close ⇒ Object
99 100 101 |
# File 'lib/logstash/outputs/zeromq.rb', line 99 def close error_check(@zsocket.close, "while closing the socket") end |
#publish(event, payload) ⇒ Object
def receive
115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/logstash/outputs/zeromq.rb', line 115 def publish(event, payload) @logger.debug? && @logger.debug("0mq: sending", :event => payload) if @topology == "pubsub" # TODO(sissel): Need to figure out how to fit this into the codecs system. #@logger.debug("0mq output: setting topic to: #{event.sprintf(@topic)}") #error_check(@zsocket.send_string(event.sprintf(@topic), ZMQ::SNDMORE), #"in topic send_string") end error_check(@zsocket.send_string(payload), "in send_string") rescue => e @logger.warn("0mq output exception", :address => @address, :exception => e) end |
#receive(event) ⇒ Object
109 110 111 112 113 |
# File 'lib/logstash/outputs/zeromq.rb', line 109 def receive(event) @codec.encode(event) end |
#register ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/logstash/outputs/zeromq.rb', line 63 def register require "ffi-rzmq" require "logstash/util/zeromq" self.class.send(:include, LogStash::Util::ZeroMQ) if @mode == "server" workers_not_supported("With 'mode => server', only one zeromq socket may bind to a port and may not be shared among threads. Going to single-worker mode for this plugin!") end # Translate topology shorthand to socket types case @topology when "pair" zmq_const = ZMQ::PAIR when "pushpull" zmq_const = ZMQ::PUSH when "pubsub" zmq_const = ZMQ::PUB end # case socket_type @zsocket = context.socket(zmq_const) error_check(@zsocket.setsockopt(ZMQ::LINGER, 1), "while setting ZMQ::LINGER == 1)") if @sockopt setopts(@zsocket, @sockopt) end @address.each do |addr| setup(@zsocket, addr) end @codec.on_event(&method(:publish)) end |