Class: RightScale::AgentWatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/instance/agent_watcher.rb

Defined Under Namespace

Classes: AlreadyWatched, UnknownAgent

Constant Summary collapse

SECOND =

Some Time calculation constants

1
MINUTE =
60 * SECOND
HOUR =
60 * MINUTE
DAY =
24 * HOUR
DEFAULT_FREQUENCY_CHECK =

5 * SECOND

Instance Method Summary collapse

Constructor Details

#initialize(logger, pid_dir = nil) ⇒ AgentWatcher

Returns a new instance of AgentWatcher.



51
52
53
54
55
56
57
58
# File 'lib/instance/agent_watcher.rb', line 51

# Build an idle watcher; no thread runs until start_watching is called.
#
# logger  - callable (anything responding to #call) that receives log lines
# pid_dir - directory handed to PidFile for every watched agent (nil if omitted)
def initialize(logger, pid_dir=nil)
  @logger, @pid_dir = logger, pid_dir
  @running = false
  # Both lists map an agent identity to its settings hash. They are guarded
  # by a Monitor (reentrant) because lock-holding methods call each other.
  @watched_list, @stopped_list = {}, {}
  @watch_list_lock = Monitor.new
end

Instance Method Details

#kill_agent(identity, signal = 'SIGKILL') ⇒ Object



64
65
66
67
68
69
70
71
# File 'lib/instance/agent_watcher.rb', line 64

# Send a signal to a watched agent and move it to the stopped list, so the
# watcher thread no longer checks it.
#
# identity - key of an agent previously registered with watch_agent
# signal   - signal name or number passed to Process.kill (default 'SIGKILL')
#
# Raises UnknownAgent when identity is not in the watched list. Errors raised
# while reading the pid or by Process.kill propagate to the caller. In that
# case the agent stays in the watched list.
def kill_agent(identity, signal='SIGKILL')
  @watch_list_lock.synchronize do
    raise UnknownAgent.new("#{identity} is not a known agent.") unless @watched_list.has_key?(identity)
    agent = @watched_list[identity]
    # Deliver the signal before moving the entry. If the kill fails, the agent
    # remains tracked instead of being dropped from both lists.
    Process.kill(signal, agent[:pid].read_pid[:pid])
    @stopped_list[identity] = @watched_list.delete(identity)
  end
end

#log_info(s) ⇒ Object



60
61
62
# File 'lib/instance/agent_watcher.rb', line 60

# Hand a message to the configured logger callable, prefixed with the class
# tag. Returns whatever the logger returns.
def log_info(s)
  tagged = "AgentWatcher: #{s}"
  @logger.call(tagged)
end

#restart_agent(identity) ⇒ Object



73
74
75
76
# File 'lib/instance/agent_watcher.rb', line 73

# Bounce an agent: run stop_agent, then start_agent, for the same identity.
# Returns start_agent's result. If stop_agent raises, no start is attempted.
def restart_agent(identity)
  [:stop_agent, :start_agent].map { |step| send(step, identity) }.last
end

#start_agent(identity) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/instance/agent_watcher.rb', line 138

# Start a previously stopped agent and put it back under watch.
#
# The agent's pid file is removed, then "#{exec} #{start_opts}" is run through
# Kernel#system. The agent's next check is deferred by MINUTE seconds so it
# has time to come up. The agent is returned to the watched list even when
# the start command fails (the failure is logged), so the watcher keeps
# tracking it.
#
# Raises UnknownAgent when identity is not in the stopped list.
def start_agent(identity)
  @watch_list_lock.synchronize {
    raise UnknownAgent.new("#{identity} is not a known stopped agent.") unless @stopped_list.has_key?(identity)
    agent = @stopped_list.delete(identity)
    agent[:pid].remove
    if system("#{agent[:exec]} #{agent[:start_opts]}")
      log_info("Successfully started the #{identity} agent.")
    else
      log_info("Failed to start the #{identity} agent.")
    end
    # Give us some time to come up before the next check...should be good in
    # 60 seconds I would think.
    agent[:next_check] = MINUTE
    @watched_list[identity] = agent
  }
end

#start_watching ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/instance/agent_watcher.rb', line 78

