Class: CloudPI::Appender
- Inherits:
-
Object
- Object
- CloudPI::Appender
- Defined in:
- lib/cloudpi-appender.rb
Constant Summary collapse
- CONNECT_TIMEOUT = 0.1 (seconds; time limit applied when opening the bridge connection)
Instance Method Summary collapse
- #check_and_reconnect_to_bridge ⇒ Object
- #connect_to_bridge ⇒ Object
- #connected_to_bridge? ⇒ Boolean
- #disconnect_bridge ⇒ Object
-
#initialize(bridge_ip = 'localhost', bridge_port = 5999, logger = nil, policy = {}) ⇒ Appender
constructor
A new instance of Appender.
- #metric_key ⇒ Object
- #msg_with_when(msg) ⇒ Object
- #parse_json(json) ⇒ Object
- #send(msg) ⇒ Object
- #send_heartbeat ⇒ Object
- #send_policy ⇒ Object
- #send_to_bridge(msg) ⇒ Object
- #set_policy(policy) ⇒ Object
Constructor Details
#initialize(bridge_ip = 'localhost', bridge_port = 5999, logger = nil, policy = {}) ⇒ Appender
Returns a new instance of Appender.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/cloudpi-appender.rb', line 13
# Builds an appender and immediately attempts to connect to the bridge.
#
# The application identity (host, port, app id, instance id) is read from the
# JSON document stored in ENV['VCAP_APPLICATION']. When that variable is
# missing, is not valid JSON, or is not a JSON object, an error is logged and
# the identity fields are left unset.
#
# @param bridge_ip [String] host of the bridge
# @param bridge_port [Integer] TCP port of the bridge
# @param logger [Object, nil] object responding to info/error/debug;
#   falls back to Logger.new(STDOUT) when nil
# @param policy [Hash] initial policy; any non-Hash value is replaced by {}
def initialize(bridge_ip = 'localhost', bridge_port = 5999, logger = nil, policy = {})
  @bridge_ip = bridge_ip
  @bridge_port = bridge_port
  @logger = logger || Logger.new(STDOUT)
  @policy = policy.is_a?(Hash) ? policy : {}

  begin
    app = JSON.parse(ENV['VCAP_APPLICATION'], :symbolize_names => true)
    # A JSON value such as `null` or `1` parses fine but carries no identity.
    raise TypeError, 'VCAP_APPLICATION must be a JSON object' unless app.is_a?(Hash)

    @app_ip = app[:host]
    @app_port = app[:port]
    @app_id = app[:app_id]
    @instance_id = app[:instance_id]
  rescue JSON::ParserError, TypeError
    # JSON::ParserError is the parser's error class; TypeError covers an unset
    # (nil) environment variable and non-object JSON values.
    @logger.error "parsing error: ENV['VCAP_APPLICATION']"
  end

  connect_to_bridge
end
Instance Method Details
#check_and_reconnect_to_bridge ⇒ Object
56 57 58 |
# File 'lib/cloudpi-appender.rb', line 56
# Opens a new bridge connection unless one is already open.
#
# @return [Object, nil] nil when already connected, otherwise whatever
#   connect_to_bridge returns
def check_and_reconnect_to_bridge
  return if connected_to_bridge?

  connect_to_bridge
end
#connect_to_bridge ⇒ Object
36 37 38 39 40 41 42 43 44 |
# File 'lib/cloudpi-appender.rb', line 36
# Opens a TCP connection to the bridge, bounded by CONNECT_TIMEOUT, then sends
# the current policy and logs the successful connection.
#
# Any StandardError raised while connecting or while sending the policy is
# logged (with the exception's message) instead of being propagated.
def connect_to_bridge
  Timeout.timeout(CONNECT_TIMEOUT) do
    @bridge = TCPSocket.new(@bridge_ip, @bridge_port)
  end
  send_policy
  @logger.info("connected to bridge: #{@bridge_ip}:#{@bridge_port}")
rescue => e
  @logger.error("bridge connection failed: #{e.message}")
end
#connected_to_bridge? ⇒ Boolean
51 52 53 54 |
# File 'lib/cloudpi-appender.rb', line 51
# Tells whether a bridge socket exists and has not been closed.
#
# @return [Boolean] true only when @bridge is set and reports itself open
def connected_to_bridge?
  bridge_open = @bridge && !@bridge.closed?
  bridge_open ? true : false
end
#disconnect_bridge ⇒ Object
46 47 48 49 |
# File 'lib/cloudpi-appender.rb', line 46
# Closes the bridge socket when one exists and logs that the connection was
# closed. The log line is written whether or not a socket was present.
def disconnect_bridge
  bridge = @bridge
  bridge.close if bridge
  @logger.info("bridge connection closed.")
