Class: LogjamAgent::ZMQForwarder
- Inherits: Object
- Inheritance hierarchy: Object → LogjamAgent::ZMQForwarder
- Includes:
- Util
- Defined in:
- lib/logjam_agent/zmq_forwarder.rb
Constant Summary collapse
- @@mutex = Mutex.new
- @@zmq_context = nil
Constants included from Util
Util::BIG_ENDIAN, Util::FIXNUM_MAX, Util::META_INFO_DEVICE_NUMBER, Util::META_INFO_TAG, Util::META_INFO_VERSION, Util::UINT64
Instance Attribute Summary collapse
-
#app ⇒ Object
readonly
Returns the value of attribute app.
-
#connection_specs ⇒ Object
readonly
Returns the value of attribute connection_specs.
-
#env ⇒ Object
readonly
Returns the value of attribute env.
Class Method Summary collapse
- .context ⇒ Object
Instance Method Summary collapse
- #default_options ⇒ Object
- #forward(data, options = {}) ⇒ Object
-
#initialize(*args) ⇒ ZMQForwarder
constructor
A new instance of ZMQForwarder.
- #publish(app_env, key, data) ⇒ Object
- #reset ⇒ Object
- #socket ⇒ Object
Methods included from Util
#augment_connection_spec, #next_fixnum, #pack_info, #pack_uint64_big_endian, #unpack_info, #unpack_uint64_big_endian, #zclock_time
Constructor Details
#initialize(*args) ⇒ ZMQForwarder
Returns a new instance of ZMQForwarder.
7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/logjam_agent/zmq_forwarder.rb', line 7
# Builds a forwarder for one application/environment pair.
#
# Positional arguments are optional: args[0] is the application name
# (falls back to LogjamAgent.application_name) and args[1] is the
# environment name (falls back to LogjamAgent.environment_name). A trailing
# options hash is merged over #default_options.
#
# NOTE(review): `extract_options!` and the `default_options` receiver were
# reconstructed from a garbled listing; `extract_options!` is presumably the
# ActiveSupport Array extension (the method already relies on `blank?`) --
# confirm against the upstream file.
#
# @param args [Array] optional app name, optional env name, optional options hash
# @return [ZMQForwarder] a new instance of ZMQForwarder
def initialize(*args)
  opts = args.extract_options!
  @app = args[0] || LogjamAgent.application_name
  @env = args[1] || LogjamAgent.environment_name
  @app_env = "#{@app}-#{@env}"
  @config = default_options.merge!(opts)
  # An empty or missing :host falls back to the local machine.
  @config[:host] = "localhost" if @config[:host].blank?
  # :host may hold several comma-separated hosts; each one is combined with
  # the configured port by augment_connection_spec.
  @connection_specs = @config[:host].split(',').map do |host|
    augment_connection_spec(host, @config[:port])
  end
  @sequence = 0
end
Instance Attribute Details
#app ⇒ Object (readonly)
Returns the value of attribute app.
3 4 5 |
# File 'lib/logjam_agent/zmq_forwarder.rb', line 3
# Read-only accessor for the app attribute.
#
# @return [Object] the current value of @app
def app
  @app
end
#connection_specs ⇒ Object (readonly)
Returns the value of attribute connection_specs.
3 4 5 |
# File 'lib/logjam_agent/zmq_forwarder.rb', line 3
# Read-only accessor for the connection_specs attribute.
#
# @return [Object] the current value of @connection_specs
def connection_specs
  @connection_specs
end
#env ⇒ Object (readonly)
Returns the value of attribute env.
3 4 5 |
# File 'lib/logjam_agent/zmq_forwarder.rb', line 3
# Read-only accessor for the env attribute.
#
# @return [Object] the current value of @env
def env
  @env
end
Class Method Details
.context ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/logjam_agent/zmq_forwarder.rb', line 31
# Returns the ZMQ context stored in @@zmq_context, creating it on first call.
#
# Creation happens inside @@mutex.synchronize. The 'ffi-rzmq' library is
# required lazily, only when the context is first built, and an at_exit hook
# is registered to terminate the context.
#
# @return [ZMQ::Context] the memoized context
def self.context
  @@mutex.synchronize do
    @@zmq_context ||= begin
      require 'ffi-rzmq'
      # NOTE(review): the argument 1 is presumably the I/O thread count -- confirm.
      context = ZMQ::Context.new(1)
      at_exit { context.terminate }
      context
    end
  end
