Class: LogStash::Outputs::ZeroMQ

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/zeromq.rb

Overview

Write events to a 0MQ PUB socket.

This output plugin requires the 0MQ 2.1.x library to be installed.

The default settings will create a publisher connecting to a subscriber bound to tcp://127.0.0.1:2120.

Instance Method Summary collapse

Instance Method Details

#close ⇒ Object



99
100
101
# File 'lib/logstash/outputs/zeromq.rb', line 99

# Closes the 0mq socket held in @zsocket and hands the resulting return code
# to error_check, tagged with a description of the operation being performed.
#
# Returns whatever error_check returns.
def close
  close_rc = @zsocket.close
  error_check(close_rc, "while closing the socket")
end

#publish(event, payload) ⇒ Object

def receive



115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/logstash/outputs/zeromq.rb', line 115

# Sends one encoded payload over the 0mq socket. `register` installs this
# method as the codec's on_event callback.
#
# event   - the event the payload was produced from; not used at the moment
#           (see the TODO about topics below).
# payload - the encoded data passed to @zsocket.send_string.
#
# Any StandardError raised while logging or sending is rescued and reported
# through @logger.warn rather than propagated to the caller.
#
# Returns the result of error_check on success, or the result of
# @logger.warn when an exception was rescued.
def publish(event, payload)
  begin
    @logger.debug("0mq: sending", :event => payload) if @logger.debug?

    # TODO(sissel): Need to figure out how to fit this into the codecs system.
    # For the "pubsub" topology a topic frame used to be sent first; that code
    # is disabled, so no topology currently gets special handling here:
    #   @logger.debug("0mq output: setting topic to: #{event.sprintf(@topic)}")
    #   error_check(@zsocket.send_string(event.sprintf(@topic), ZMQ::SNDMORE),
    #               "in topic send_string")

    send_rc = @zsocket.send_string(payload)
    error_check(send_rc, "in send_string")
  rescue => e
    @logger.warn("0mq output exception", :address => @address, :exception => e)
  end
end

#receive(event) ⇒ Object



109
110
111
112
113
# File 'lib/logstash/outputs/zeromq.rb', line 109

# Passes the event to the configured codec for encoding.
#
# event - the event to hand to @codec.encode.
#
# Returns whatever @codec.encode returns. Presumably the codec then invokes
# the on_event callback installed by `register` (i.e. `publish`) with the
# encoded payload -- TODO confirm against the codec implementation.
def receive(event)
  @codec.encode(event)
end

#register ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/logstash/outputs/zeromq.rb', line 63

# Prepares the plugin for use: loads the 0mq bindings, creates and configures
# the socket, sets it up for every configured address, and installs `publish`
# as the codec's on_event callback.
#
# Reads @mode, @topology, @sockopt, @address and @codec; writes @zsocket.
# Returns the result of @codec.on_event.
def register
  # Loaded here rather than at file level, so the libraries are only
  # required once the plugin is actually registered.
  require "ffi-rzmq"
  require "logstash/util/zeromq"
  self.class.send(:include, LogStash::Util::ZeroMQ)

  workers_not_supported("With 'mode => server', only one zeromq socket may bind to a port and may not be shared among threads. Going to single-worker mode for this plugin!") if @mode == "server"

  # Translate topology shorthand to socket types. An unrecognised topology
  # leaves socket_type as nil, which is passed on to context.socket as-is.
  socket_type = case @topology
                when "pair"     then ZMQ::PAIR
                when "pushpull" then ZMQ::PUSH
                when "pubsub"   then ZMQ::PUB
                end

  # `context` is not defined in this method; presumably it comes from the
  # LogStash::Util::ZeroMQ mixin included above -- TODO confirm.
  @zsocket = context.socket(socket_type)

  # NOTE(review): the trailing ')' in the message below looks like a typo; it
  # is kept unchanged here so that runtime strings stay exactly as they were.
  linger_rc = @zsocket.setsockopt(ZMQ::LINGER, 1)
  error_check(linger_rc, "while setting ZMQ::LINGER == 1)")

  # Apply user-supplied socket options, if any were configured.
  setopts(@zsocket, @sockopt) if @sockopt

  # The same socket is set up once per configured address.
  @address.each { |addr| setup(@zsocket, addr) }

  @codec.on_event(&method(:publish))
end