Module: LogStash::Util::ZeroMQ
- Defined in:
- lib/logstash/util/zeromq.rb
Constant Summary collapse
- STRING_OPTS =
%w{IDENTITY SUBSCRIBE UNSUBSCRIBE}
Instance Method Summary collapse
- #context ⇒ Object
- #error_check(rc, doing) ⇒ Object
-
#setopts(socket, options) ⇒ Object
def error_check.
- #setup(socket, address) ⇒ Object
Instance Method Details
#context ⇒ Object
10 11 12 |
# File 'lib/logstash/util/zeromq.rb', line 10 def context @context ||= ZMQ::Context.new end |
#error_check(rc, doing) ⇒ Object
23 24 25 26 27 28 |
# File 'lib/logstash/util/zeromq.rb', line 23 def error_check(rc, doing) unless ZMQ::Util.resultcode_ok?(rc) @logger.error("ZeroMQ error while #{doing}", { :error_code => rc }) raise "ZeroMQ Error while #{doing}" end end |
#setopts(socket, options) ⇒ Object
def error_check
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/logstash/util/zeromq.rb', line 30 def setopts(socket, ) .each do |opt,value| sockopt = opt.split('::')[1] option = ZMQ.const_defined?(sockopt) ? ZMQ.const_get(sockopt) : ZMQ.const_missing(sockopt) unless STRING_OPTS.include?(sockopt) begin Float(value) value = value.to_i rescue ArgumentError raise "#{sockopt} requires a numeric value. #{value} is not numeric" end end # end unless error_check(socket.setsockopt(option, value), "while setting #{opt} == #{value}") end # end each end |
#setup(socket, address) ⇒ Object
14 15 16 17 18 19 20 21 |
# File 'lib/logstash/util/zeromq.rb', line 14 def setup(socket, address) if server? error_check(socket.bind(address), "binding to #{address}") else error_check(socket.connect(address), "connecting to #{address}") end @logger.info("0mq: #{server? ? 'connected' : 'bound'}", :address => address) end |