# Start the background thread that periodically calls check_agent on every
# watched agent. Does nothing if the watcher is already running.
def start_watching()
  return if @running
  # This logic is implemented with a priority queue of "next check" times.
  # Every agent carries a countdown (:next_check, in seconds). Each pass
  # checks the agents whose countdown reached zero and resets them to their
  # frequency. It then sleeps until the soonest remaining countdown expires.
  #
  # This allows us not to have to start a bunch of timers and do time
  # arithmetic, which can be tricky when someone goes and changes the date
  # on the system this code is running on.

  # Initialize all agents so they are checked on the first pass.
  @watch_list_lock.synchronize {
    @watched_list.each_value { |agent| agent[:next_check] = 0 }
  }

  log_info("Starting the AgentWatcher.")
  # Set the flag before spawning the thread. A repeated start_watching (or a
  # stop_watching) issued right after this call then sees the real state
  # instead of racing the new thread.
  @running = true
  @agent_watcher_thread = Thread.new do
    while @running
      # Wait for something to watch WITHOUT holding the lock. watch_agent
      # needs the lock to add the first agent, so sleeping inside
      # synchronize would block it for as long as the list stays empty.
      sleep(DEFAULT_FREQUENCY_CHECK) while @running && @watch_list_lock.synchronize { @watched_list.empty? }
      break unless @running

      # Start timing after the idle wait, so the wait is not charged against
      # the next check interval.
      time_start = Time.now
      next_check = 0

      # TODO: This may need to be refactored into a real priority queue if we
      # start to have large amounts of agents assigned, rather than incur the
      # overhead of discovering the next smallest number every iteration. -brs
      @watch_list_lock.synchronize do
        # Check all agents whose countdown expired, resetting their countdown
        # to their frequency.
        @watched_list.each do |identity, agent|
          next if agent[:next_check] > 0
          agent[:next_check] = agent[:freq]
          check_agent(identity)
        end

        # Sleep until the soonest pending check. Never use a negative amount,
        # which would push the other countdowns further out.
        soonest = @watched_list.values.map { |agent| agent[:next_check] }.min
        next_check = (soonest && soonest > 0) ? soonest : 0

        # Subtract this from all the elements before we sleep.
        @watched_list.each_value { |agent| agent[:next_check] -= next_check }
      end

      # Account for the time it took to check agents. Ignore negative elapsed
      # time, which happens if the wall clock was set backwards meanwhile.
      elapsed = (Time.now - time_start).to_i
      next_check -= elapsed if elapsed > 0

      # Sleep for the next check time (sleep returns whole seconds slept).
      next_check -= sleep(next_check) while (next_check > 0 and @running)
    end
    log_info("Shutting down.")
  end
end

#stop_agent(identity) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
# File 'lib/instance/agent_watcher.rb', line 162

# Stop a watched agent and move it to the stopped list.
#
# Runs "#{exec} #{stop_opts}" through Kernel#system. The agent's pid file is
# removed only when that command succeeds; a failure is logged. In both cases
# the agent moves to the stopped list, so the watcher thread no longer checks it.
#
# Raises UnknownAgent when identity is not in the watched list.
def stop_agent(identity)
  @watch_list_lock.synchronize {
    raise UnknownAgent.new("#{identity} is not a known agent.") unless @watched_list.has_key?(identity)
    agent = @watched_list.delete(identity)
    if system("#{agent[:exec]} #{agent[:stop_opts]}")
      log_info("Successfully stopped the #{identity} agent.")
      agent[:pid].remove
    else
      log_info("Failed to stop the #{identity} agent.")
    end
    @stopped_list[identity] = agent
  }
end

#stop_watching ⇒ Object



152
153
154
155
156
157
158
159
160
# File 'lib/instance/agent_watcher.rb', line 152

# Stop the background watcher thread, if it is running.
# Clears the running flag, kills and joins the thread, and drops the
# reference to it. Returns nil when the watcher was not running.
def stop_watching
  if @running
    log_info("Stopping the AgentWatcher.")
    @running = false
    watcher = @agent_watcher_thread
    watcher.terminate
    watcher.join
    @agent_watcher_thread = nil
    log_info("AgentWatcher Stopped.")
  end
end

#watch_agent(identity, exec, start_opts, stop_opts, freq = DEFAULT_FREQUENCY_CHECK) ⇒ Object

Raises:

  • (BadFrequency)


174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/instance/agent_watcher.rb', line 174

# Register an agent to be monitored by the watcher thread.
#
# identity   - key the agent is tracked under; also passed to PidFile.new
# exec       - command used to start and stop the agent
# start_opts - arguments appended to exec by start_agent
# stop_opts  - arguments appended to exec by stop_agent
# freq       - seconds between checks; must be greater than 1
#
# An optional block is stored as the agent's :action, which takes
# (identity, state, mesg). Presumably check_agent (not shown here) invokes it;
# confirm. Without a block, the default action restarts the agent when the
# reported state is :stopped.
#
# Raises BadFrequency when freq <= 1 and AlreadyWatched when identity is
# already watched.
def watch_agent(identity, exec, start_opts, stop_opts, freq=DEFAULT_FREQUENCY_CHECK, &block)
  # Make things simple for now and require a resolution of 1 second
  # for the frequency check.
  # NOTE(review): BadFrequency is not among this class's nested classes
  # (only AlreadyWatched and UnknownAgent are); confirm it is defined
  # elsewhere, otherwise this raises NameError instead.
  raise BadFrequency.new unless (freq > 1)

  # Protect the watch list from the thread monitoring the agents
  @watch_list_lock.synchronize {
    if @watched_list.has_key?(identity)
      raise AlreadyWatched.new("The agent [#{identity}] is already being watched by us.")
    end

    # Use the caller's block for state changes, otherwise restart on :stopped.
    # The block is captured explicitly via &block because the implicit
    # block-less `Proc.new` capture raises ArgumentError on Ruby 3.0+.
    action = block || Proc.new { |agent_identity, state, mesg|
      if state == :stopped
        log_info("#{agent_identity} has stopped, restarting now.")
        start_agent(agent_identity)
      end
    }

    @watched_list[identity] = {
      :action=>action,
      :exec=>exec,
      :start_opts=>start_opts,
      :stop_opts=>stop_opts,
      :freq=>freq,
      :pid=>PidFile.new(identity,@pid_dir),
    }
  }
end