Class: PushyClient::Heartbeater

Inherits:
Object
  • Object
show all
Defined in:
lib/pushy_client/heartbeater.rb

Constant Summary collapse

NUM_HEARTBEATS_TO_LOG =
3

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ Heartbeater

Returns a new instance of Heartbeater.



22
23
24
25
26
27
# File 'lib/pushy_client/heartbeater.rb', line 22

# Builds a heartbeater bound to +client+. Nothing is started here: the
# constructor only records its collaborator and the initial bookkeeping state.
#
# @param client [Object] stored in @client and exposed through #client
def initialize(client)
  # Sequence number attached to the next outgoing heartbeat.
  @heartbeat_sequence = 1
  # Lock taken around the online/offline counter bookkeeping.
  @online_mutex = Mutex.new
  @client = client
  # Blocks registered through #on_server_availability_change.
  @on_server_availability_change = []
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



29
30
31
# File 'lib/pushy_client/heartbeater.rb', line 29

# Reader for @client, the object handed to #initialize.
#
# @return [Object] the client this heartbeater was built with
def client
  @client
end

#incarnation_id ⇒ Object (readonly)

Returns the value of attribute incarnation_id.



30
31
32
# File 'lib/pushy_client/heartbeater.rb', line 30

# Reader for @incarnation_id, the server incarnation ID currently known to
# this heartbeater. #start loads it from client.config['incarnation_id'] and
# #heartbeat_received replaces it when a heartbeat carries a different ID.
#
# @return [Object, nil] the stored incarnation ID
def incarnation_id
  @incarnation_id
end

#interval ⇒ Object (readonly)

Returns the value of attribute interval.



33
34
35
# File 'lib/pushy_client/heartbeater.rb', line 33

# Reader for @interval, which #start loads from
# client.config['push_jobs']['heartbeat']['interval']. The heartbeat thread
# passes it to sleep between iterations, so it is a duration in seconds.
#
# @return [Object, nil] the configured interval (nil until #start has run)
def interval
  @interval
end

#offline_threshold ⇒ Object (readonly)

Returns the value of attribute offline_threshold.



32
33
34
# File 'lib/pushy_client/heartbeater.rb', line 32

# Reader for @offline_threshold, which #start loads from
# client.config['push_jobs']['heartbeat']['offline_threshold']. The heartbeat
# thread treats the server as offline once the missed-heartbeat counter
# exceeds this value.
#
# @return [Object, nil] the configured threshold (nil until #start has run)
def offline_threshold
  @offline_threshold
end

#online_threshold ⇒ Object (readonly)

Returns the value of attribute online_threshold.



31
32
33
# File 'lib/pushy_client/heartbeater.rb', line 31

# Reader for @online_threshold, which #start loads from
# client.config['push_jobs']['heartbeat']['online_threshold'].
# #heartbeat_received marks an offline server online again once the
# received-heartbeat counter exceeds this value.
#
# @return [Object, nil] the configured threshold (nil until #start has run)
def online_threshold
  @online_threshold
end

Instance Method Details

#heartbeat_received(incarnation_id, sequence) ⇒ Object

TODO use the sequence for something?



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/pushy_client/heartbeater.rb', line 108

# Handles one heartbeat sent by the server.
#
# Logs the heartbeat, tracks the server incarnation ID (a changed ID is treated
# as a server restart and triggers a client reconfigure after a random delay),
# clears the missed-heartbeat counter and, once enough heartbeats have arrived
# while the server is considered offline, marks it online again.
#
# @param incarnation_id [Object] incarnation ID carried by the heartbeat
# @param sequence [Object] sequence number carried by the heartbeat; it is
#   only written to the log
# @return [Object] result of the last bookkeeping step: the incremented
#   received-heartbeat counter, or whatever set_online returns
def heartbeat_received(incarnation_id, sequence)
  received = "[#{node_name}] Received server heartbeat (sequence ##{sequence})"
  # Heartbeats are logged at info level only while the counter is small;
  # afterwards they drop to debug level.
  if @online_counter > NUM_HEARTBEATS_TO_LOG
    Chef::Log.debug received
  else
    Chef::Log.info "#{received} logging #{@online_counter}/#{NUM_HEARTBEATS_TO_LOG}"
  end

  incarnation_changed = @incarnation_id != incarnation_id
  if incarnation_changed && @incarnation_id.nil?
    # No ID known yet: simply adopt the one the server reports.
    @incarnation_id = incarnation_id
    Chef::Log.info "[#{node_name}] First heartbeat received.  Server is at incarnation ID #{incarnation_id}."
  elsif incarnation_changed
    # A different ID means the server restarted. The new ID has to be stored
    # before the reconfigure is triggered (per the original note, the
    # reconfigure kills the thread running this code).
    delay = Random.new.rand(interval.to_f)
    Chef::Log.info "[#{node_name}] Server restart detected (incarnation ID changed from #{@incarnation_id} to #{incarnation_id}).  Reconfiguring after a randomly chosen #{delay} second delay to avoid storming the server ..."
    @incarnation_id = incarnation_id
    sleep(delay)
    client.trigger_reconfigure
  end

  @online_mutex.synchronize do
    # Any heartbeat resets the run of missed heartbeats.
    @offline_counter = 0

    if @online || @online_counter <= online_threshold
      @online_counter += 1
    else
      Chef::Log.info "[#{node_name}] Server has heartbeated #{@online_counter} times without missing more than #{offline_threshold} heartbeats in a row."
      set_online(true)
    end
  end