end
Instance Method Details
#default_options ⇒ Object
20 21 22 23 24 25 26 |
# File 'lib/logjam_agent/zmq_forwarder.rb', line 20
# Default configuration values; a fresh hash is built on every call.
#
# @return [Hash{Symbol => Integer}] defaults for :port, :linger and :snd_hwm
def default_options
  {
    :port    => 9605,
    :linger  => 1000,
    :snd_hwm => 100,
  }
end
#forward(data, options = {}) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/logjam_agent/zmq_forwarder.rb', line 61
# Encodes +data+ and publishes it under a routing key.
#
# The routing key is options[:routing_key] when given; otherwise it is
# "logs." followed by the app_env with its first '-' replaced by '.'.
# When options[:engine] is set, ".<engine>" is appended to the key.
#
# @param data [Object] payload handed to LogjamAgent.encode_payload
# @param options [Hash] optional :app_env, :routing_key and :engine overrides
# @return [Object] whatever #publish returns
# @raise [ForwardingError] wraps the message of any StandardError raised
#   while encoding or publishing (after reraise_expectation_errors! ran)
def forward(data, options = {})
  app_env = options[:app_env] || @app_env
  key = options[:routing_key] || "logs.#{app_env.sub('-', '.')}"
  if (engine = options[:engine])
    key += ".#{engine}"
  end
  msg = LogjamAgent.encode_payload(data)
  publish(app_env, key, msg)
rescue => error
  reraise_expectation_errors!
  raise ForwardingError.new(error.message)
end
#publish(app_env, key, data) ⇒ Object
74 75 76 77 78 79 80 |
# File 'lib/logjam_agent/zmq_forwarder.rb', line 74
# Sends one four-part message (app_env, key, data, packed info) on the socket.
#
# The sequence counter is advanced with next_fixnum and stored back in
# @sequence before it is packed by pack_info.
#
# @param app_env [String] first message part
# @param key [String] second message part (routing key)
# @param data [String] third message part (already encoded payload)
# @return [nil] when the send succeeded
# @raise [RuntimeError] "ZMQ error: ..." when send_strings returns a negative code
def publish(app_env, key, data)
  info = pack_info(@sequence = next_fixnum(@sequence))
  parts = [app_env, key, data, info]
  # ZMQ::DONTWAIT is passed as the send flag; a negative return code is
  # turned into an exception here.
  if socket.send_strings(parts, ZMQ::DONTWAIT) < 0
    raise "ZMQ error: #{ZMQ::Util.error_string}"
  end
end
#reset ⇒ Object
55 56 57 58 59 |
# File 'lib/logjam_agent/zmq_forwarder.rb', line 55
# Closes the current socket, if any, and clears @socket so that the next
# call to #socket builds a new one. Does nothing when no socket exists.
#
# @return [nil]
def reset
  return unless @socket

  @socket.close
  @socket = nil
end
#socket ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/logjam_agent/zmq_forwarder.rb', line 43
# Returns the memoized ZMQ PUSH socket, building and connecting it on first
# use (or on first use after #reset cleared it).
#
# The socket is created from the class-level context, gets ZMQ::LINGER and
# ZMQ::SNDHWM set from @config[:linger] and @config[:snd_hwm], and is
# connected to every entry of @connection_specs.
#
# @return [Object] the socket returned by self.class.context.socket(ZMQ::PUSH)
def socket
  return @socket if @socket

  @socket = self.class.context.socket(ZMQ::PUSH)
  # Register the exit hook only once per forwarder. The block calls #reset,
  # which always acts on the current @socket, so re-registering it each time
  # the socket is rebuilt would only pile up redundant at_exit handlers.
  unless @exit_hook_installed
    at_exit { reset }
    @exit_hook_installed = true
  end
  @socket.setsockopt(ZMQ::LINGER, @config[:linger])
  @socket.setsockopt(ZMQ::SNDHWM, @config[:snd_hwm])
  @connection_specs.each do |spec|
    @socket.connect(spec)
  end
  @socket
end