Class: CloudCrowd::Node
- Inherits: Sinatra::Base (Object → Sinatra::Base → CloudCrowd::Node)
- Defined in: lib/cloud_crowd/node.rb
Overview
A Node is a Sinatra/Thin application that runs as a single instance per machine. It registers with the central server, receives WorkUnits, and forks off Workers to process them. The actions are:
- get /heartbeat
  Returns 200 OK to let monitoring tools know the server’s up.
- post /work
  The central server hits /work to dispatch a WorkUnit to this Node.
Constant Summary
- DEFAULT_PORT = 9063
  A Node’s default port. You only run a single node per machine, so they can all use the same port without any problems.
- SCRAPE_UPTIME = /\d+\.\d+/
  A list of regex scrapers, which let us extract the one-minute load average and the amount of free memory on different flavors of UNIX.
- SCRAPE_LINUX_MEMORY = /MemFree:\s+(\d+) kB/
- SCRAPE_MAC_MEMORY = /Pages free:\s+(\d+)./
- SCRAPE_MAC_PAGE = /page size of (\d+) bytes/
- MONITOR_INTERVAL = 3
  The interval, in seconds, at which the node monitors the machine’s load and memory use (if configured to do so in config.yml).
- CHECK_IN_INTERVAL = 300
  The interval, in seconds, at which the node regularly checks in with central (5 minutes).
- OVERLOADED_MESSAGE = 'Node Overloaded'
  The response sent back when this node is overloaded.
Instance Attribute Summary
- #central ⇒ Object (readonly)
  Returns the value of attribute central.
- #enabled_actions ⇒ Object (readonly)
  Returns the value of attribute enabled_actions.
- #host ⇒ Object (readonly)
  Returns the value of attribute host.
- #port ⇒ Object (readonly)
  Returns the value of attribute port.
- #tag ⇒ Object (readonly)
  Returns the value of attribute tag.
Instance Method Summary
- #asset_store ⇒ Object
  Lazy-initialize the asset_store, preferably after the Node has launched.
- #check_in(critical = false) ⇒ Object
  Checking in with the central server informs it of the location and configuration of this Node.
- #check_in_periodically ⇒ Object (private)
  If communication is interrupted for external reasons, the central server will assume that the node has gone down.
- #check_on_workers ⇒ Object
- #check_out ⇒ Object
  Before exiting, the Node checks out with the central server, releasing all of its WorkUnits for other Nodes to handle.
- #free_memory ⇒ Object
  The current amount of free memory in megabytes.
- #initialize(options = {}) ⇒ Node (constructor)
  When creating a node, specify the port it should run on.
- #load_average ⇒ Object
  The current one-minute load average.
- #monitor_system ⇒ Object (private)
  Launch a monitoring thread that periodically checks the node’s load average and the amount of free memory remaining.
- #overloaded? ⇒ Boolean
  Is the node overloaded? If configured, checks if the load average is greater than ‘max_load’, or if the available RAM is less than ‘min_free_memory’.
- #resolve_work(unit_id) ⇒ Object
- #shut_down ⇒ Object (private)
  At shut down, de-register with the central server before exiting.
- #start ⇒ Object
  Starting up a Node registers with the central server and begins to listen for incoming WorkUnits.
- #track_work(id, thread) ⇒ Object
- #trap_signals ⇒ Object (private)
  Trap exit signals in order to shut down cleanly.
Constructor Details
#initialize(options = {}) ⇒ Node
When creating a node, specify the port it should run on.
# File 'lib/cloud_crowd/node.rb', line 72
def initialize(options={})
  super()
  require 'json'
  CloudCrowd.identity = :node
  @central = CloudCrowd.central_server
  @host = Socket.gethostname
  @enabled_actions = CloudCrowd.actions.keys - (CloudCrowd.config[:disabled_actions] || [])
  @port = options[:port] || DEFAULT_PORT
  @id = "#{@host}:#{@port}"
  @daemon = !!options[:daemonize]
  @tag = options[:tag]
  @overloaded = false
  @max_load = CloudCrowd.config[:max_load]
  @min_memory = CloudCrowd.config[:min_free_memory]
  @work = {}
  start unless ENV['RACK_ENV'] == 'test'
end
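To illustrate how the constructor resolves its settings, the sketch below mirrors the defaulting logic using plain hashes. `node_settings` is a hypothetical helper, and the `actions` and `config` arguments merely stand in for `CloudCrowd.actions` and `CloudCrowd.config`; none of these names belong to CloudCrowd itself.

```ruby
require 'socket'

DEFAULT_PORT = 9063

# Hypothetical helper: mirrors how the constructor resolves its options.
# `actions` and `config` stand in for CloudCrowd.actions and CloudCrowd.config.
def node_settings(options = {}, actions: {}, config: {})
  host = Socket.gethostname
  port = options[:port] || DEFAULT_PORT
  {
    host: host,
    port: port,
    id: "#{host}:#{port}",
    daemon: !!options[:daemonize],
    tag: options[:tag],
    enabled_actions: actions.keys - (config[:disabled_actions] || [])
  }
end

settings = node_settings(
  { tag: 'images' },
  actions: { 'word_count' => :wc, 'graphics_magick' => :gm },
  config: { disabled_actions: ['word_count'] }
)
puts settings[:port]
puts settings[:enabled_actions].inspect
```

With no `:port` given, the sketch falls back to the default port, and any action listed under `disabled_actions` is subtracted from the enabled set.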
Instance Attribute Details
#central ⇒ Object (readonly)
Returns the value of attribute central.
# File 'lib/cloud_crowd/node.rb', line 38
def central
  @central
end
#enabled_actions ⇒ Object (readonly)
Returns the value of attribute enabled_actions.
# File 'lib/cloud_crowd/node.rb', line 38
def enabled_actions
  @enabled_actions
end
#host ⇒ Object (readonly)
Returns the value of attribute host.
# File 'lib/cloud_crowd/node.rb', line 38
def host
  @host
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
# File 'lib/cloud_crowd/node.rb', line 38
def port
  @port
end
#tag ⇒ Object (readonly)
Returns the value of attribute tag.
# File 'lib/cloud_crowd/node.rb', line 38
def tag
  @tag
end
Instance Method Details
#asset_store ⇒ Object
Lazy-initialize the asset_store, preferably after the Node has launched.
# File 'lib/cloud_crowd/node.rb', line 130
def asset_store
  @asset_store ||= AssetStore.new
end
#check_in(critical = false) ⇒ Object
Checking in with the central server informs it of the location and configuration of this Node. If it can’t check in, there’s no point in starting.
# File 'lib/cloud_crowd/node.rb', line 111
def check_in(critical=false)
  @central["/node/#{@id}"].put(
    :busy => @overloaded,
    :tag => @tag,
    :max_workers => CloudCrowd.config[:max_workers],
    :enabled_actions => @enabled_actions.join(',')
  )
rescue RestClient::Exception, Errno::ECONNREFUSED
  CloudCrowd.log "Failed to connect to the central server (#{@central.to_s})."
  raise SystemExit if critical
end
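The rescue clause gives check_in two failure modes: a critical check-in aborts the process, while a non-critical one logs and returns nil. The sketch below reproduces that structure in isolation. `UnreachableCentral` and `check_in_sketch` are hypothetical names, and `warn` stands in for `CloudCrowd.log`.

```ruby
# Stand-in for the central server resource: every request fails as if
# the server were unreachable. This class is hypothetical.
class UnreachableCentral
  def put(_params)
    raise Errno::ECONNREFUSED
  end
end

# Simplified version of check_in with the same rescue structure.
def check_in_sketch(central, critical = false)
  central.put(busy: false)
rescue Errno::ECONNREFUSED
  warn 'Failed to connect to the central server.'
  raise SystemExit if critical
end

# Non-critical failure: the trailing `raise ... if critical` evaluates to nil.
result = check_in_sketch(UnreachableCentral.new)
puts result.inspect

# Critical failure: SystemExit propagates to the caller.
begin
  check_in_sketch(UnreachableCentral.new, true)
rescue SystemExit
  puts 'critical check-in aborted the process'
end
```

The nil return on a non-critical failure is what a caller can test to decide whether to retry.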
#check_in_periodically ⇒ Object (private)
If communication is interrupted for external reasons, the central server will assume that the node has gone down. Checking in will let central know it’s still online.
# File 'lib/cloud_crowd/node.rb', line 210
def check_in_periodically
  @check_in_thread = Thread.new do
    loop do
      check_on_workers
      reply = ""
      1.upto(5).each do |attempt_number|
        # Sleep for an ever-increasing amount of time to avoid overloading the server.
        sleep CHECK_IN_INTERVAL * attempt_number
        reply = check_in
        # A nil reply means the server has gone away; a successful
        # check-in replies with an empty string.
        if reply.nil?
          CloudCrowd.log "Failed on attempt ##{attempt_number} to check in with server"
        else
          break
        end
      end
      if reply.nil?
        CloudCrowd.log "Giving up after repeated attempts to contact server"
        raise SystemExit
      end
    end
  end
end
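The retry loop above waits CHECK_IN_INTERVAL multiplied by the attempt number before each try, so a fully failed round sleeps 300 + 600 + 900 + 1200 + 1500 = 4500 seconds (75 minutes) before giving up. The following sketch isolates that schedule. `check_in_round` and its injectable `sleeper` are hypothetical, introduced only so the waits can be inspected without actually sleeping.

```ruby
CHECK_IN_INTERVAL = 300

# Hypothetical helper: runs one round of up to `max_attempts` check-ins.
# `sleeper` is injectable so the schedule can be inspected without waiting.
# The block plays the role of check_in and returns nil on failure.
def check_in_round(max_attempts: 5, sleeper: method(:sleep))
  1.upto(max_attempts) do |attempt|
    sleeper.call(CHECK_IN_INTERVAL * attempt)
    reply = yield(attempt)
    return reply unless reply.nil?
  end
  nil
end

waits = []
# Simulate a server that only answers on the third attempt.
reply = check_in_round(sleeper: ->(seconds) { waits << seconds }) do |attempt|
  attempt == 3 ? '' : nil
end

puts waits.inspect
puts reply.inspect
```

A nil result from the whole round corresponds to the point where the node logs that it is giving up and exits.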
#check_on_workers ⇒ Object
# File 'lib/cloud_crowd/node.rb', line 165
def check_on_workers
  # ToDo, this isn't really thread safe.
  # there are events in which a job completes and exits successfully
  # while iteration here is taking place. However the interleaving
  # is such that the work unit should be complete / cleaned up already
  # even in the event that a thread is flagged as dead here.
  @work.each do |unit_id, work|
    unless work[:thread].alive?
      CloudCrowd.log "Notifying central server that worker #{work[:thread].pid} for unit #{unit_id} mysteriously died."
      data = {
        id: unit_id,
        pid: work[:thread].pid,
        status: 'failed',
        output: { output: "Worker thread #{work[:thread].pid} died on #{host} prior to #{Time.now}" }.to_json,
        time: Time.now - work[:start] # this is time until failure was noticed
      }
      @central["/work/#{unit_id}"].put(data)
      resolve_work(unit_id)
    end
  end
end
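The calls to `alive?` and `pid` on the tracked thread suggest it is the kind of thread that `Process.detach` returns for a forked child; that is an assumption, since this page does not show where `track_work` is called. Under that assumption, the sketch below shows how such a thread reports a finished worker, and collects dead entries before deleting them so the registry is not modified while it is being scanned.

```ruby
# Fork a short-lived child and detach it. Process.detach returns a thread
# that finishes when the child exits and exposes the child's pid.
pid    = fork { exit!(0) }
waiter = Process.detach(pid)

# Registry shaped like the one track_work builds (assumed usage).
work = { 42 => { thread: waiter, start: Time.now } }

waiter.join # wait for the child to exit

# Collect dead workers first, then remove them from the registry.
dead = work.select { |_unit_id, entry| !entry[:thread].alive? }
dead.each_key { |unit_id| work.delete(unit_id) }

puts waiter.pid == pid
puts work.empty?
```

This sketch relies on `fork`, so it only runs on platforms that support it.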
#check_out ⇒ Object
Before exiting, the Node checks out with the central server, releasing all of its WorkUnits for other Nodes to handle.
# File 'lib/cloud_crowd/node.rb', line 125
def check_out
  @central["/node/#{@id}"].delete
end
#free_memory ⇒ Object
The current amount of free memory in megabytes.
# File 'lib/cloud_crowd/node.rb', line 148
def free_memory
  case RUBY_PLATFORM
  when /darwin/
    stats = `vm_stat`
    @mac_page_size ||= stats.match(SCRAPE_MAC_PAGE)[1].to_f / 1048576.0
    stats.match(SCRAPE_MAC_MEMORY)[1].to_f * @mac_page_size
  when /linux/
    `cat /proc/meminfo`.match(SCRAPE_LINUX_MEMORY)[1].to_f / 1024.0
  else
    raise NotImplementedError, "'min_free_memory' is not yet implemented on your platform"
  end
end
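The two platform branches convert different units into megabytes. As a worked example, the sketch below applies the same scrapers and arithmetic to sample command output. The `MEMINFO` and `VM_STAT` strings are invented for illustration and only approximate what `cat /proc/meminfo` and `vm_stat` print.

```ruby
SCRAPE_LINUX_MEMORY = /MemFree:\s+(\d+) kB/
SCRAPE_MAC_MEMORY   = /Pages free:\s+(\d+)./
SCRAPE_MAC_PAGE     = /page size of (\d+) bytes/

# Sample command output, invented for illustration.
MEMINFO = "MemTotal:       16384000 kB\nMemFree:         2097152 kB\n"
VM_STAT = "Mach Virtual Memory Statistics: (page size of 4096 bytes)\n" \
          "Pages free:                          262144.\n"

# Linux reports kilobytes, so divide by 1024 to get megabytes.
linux_mb = MEMINFO.match(SCRAPE_LINUX_MEMORY)[1].to_f / 1024.0

# macOS reports a page count; multiply by the page size expressed in megabytes.
page_mb = VM_STAT.match(SCRAPE_MAC_PAGE)[1].to_f / 1048576.0
mac_mb  = VM_STAT.match(SCRAPE_MAC_MEMORY)[1].to_f * page_mb

puts linux_mb # 2097152 kB / 1024
puts mac_mb   # 262144 pages * 4096 bytes / 1048576
```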
#load_average ⇒ Object
The current one-minute load average.
# File 'lib/cloud_crowd/node.rb', line 143
def load_average
  `uptime`.match(SCRAPE_UPTIME).to_s.to_f
end
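SCRAPE_UPTIME simply grabs the first decimal number in the `uptime` output, which is the one-minute figure because the clock and uptime fields use colons rather than dots. A small sketch on an invented sample line:

```ruby
SCRAPE_UPTIME = /\d+\.\d+/

# Sample `uptime` output, invented for illustration.
sample = ' 14:02:11 up 3 days,  2:41,  2 users,  load average: 0.42, 0.36, 0.30'

one_minute = sample.match(SCRAPE_UPTIME).to_s.to_f
puts one_minute

# With no decimal number present, match returns nil and the to_s.to_f
# chain yields 0.0 instead of raising.
fallback = 'no load here'.match(SCRAPE_UPTIME).to_s.to_f
puts fallback
```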
#monitor_system ⇒ Object (private)
Launch a monitoring thread that periodically checks the node’s load average and the amount of free memory remaining. If we transition out of the overloaded state, let central know.
# File 'lib/cloud_crowd/node.rb', line 196
def monitor_system
  @monitor_thread = Thread.new do
    loop do
      was_overloaded = @overloaded
      @overloaded = overloaded?
      check_in if was_overloaded && !@overloaded
      sleep MONITOR_INTERVAL
    end
  end
end
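The monitor only contacts central on the transition from overloaded to not overloaded, not on every poll. The sketch below replays a sequence of readings through the same comparison. `recovery_check_ins` is a hypothetical helper, and the readings are invented.

```ruby
# Hypothetical helper: given successive overloaded? readings, return the
# indexes of the polls at which the node would check in with central.
def recovery_check_ins(readings)
  overloaded = false
  check_ins = []
  readings.each_with_index do |now_overloaded, index|
    was_overloaded = overloaded
    overloaded = now_overloaded
    check_ins << index if was_overloaded && !overloaded
  end
  check_ins
end

puts recovery_check_ins([false, true, true, false, false, true, false]).inspect
```

Entering the overloaded state does not trigger a check-in in this loop; only recovery does.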
#overloaded? ⇒ Boolean
Is the node overloaded? If configured, checks if the load average is greater than ‘max_load’, or if the available RAM is less than ‘min_free_memory’.
# File 'lib/cloud_crowd/node.rb', line 137
def overloaded?
  (@max_load && load_average > @max_load) ||
    (@min_memory && free_memory < @min_memory)
end
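Because each threshold is guarded by its own presence check, an unconfigured limit is skipped rather than compared. The stand-alone sketch below takes the measurements and thresholds as arguments (an adaptation for illustration; the real method reads instance variables).

```ruby
# Stand-alone version of the predicate; a nil threshold means "not configured".
def overloaded?(load_average, free_memory, max_load: nil, min_memory: nil)
  (max_load && load_average > max_load) ||
    (min_memory && free_memory < min_memory)
end

puts overloaded?(6.0, 4096, max_load: 5.0).inspect   # load above max_load
puts overloaded?(1.0, 128, min_memory: 256).inspect  # memory below min_memory
puts overloaded?(6.0, 128).inspect                   # nothing configured
```

When neither threshold is set, the expression evaluates to nil rather than false, which is still falsy in a condition.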
#resolve_work(unit_id) ⇒ Object
# File 'lib/cloud_crowd/node.rb', line 187
def resolve_work(unit_id)
  @work.delete(unit_id)
end
#shut_down ⇒ Object (private)
At shut down, de-register with the central server before exiting.
# File 'lib/cloud_crowd/node.rb', line 243
def shut_down
  @check_in_thread.kill if @check_in_thread
  @monitor_thread.kill if @monitor_thread
  check_out
  @server_thread.kill if @server_thread
  Process.exit
end
#start ⇒ Object
Starting up a Node registers with the central server and begins to listen for incoming WorkUnits.
# File 'lib/cloud_crowd/node.rb', line 92
def start
  FileUtils.mkdir_p(CloudCrowd.log_path) if @daemon && !File.exist?(CloudCrowd.log_path)
  @server = Thin::Server.new('0.0.0.0', @port, self, :signals => false)
  @server.tag = 'cloud-crowd-node'
  @server.pid_file = CloudCrowd.pid_path('node.pid')
  @server.log_file = CloudCrowd.log_path('node.log')
  @server.daemonize if @daemon
  trap_signals
  asset_store
  @server_thread = Thread.new { @server.start }
  check_in(true)
  check_in_periodically
  monitor_system if @max_load || @min_memory
  @server_thread.join
end
#track_work(id, thread) ⇒ Object
# File 'lib/cloud_crowd/node.rb', line 161
def track_work(id, thread)
  @work[id] = { thread: thread, start: Time.now }
end
#trap_signals ⇒ Object (private)
Trap exit signals in order to shut down cleanly.
# File 'lib/cloud_crowd/node.rb', line 236
def trap_signals
  Signal.trap('QUIT') { shut_down }
  Signal.trap('INT') { shut_down }
  Signal.trap('TERM') { shut_down }
end
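To see the trapping pattern in action without shutting anything down, the sketch below installs one handler for the same three signals and records which one arrives. `trap_exit_signals` is a hypothetical helper; the handler here only appends to an array, whereas the Node calls shut_down.

```ruby
# Hypothetical helper: install one handler for the same three signals.
def trap_exit_signals(&handler)
  %w[QUIT INT TERM].each { |sig| Signal.trap(sig, &handler) }
end

received = []
# The handler receives the signal number; translate it back to a name.
trap_exit_signals { |signo| received << Signal.signame(signo) }

# Send TERM to this process and give the handler a moment to run.
Process.kill('TERM', Process.pid)
sleep 0.1

puts received.inspect
```

Keeping the handler small matters in practice, since Ruby restricts some operations (such as locking a Mutex) inside trap handlers.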