end

#node_name ⇒ Object



35
36
37
# File 'lib/pushy_client/heartbeater.rb', line 35

# Name of this node as reported by the client. The other methods use it as
# the "[name]" prefix of their log messages.
#
# @return [Object] whatever client.node_name returns
def node_name
  client.node_name
end

#on_server_availability_change(&block) ⇒ Object



43
44
45
# File 'lib/pushy_client/heartbeater.rb', line 43

# Registers +block+ in the list of server-availability callbacks.
# NOTE(review): the code that invokes these callbacks is not visible here;
# presumably they run when the online state flips -- confirm in set_online.
#
# @yield the callback to remember (stored as given, even when absent)
# @return [Array] the callback list after the block has been appended
def on_server_availability_change(&block)
  @on_server_availability_change.push(block)
end

#online? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/pushy_client/heartbeater.rb', line 39

# Whether the server is currently considered online.
# NOTE(review): this returns the raw @online flag; it is presumably written by
# set_online (not visible here), so it may be nil before #start -- confirm.
#
# @return [Boolean] the current value of @online
def online?
  @online
end

#reconfigure ⇒ Object



102
103
104
105
# File 'lib/pushy_client/heartbeater.rb', line 102

# Restarts the heartbeater: stops the running heartbeat thread, then calls
# #start, which re-reads the heartbeat settings from client.config.
#
# @return [Object] whatever #start returns
def reconfigure
  stop
  start # Start picks up new configuration
end

#start ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/pushy_client/heartbeater.rb', line 47

# Loads the heartbeat settings from the client configuration, resets the
# counters, optimistically marks the server online and launches the background
# thread that sends heartbeats and detects a silent server.
#
# Settings read: client.config['incarnation_id'] plus 'online_threshold',
# 'offline_threshold' and 'interval' under
# client.config['push_jobs']['heartbeat'].
#
# Errors raised while processing one iteration are reported through
# client.log_exception and the thread keeps running. The pause between
# iterations happens outside the rescued section, so a failing iteration still
# waits one interval before the next attempt instead of retrying in a tight
# loop.
#
# @return [Thread] the newly created heartbeat thread
def start
  @incarnation_id = client.config['incarnation_id']
  @online_threshold = client.config['push_jobs']['heartbeat']['online_threshold']
  @offline_threshold = client.config['push_jobs']['heartbeat']['offline_threshold']
  @interval = client.config['push_jobs']['heartbeat']['interval']

  @online_counter = 0
  @offline_counter = 0
  # We optimistically declare the server online since we just got a config blob via http from it
  # however, if the server is reachable via http but not zmq we'll go down after a few heartbeats.
  set_online(true)

  @heartbeat_thread = Thread.new do
    Chef::Log.info "[#{node_name}] Starting heartbeat / offline detection thread on interval #{interval} ..."

    loop do
      begin
        # When the server goes more than <offline_threshold> intervals
        # without sending us a heartbeat, treat it as offline
        @online_mutex.synchronize do
          if @online
            if @offline_counter > offline_threshold
              Chef::Log.info "[#{node_name}] Server has missed #{@offline_counter} heartbeats in a row.  Considering it offline, and stopping heartbeat."
              set_online(false)
              @online_counter = 0
            else
              @offline_counter += 1
            end
          end
        end

        # We only send heartbeats to online servers
        if @online
          client.send_heartbeat(@heartbeat_sequence)
          if @heartbeat_sequence <= NUM_HEARTBEATS_TO_LOG
            Chef::Log.info "[#{node_name}] Sending heartbeat #{@heartbeat_sequence} (logging first #{NUM_HEARTBEATS_TO_LOG})"
          else
            Chef::Log.debug "[#{node_name}] Sending heartbeat #{@heartbeat_sequence}"
          end
          @heartbeat_sequence += 1
        end
      rescue => e
        client.log_exception("Error in heartbeat / offline detection thread", e)
      end

      # Deliberately after the begin/rescue: if the iteration above failed we
      # must still wait, otherwise a persistent error (for example a failing
      # send_heartbeat) would spin the loop and flood the log.
      sleep(interval)
    end
  end
end

#stop ⇒ Object



96
97
98
99
100
# File 'lib/pushy_client/heartbeater.rb', line 96

# Stops the heartbeat / offline detection thread created by #start and waits
# for it to finish. Calling this before #start (no thread exists yet) is a
# harmless no-op instead of a NoMethodError on nil.
#
# @return [Thread, nil] the terminated thread, or nil when none was running
def stop
  Chef::Log.info "[#{node_name}] Stopping heartbeat / offline detection thread ..."
  thread = @heartbeat_thread
  # Nothing to kill when #start has not been called yet.
  return unless thread

  thread.kill
  thread.join
end