Class: LogStash::Inputs::ZeroMQ

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/zeromq.rb

Overview

Read events over a 0MQ SUB socket.

You need to have the 0mq 2.1.x library installed to be able to use this input plugin.

The default settings create a subscriber bound to `tcp://127.0.0.1:2120` that waits for publishers to connect.

Instance Method Summary collapse

Instance Method Details

#close ⇒ Object

def register



116
117
118
119
120
121
122
123
# File 'lib/logstash/inputs/zeromq.rb', line 116

# Tears down the ZeroMQ resources held by this input.
#
# Closes @zsocket (passing the return code through error_check) and then
# terminates the context. Teardown is best-effort: a RuntimeError raised by
# either step is logged, together with the exception itself so the cause is
# not lost, and is not propagated to the caller.
#
# @return [Object] the result of context.terminate, or of the logger call
#   when teardown failed
def close
  error_check(@zsocket.close, "while closing the zmq socket")
  context.terminate
rescue RuntimeError => e
  @logger.error("Failed to properly teardown ZeroMQ", :exception => e)
end

#register ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/logstash/inputs/zeromq.rb', line 75

# Prepares the ZeroMQ socket for this input.
#
# Loads the ffi-rzmq bindings and mixes LogStash::Util::ZeroMQ into the
# plugin class, creates a socket whose type follows @topology, sets
# ZMQ::LINGER to 1, applies any @sockopt options, runs setup for each entry
# in @address and, for the "pubsub" topology, subscribes either to every
# message (when @topic is nil) or to each configured topic.
def register
  # Loaded here rather than at file load time.
  require "ffi-rzmq"
  require "logstash/util/zeromq"
  self.class.send(:include, LogStash::Util::ZeroMQ)

  # An unrecognised topology yields nil here.
  socket_type = {
    "pair"     => ZMQ::PAIR,
    "pushpull" => ZMQ::PULL,
    "pubsub"   => ZMQ::SUB
  }[@topology]

  @zsocket = context.socket(socket_type)
  linger_rc = @zsocket.setsockopt(ZMQ::LINGER, 1)
  error_check(linger_rc, "while setting ZMQ::LINGER == 1)")

  setopts(@zsocket, @sockopt) if @sockopt

  @address.each { |endpoint| setup(@zsocket, endpoint) }

  # Subscriptions only apply to SUB sockets.
  return unless @topology == "pubsub"

  if @topic.nil?
    @logger.debug("ZMQ - No topic provided. Subscribing to all messages")
    # An empty prefix is used to subscribe to all messages.
    subscribe_rc = @zsocket.setsockopt(ZMQ::SUBSCRIBE, "")
    error_check(subscribe_rc, "while setting ZMQ::SUBSCRIBE")
  else
    @topic.each do |name|
      @logger.debug("ZMQ subscribing to topic: #{name}")
      subscribe_rc = @zsocket.setsockopt(ZMQ::SUBSCRIBE, name)
      error_check(subscribe_rc, "while setting ZMQ::SUBSCRIBE == #{name}")
    end
  end
end

#run(output_queue) ⇒ Object

def server?



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/logstash/inputs/zeromq.rb', line 129

# Receive loop for this input.
#
# Polls @zsocket without blocking until stop? is true. Each received message
# is decoded by @codec; every resulting event gets a "host" field (the local
# hostname, unless the event already has one), is passed to decorate and is
# pushed onto output_queue. For multipart messages the first part is treated
# as the topic and the second part becomes the payload.
#
# Any StandardError raised inside the loop is logged at debug level and the
# loop is restarted.
#
# @param output_queue [#<<] destination for decoded events
def run(output_queue)
  host = Socket.gethostname
  begin
    until stop?
      first_part = ""
      status = @zsocket.recv_string(first_part, ZMQ::DONTWAIT)
      # Nothing available on a non-blocking read: poll again.
      would_block = status == -1 && ZMQ::Util.errno == ZMQ::EAGAIN
      next if would_block
      error_check(status, "in recv_string")

      @logger.debug("ZMQ receiving", :event => first_part)
      payload = first_part

      # With more parts pending, the first part was the topic; the payload
      # is the part that follows it.
      if @zsocket.more_parts?
        @logger.debug("Multipart message detected. Setting @message to second part. First part was: #{first_part}")
        second_part = ''
        second_status = @zsocket.recv_string(second_part)
        error_check(second_status, "in recv_string")
        @logger.debug("ZMQ receiving", :event => second_part)
        payload = second_part
      end

      @codec.decode(payload) do |event|
        event["host"] ||= host
        decorate(event)
        output_queue << event
      end
    end
  rescue => e
    @logger.debug("ZMQ Error", :subscriber => @zsocket, :exception => e)
    retry
  end
end

#server? ⇒ Boolean

def close

Returns:

  • (Boolean)


125
126
127
# File 'lib/logstash/inputs/zeromq.rb', line 125

# Reports whether this input is configured in server mode.
#
# @return [Boolean] true when @mode is the string "server"
def server?
  "server" == @mode
end