end
#metric_key ⇒ Object
79 80 81 82 |
# File 'lib/cloudpi-appender.rb', line 79
# Builds the key under which this instance's messages are reported.
#
# @return [Symbol] "<app_ip>:<app_port>:<app_id>:<instance_id>" as a Symbol;
#   unset fields contribute an empty string
def metric_key
  # instance_ip:instance_port:app_id:instance_id
  identity = "#{@app_ip}:#{@app_port}:#{@app_id}:#{@instance_id}"
  identity.to_sym
end
#msg_with_when(msg) ⇒ Object
97 98 99 |
# File 'lib/cloudpi-appender.rb', line 97
# Returns a copy of msg with a :when entry holding the current time as
# integer seconds. An existing :when entry is overwritten in the copy; msg
# itself is not modified.
#
# @param msg [Hash] message to stamp
# @return [Hash] new hash including :when
def msg_with_when(msg)
  timestamp = { :when => Time.now.to_i }
  msg.merge(timestamp)
end
#parse_json(json) ⇒ Object
60 61 62 63 64 65 66 67 |
# File 'lib/cloudpi-appender.rb', line 60
# Parses a JSON document into a Hash with symbol keys.
#
# Only JSON objects are accepted. Invalid JSON, a non-object top-level value
# (for example an array), or a non-String input is logged as an error and
# yields an empty Hash.
#
# @param json [String] JSON text
# @return [Hash] the parsed object, or {} when the input is rejected
def parse_json(json)
  body = JSON.parse(json, :symbolize_names => true)
  # not allowed array json body
  raise JSON::ParserError, 'metric body must be a JSON object' unless body.is_a?(Hash)

  # Return the parsed body explicitly: the guard above evaluates to nil on
  # success, so it must not be the method's last expression.
  body
rescue JSON::ParserError, TypeError
  # TypeError is what JSON.parse raises for nil / non-String input.
  @logger.error("wrong metric json message recieved.")
  {}
end
#send(msg) ⇒ Object
107 108 109 110 111 112 |
# File 'lib/cloudpi-appender.rb', line 107
# Sends a metric message to the bridge, keyed by metric_key and stamped with
# the current time. Anything that is not a Hash is ignored.
#
# NOTE: this method is named `send`, so on Appender instances it takes the
# place of Ruby's built-in Object#send; use __send__ or public_send for
# dynamic dispatch on an appender.
#
# @param msg [Hash] metric values
# @return [Object, nil] nil when msg is not a Hash, otherwise the logger's
#   return value
def send(msg)
  if msg.is_a?(Hash)
    payload = { metric_key => msg_with_when(msg) }.to_json
    send_to_bridge(payload)
    @logger.info("value sent: #{payload}")
  end
end
#send_heartbeat ⇒ Object
91 92 93 94 95 |
# File 'lib/cloudpi-appender.rb', line 91
# Sends a heartbeat to the bridge: a JSON object mapping metric_key to
# { alive: <current time in integer seconds> }, then logs what was sent.
def send_heartbeat
  key = metric_key
  alive_at = Time.now.to_i
  heartbeat = { key => { :alive => alive_at } }.to_json
  send_to_bridge(heartbeat)
  @logger.info("heartbeat sent: #{heartbeat}")
end
#send_policy ⇒ Object
101 102 103 104 105 |
# File 'lib/cloudpi-appender.rb', line 101
# Sends the current policy to the bridge: a JSON object mapping metric_key to
# the policy hash stamped with the current time, then logs what was sent.
def send_policy
  key = metric_key
  stamped_policy = msg_with_when(@policy)
  payload = { key => stamped_policy }.to_json
  send_to_bridge(payload)
  @logger.info("policy sent: #{payload}")
end
#send_to_bridge(msg) ⇒ Object
69 70 71 72 73 74 75 76 77 |
# File 'lib/cloudpi-appender.rb', line 69
# Writes one message to the bridge, terminated by a NUL byte, reconnecting
# first when the connection is missing or closed.
#
# Write failures are logged (with the exception's message) rather than
# raised, and the socket is closed so the next call reconnects.
#
# @param msg [String, nil] serialized message; nil is ignored
def send_to_bridge(msg)
  return unless msg

  check_and_reconnect_to_bridge
  # The reconnect attempt swallows its own errors, so @bridge can still be
  # nil here; fail with a clear message instead of a NoMethodError on nil.
  raise IOError, 'bridge is not connected' unless @bridge

  @bridge.write("#{msg}\0")
  @logger.debug("sent message to bridge: #{msg}")
rescue => e
  @logger.error("bridge write error: #{e.message}")
  # Guarded so a missing or already-closed socket cannot raise from the
  # error handler itself.
  @bridge.close if @bridge && !@bridge.closed?
end
#set_policy(policy) ⇒ Object
84 85 86 87 88 89 |
# File 'lib/cloudpi-appender.rb', line 84
# Replaces the stored policy, logs it as JSON, and sends it to the bridge.
#
# @param policy [Hash] new policy; any non-Hash value is replaced by {}
def set_policy(policy)
  @policy = policy.is_a?(Hash) ? policy : {}
  policy_json = @policy.to_json
  @logger.info("set metric policy: #{policy_json}")
  send_policy
end