Class: CloudPI::Appender

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudpi-appender.rb

Constant Summary collapse

CONNECT_TIMEOUT =

sec

0.1

Instance Method Summary collapse

Constructor Details

#initialize(bridge_ip = 'localhost', bridge_port = 5999, logger = nil, policy = {}) ⇒ Appender

Returns a new instance of Appender.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/cloudpi-appender.rb', line 13

def initialize(bridge_ip = 'localhost', bridge_port = 5999, logger = nil, policy = {})
  @bridge_ip     = bridge_ip
  @bridge_port   = bridge_port

  @logger = logger
  @logger = Logger.new(STDOUT) unless @logger
  
  begin
    json         = JSON.parse(ENV['VCAP_APPLICATION'], {:symbolize_names => true})
    @app_ip      = json[:host]
    @app_port    = json[:port]
    @app_id      = json[:app_id]
    @instance_id = json[:instance_id]
  rescue JSON::ParseError, TypeError
    @logger.error "parsing error: ENV['VCAP_APPLICATION']"
  end

  @policy = policy
  @policy = {} unless @policy.is_a? Hash

  connect_to_bridge
end

Instance Method Details

#check_and_reconnect_to_bridgeObject



56
57
58
# File 'lib/cloudpi-appender.rb', line 56

def check_and_reconnect_to_bridge
  connect_to_bridge unless connected_to_bridge?
end

#connect_to_bridgeObject



36
37
38
39
40
41
42
43
44
# File 'lib/cloudpi-appender.rb', line 36

def connect_to_bridge
  Timeout::timeout(CONNECT_TIMEOUT) do
    @bridge = TCPSocket.new(@bridge_ip, @bridge_port)
  end
  send_policy
  @logger.info("connected to bridge: #{@bridge_ip}:#{@bridge_port}")
rescue => e
  @logger.error("bridge connection failed: #{e.message}")
end

#connected_to_bridge?Boolean

Returns:

  • (Boolean)


51
52
53
54
# File 'lib/cloudpi-appender.rb', line 51

def connected_to_bridge?
  return false unless @bridge && !@bridge.closed?
  true
end

#disconnect_bridgeObject



46
47
48
49
# File 'lib/cloudpi-appender.rb', line 46

def disconnect_bridge
  @bridge.close if @bridge
  @logger.info("bridge connection closed.")
end

#metric_keyObject



79
80
81
82
# File 'lib/cloudpi-appender.rb', line 79

def metric_key
  # instance_ip:instance_port:app_id:instance_id
  :"#{@app_ip}:#{@app_port}:#{@app_id}:#{@instance_id}"
end

#msg_with_when(msg) ⇒ Object



97
98
99
# File 'lib/cloudpi-appender.rb', line 97

def msg_with_when(msg)
  msg.merge({ :when => Time.now.to_i })
end

#parse_json(json) ⇒ Object



60
61
62
63
64
65
66
67
# File 'lib/cloudpi-appender.rb', line 60

def parse_json(json)
  body = JSON.parse(json, {:symbolize_names => true})
  raise JSON::ParseError unless body.is_a?(Hash)  # not allowed array json
  body
rescue JSON::ParseError
  @logger.error("wrong metric json message recieved.")
  {}
end

#send(msg) ⇒ Object



107
108
109
110
111
112
# File 'lib/cloudpi-appender.rb', line 107

def send(msg)
  return unless msg.is_a? Hash
  msg = { metric_key => msg_with_when(msg) }.to_json
  send_to_bridge(msg)
  @logger.info("value sent: #{msg}")
end

#send_heartbeatObject



91
92
93
94
95
# File 'lib/cloudpi-appender.rb', line 91

def send_heartbeat
  msg = { metric_key => {:alive => Time.now.to_i} }.to_json
  send_to_bridge(msg)
  @logger.info("heartbeat sent: #{msg}")
end

#send_policyObject



101
102
103
104
105
# File 'lib/cloudpi-appender.rb', line 101

def send_policy
  msg = { metric_key => msg_with_when(@policy) }.to_json
  send_to_bridge(msg)
  @logger.info("policy sent: #{msg}")
end

#send_to_bridge(msg) ⇒ Object



69
70
71
72
73
74
75
76
77
# File 'lib/cloudpi-appender.rb', line 69

def send_to_bridge(msg)
  return unless msg
  check_and_reconnect_to_bridge
  @bridge.write("#{msg}\0")
  @logger.debug("sent message to bridge: #{msg}")
rescue => e
  @logger.error("bridge write error: #{e.message}")
  @bridge.close
end

#set_policy(policy) ⇒ Object



84
85
86
87
88
89
# File 'lib/cloudpi-appender.rb', line 84

def set_policy(policy)
  @policy = policy
  @policy = {} unless @policy.is_a? Hash
  @logger.info("set metric policy: #{@policy.to_json}")
  send_policy
end