Class: CloudCrowd::Node

Inherits:
Sinatra::Base
  • Object
Defined in:
lib/cloud_crowd/node.rb

Overview

A Node is a Sinatra/Thin application that runs as a single instance per machine. It registers with the central server, receives WorkUnits, and forks off Workers to process them. The actions are:

get /heartbeat

Returns 200 OK to let monitoring tools know the server’s up.

post /work

The central server hits /work to dispatch a WorkUnit to this Node.
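
To make the contract of these two actions concrete, here is a minimal sketch written as a plain Rack-style callable rather than a Sinatra app. It is not CloudCrowd's code: build_node_app, the response bodies, and the 503 status for an overloaded node are assumptions. Only the routes and the OVERLOADED_MESSAGE text come from this page.

```ruby
# Hypothetical, dependency-free stand-in for the Node's two routes.
# A Rack app is any object whose #call(env) returns [status, headers, body].
OVERLOADED_MESSAGE = 'Node Overloaded'

# overloaded: a callable returning true when the node should refuse work.
# dispatch:   a callable that starts a worker and returns a JSON string.
def build_node_app(overloaded:, dispatch:)
  lambda do |env|
    case [env['REQUEST_METHOD'], env['PATH_INFO']]
    when ['GET', '/heartbeat']
      # Monitoring tools only need the 200 status; the body is arbitrary here.
      [200, { 'content-type' => 'text/plain' }, ['OK']]
    when ['POST', '/work']
      # Assumption: an overloaded node answers 503 with OVERLOADED_MESSAGE.
      if overloaded.call
        [503, { 'content-type' => 'text/plain' }, [OVERLOADED_MESSAGE]]
      else
        [200, { 'content-type' => 'application/json' }, [dispatch.call(env)]]
      end
    else
      [404, { 'content-type' => 'text/plain' }, ['Not Found']]
    end
  end
end

app = build_node_app(overloaded: -> { false }, dispatch: ->(_env) { '{"pid":1234}' })
status, _headers, body = app.call({ 'REQUEST_METHOD' => 'GET', 'PATH_INFO' => '/heartbeat' })
```

Passing the overload check and the dispatcher in as callables keeps the sketch testable without forking real workers.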

Constant Summary

DEFAULT_PORT =

A Node’s default port. You only run a single node per machine, so they can all use the same port without any problems.

9063
SCRAPE_UPTIME =

A list of regex scrapers, which let us extract the one-minute load average and the amount of free memory on different flavors of UNIX.

/\d+\.\d+/
SCRAPE_LINUX_MEMORY =
/MemFree:\s+(\d+) kB/
SCRAPE_MAC_MEMORY =
/Pages free:\s+(\d+)./
SCRAPE_MAC_PAGE =
/page size of (\d+) bytes/
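
As an illustration of how the uptime and Linux scrapers are applied, the sketch below runs two of the regexes above against sample text. The sample strings are made up to resemble typical uptime and /proc/meminfo output; real output varies by system.

```ruby
# Regexes copied from the constants above.
SCRAPE_UPTIME       = /\d+\.\d+/
SCRAPE_LINUX_MEMORY = /MemFree:\s+(\d+) kB/

# Made-up samples shaped like `uptime` and /proc/meminfo output.
uptime_sample  = ' 10:15:01 up 3 days,  2:04,  1 user,  load average: 0.52, 0.58, 0.59'
meminfo_sample = "MemTotal:       16318480 kB\nMemFree:         2097152 kB\n"

# The first decimal number in `uptime` output is the one-minute load average.
load_average = uptime_sample.match(SCRAPE_UPTIME).to_s.to_f

# MemFree is reported in kB; dividing by 1024 yields megabytes.
free_mb = meminfo_sample.match(SCRAPE_LINUX_MEMORY)[1].to_f / 1024.0
```

SCRAPE_UPTIME only works because nothing before the load average in this output contains a decimal point; the clock and uptime fields use colons instead.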
MONITOR_INTERVAL =

The interval, in seconds, at which the node monitors the machine's load and memory use (if configured to do so in config.yml).

3
CHECK_IN_INTERVAL =

The interval, in seconds, at which the node regularly checks in with central (300 seconds, or 5 minutes).

300
OVERLOADED_MESSAGE =

The response sent back when this node is overloaded.

'Node Overloaded'

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(options = {}) ⇒ Node

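When creating a node, specify the port it should run on with the :port option (it defaults to DEFAULT_PORT). The options hash also accepts :daemonize and :tag. Unless RACK_ENV is 'test', the constructor starts the node immediately.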



# File 'lib/cloud_crowd/node.rb', line 72

def initialize(options={})
  super()
  require 'json'
  CloudCrowd.identity = :node
  @central          = CloudCrowd.central_server
  @host             = Socket.gethostname
  @enabled_actions  = CloudCrowd.actions.keys - (CloudCrowd.config[:disabled_actions] || [])
  @port             = options[:port] || DEFAULT_PORT
  @id               = "#{@host}:#{@port}"
  @daemon           = !!options[:daemonize]
  @tag              = options[:tag]
  @overloaded       = false
  @max_load         = CloudCrowd.config[:max_load]
  @min_memory       = CloudCrowd.config[:min_free_memory]
  @work             = {}
  start unless ENV['RACK_ENV'] == 'test'
end
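
The identity and action list computed by the constructor can be reproduced in isolation. This sketch uses a hypothetical node_settings helper; the host name, the registered action names, and the disabled list are passed in explicitly instead of being read from Socket.gethostname and CloudCrowd's configuration, and the action names are made-up examples.

```ruby
DEFAULT_PORT = 9063

# Hypothetical helper mirroring the defaults applied in #initialize.
def node_settings(host, options, actions, disabled_actions)
  port = options[:port] || DEFAULT_PORT
  {
    id:              "#{host}:#{port}",           # e.g. "worker-1:9063"
    port:            port,
    daemon:          !!options[:daemonize],       # coerce nil/truthy to a boolean
    tag:             options[:tag],
    enabled_actions: actions - (disabled_actions || [])
  }
end

settings = node_settings('worker-1', { tag: 'ocr' }, %w[ocr resize word_count], ['word_count'])
```

Because the id combines host and port, two Nodes on the same machine would collide only if they also shared a port, which the one-Node-per-machine rule already rules out.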

Instance Attribute Details

#central ⇒ Object (readonly)

Returns the value of attribute central.



# File 'lib/cloud_crowd/node.rb', line 38

def central
  @central
end

#enabled_actions ⇒ Object (readonly)

Returns the value of attribute enabled_actions.



# File 'lib/cloud_crowd/node.rb', line 38

def enabled_actions
  @enabled_actions
end

#host ⇒ Object (readonly)

Returns the value of attribute host.



# File 'lib/cloud_crowd/node.rb', line 38

def host
  @host
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



# File 'lib/cloud_crowd/node.rb', line 38

def port
  @port
end

#tag ⇒ Object (readonly)

Returns the value of attribute tag.



# File 'lib/cloud_crowd/node.rb', line 38

def tag
  @tag
end

Instance Method Details

#asset_store ⇒ Object

Lazily initializes the AssetStore, preferably after the Node has launched.



# File 'lib/cloud_crowd/node.rb', line 130

def asset_store
  @asset_store ||= AssetStore.new
end

#check_in(critical = false) ⇒ Object

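Checking in with the central server informs it of this Node's location and configuration. If the Node can't check in, there's no point in starting. When critical is true (as it is during #start), a failed check-in raises SystemExit; otherwise the failure is logged and the method returns nil.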



# File 'lib/cloud_crowd/node.rb', line 111

def check_in(critical=false)
  @central["/node/#{@id}"].put(
    :busy             => @overloaded,
    :tag              => @tag,
    :max_workers      => CloudCrowd.config[:max_workers],
    :enabled_actions  => @enabled_actions.join(',')
  )
rescue RestClient::Exception, Errno::ECONNREFUSED
  CloudCrowd.log "Failed to connect to the central server (#{@central.to_s})."
  raise SystemExit if critical
end
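
The failure handling above can be exercised without a network. In this sketch, check_in_with is a hypothetical stand-in for #check_in: a plain callable replaces the RestClient resource, only Errno::ECONNREFUSED is rescued (RestClient::Exception is omitted so no gems are needed), and the max_workers value and action names are made up.

```ruby
# Hypothetical stand-in for #check_in. `transport` plays the role of
# @central["/node/#{@id}"].put and receives the same payload shape.
def check_in_with(transport, payload, critical: false)
  transport.call(payload)
rescue Errno::ECONNREFUSED
  warn 'Failed to connect to the central server.'
  # When critical is false this expression evaluates to nil,
  # so the method returns nil to signal the failure.
  raise SystemExit if critical
end

payload = {
  busy:            false,
  tag:             nil,
  max_workers:     5,
  enabled_actions: %w[ocr resize].join(',')   # sent as one comma-separated string
}

up   = ->(_p) { '' }                          # success: central replies with an empty body
down = ->(_p) { raise Errno::ECONNREFUSED }   # central is unreachable
```

The nil return on a non-critical failure is exactly what #check_in_periodically tests for when deciding whether to retry.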

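#check_in_periodically ⇒ Object (private)

If communication is interrupted for external reasons, the central server will assume that the node has gone down; checking in periodically lets central know it's still online. Each cycle first checks on the workers, then makes up to five check-in attempts, sleeping CHECK_IN_INTERVAL multiplied by the attempt number before each one. If all five attempts fail, the thread raises SystemExit.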



# File 'lib/cloud_crowd/node.rb', line 210

def check_in_periodically
  @check_in_thread = Thread.new do
    loop do
      check_on_workers
      reply = ""
      1.upto(5) do |attempt_number|
        # Sleep for a linearly increasing interval to avoid overloading the server.
        sleep CHECK_IN_INTERVAL * attempt_number
        reply = check_in
        # A nil reply means the check-in failed and the server has gone away;
        # a successful check-in replies with an empty string.
        if reply.nil?
          CloudCrowd.log "Failed on attempt ##{attempt_number} to check in with server"
        else
          break
        end
      end
      if reply.nil?
        CloudCrowd.log "Giving up after repeated attempts to contact server"
        raise SystemExit
      end
    end
  end
end
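
The retry schedule in this loop is simple arithmetic. The sketch below computes the delay before each attempt and the total time a cycle spends sleeping before giving up; MAX_ATTEMPTS is a hypothetical name for the hard-coded 5, and the total ignores the time spent in check_on_workers and in the requests themselves.

```ruby
CHECK_IN_INTERVAL = 300 # seconds, as in the constant above
MAX_ATTEMPTS      = 5   # hypothetical name for the literal in 1.upto(5)

# Delay slept before each attempt: linear back-off.
delays = 1.upto(MAX_ATTEMPTS).map { |attempt| CHECK_IN_INTERVAL * attempt }

# Sleeping time from the start of a cycle until the node gives up,
# assuming every attempt fails.
give_up_after = delays.sum
```

This yields delays of 300, 600, 900, 1200, and 1500 seconds, so a node that loses contact with central keeps retrying for 4500 seconds (75 minutes) of sleep before exiting.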

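#check_on_workers ⇒ Object

Finds tracked WorkUnits whose worker thread is no longer alive, reports each one to the central server as failed, and stops tracking it.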

# File 'lib/cloud_crowd/node.rb', line 165

def check_on_workers
  # TODO: this isn't fully thread safe. A job can complete and exit
  # successfully while this loop is running; however, the interleaving
  # is such that its work unit should already be complete and cleaned up
  # even if its thread is flagged as dead here. Iterating over a snapshot
  # (to_a) keeps a concurrent #track_work from hitting Ruby's
  # "can't add a new key into hash during iteration" error.
  @work.to_a.each do |unit_id, work|
    unless work[:thread].alive?
      CloudCrowd.log "Notifying central server that worker #{work[:thread].pid} for unit #{unit_id} mysteriously died."
      data = {
        id:     unit_id,
        pid:    work[:thread].pid,
        status: 'failed',
        output: { output: "Worker thread #{work[:thread].pid} died on #{host} prior to #{Time.now}" }.to_json,
        time:   Time.now - work[:start] # this is time until failure was noticed
      }
      @central["/work/#{unit_id}"].put(data)
      resolve_work(unit_id)
    end
  end
end
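
The call to work[:thread].pid suggests that each tracked thread is the waiter returned by Process.detach, which exposes the child's pid and stops being alive? once the child exits; that is an inference from the code, not something this page states. The sketch below spawns a short-lived child Ruby process as a stand-in for a Worker and shows how such a waiter reveals the death; the unit id 42 is made up.

```ruby
require 'rbconfig'

work = {}

# Stand-in for a Worker: a child Ruby process that exits with status 1.
pid    = Process.spawn(RbConfig.ruby, '-e', 'exit 1')
waiter = Process.detach(pid)   # a Thread that reaps the child and responds to #pid
work[42] = { thread: waiter, start: Time.now }   # same shape as #track_work

waiter.join   # wait for the child here; the real Node just polls periodically

# Mirror #check_on_workers: find units whose waiter thread has finished...
dead = work.select { |_unit_id, entry| !entry[:thread].alive? }
# ...and stop tracking them, as #resolve_work does.
dead.each_key { |unit_id| work.delete(unit_id) }
```

The waiter's value is the child's Process::Status, so a fuller report could include the exit status alongside the pid.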

#check_out ⇒ Object

Before exiting, the Node checks out with the central server, releasing all of its WorkUnits for other Nodes to handle.



# File 'lib/cloud_crowd/node.rb', line 125

def check_out
  @central["/node/#{@id}"].delete
end

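#free_memory ⇒ Object

The current amount of free memory in megabytes, scraped from vm_stat on macOS or /proc/meminfo on Linux. Raises NotImplementedError on other platforms.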



# File 'lib/cloud_crowd/node.rb', line 148

def free_memory
  case RUBY_PLATFORM
  when /darwin/
    stats = `vm_stat`
    @mac_page_size ||= stats.match(SCRAPE_MAC_PAGE)[1].to_f / 1048576.0
    stats.match(SCRAPE_MAC_MEMORY)[1].to_f * @mac_page_size
  when /linux/
    `cat /proc/meminfo`.match(SCRAPE_LINUX_MEMORY)[1].to_f / 1024.0
  else
    raise NotImplementedError, "'min_free_memory' is not yet implemented on your platform"
  end
end
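
On macOS, vm_stat reports free memory as a page count, so the method converts pages to megabytes using the page size it scrapes from the same output. This sketch reproduces that arithmetic on a made-up vm_stat excerpt.

```ruby
SCRAPE_MAC_MEMORY = /Pages free:\s+(\d+)./
SCRAPE_MAC_PAGE   = /page size of (\d+) bytes/

# Made-up excerpt shaped like `vm_stat` output.
stats = <<~VMSTAT
  Mach Virtual Memory Statistics: (page size of 4096 bytes)
  Pages free:                               12345.
  Pages active:                            500000.
VMSTAT

# Megabytes per page: bytes per page divided by 1,048,576 (1024 * 1024).
mb_per_page = stats.match(SCRAPE_MAC_PAGE)[1].to_f / 1048576.0

# Free pages times megabytes per page gives free megabytes.
free_mb = stats.match(SCRAPE_MAC_MEMORY)[1].to_f * mb_per_page
```

Here 12345 pages of 4096 bytes come to about 48.2 MB. Scraping the page size rather than hard-coding it keeps the result correct if the page size differs between machines.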

#load_average ⇒ Object

The current one-minute load average.



# File 'lib/cloud_crowd/node.rb', line 143

def load_average
  `uptime`.match(SCRAPE_UPTIME).to_s.to_f
end

#monitor_system ⇒ Object (private)

Launches a monitoring thread that checks the node's load average and the amount of free memory remaining every MONITOR_INTERVAL seconds. When the node transitions out of the overloaded state, it checks in to let central know.



# File 'lib/cloud_crowd/node.rb', line 196

def monitor_system
  @monitor_thread = Thread.new do
    loop do
      was_overloaded = @overloaded
      @overloaded = overloaded?
      check_in if was_overloaded && !@overloaded
      sleep MONITOR_INTERVAL
    end
  end
end
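
The monitoring loop only reacts to one edge: overloaded to not overloaded. The sketch below replays that logic over a sequence of overloaded? samples without threads or sleeping; recovery_points is a hypothetical helper that returns the sample indices at which the loop would call check_in.

```ruby
# Hypothetical replay of #monitor_system's transition logic.
def recovery_points(samples)
  overloaded = false   # @overloaded starts out false in #initialize
  check_ins  = []
  samples.each_with_index do |now_overloaded, i|
    was_overloaded = overloaded
    overloaded     = now_overloaded
    # Check in only on the overloaded -> not-overloaded edge.
    check_ins << i if was_overloaded && !overloaded
  end
  check_ins
end

points = recovery_points([false, true, true, false, false, true, false])
```

Becoming overloaded triggers no immediate check-in from this loop; per #check_in, the busy flag only reaches central on the node's next check-in.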

#overloaded? ⇒ Boolean

Is the node overloaded? If configured, checks if the load average is greater than ‘max_load’, or if the available RAM is less than ‘min_free_memory’.

Returns:

  • (Boolean)


# File 'lib/cloud_crowd/node.rb', line 137

def overloaded?
  (@max_load && load_average > @max_load) ||
  (@min_memory && free_memory < @min_memory)
end
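
Because of the && guards, a reading is only taken when its limit is configured. This hypothetical free-standing version of the method receives the limits and the readers as arguments (the readers are lambdas standing in for load_average and free_memory), which makes the short-circuiting visible.

```ruby
# Hypothetical free-standing version of #overloaded?; the lambdas defer
# each reading so it is taken only when the matching limit is set.
def overloaded?(max_load, min_memory, load_reader, memory_reader)
  (max_load && load_reader.call > max_load) ||
    (min_memory && memory_reader.call < min_memory)
end

unused = -> { raise 'reading should not be taken' }

overloaded?(nil, nil, unused, unused)         # no limits configured
overloaded?(4.0, nil, -> { 5.5 }, unused)     # load above max_load
overloaded?(4.0, 512, -> { 1.0 }, -> { 2048.0 })  # within both limits
```

With neither limit configured the expression evaluates to nil rather than false; callers only test it for truthiness, and #start skips the monitor entirely in that case.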

#resolve_work(unit_id) ⇒ Object

Stops tracking the given WorkUnit by removing it from the table of in-progress work.

# File 'lib/cloud_crowd/node.rb', line 187

def resolve_work(unit_id)
  @work.delete(unit_id)
end

#shut_down ⇒ Object (private)

At shut down, de-register with the central server before exiting.



# File 'lib/cloud_crowd/node.rb', line 243

def shut_down
  @check_in_thread.kill if @check_in_thread
  @monitor_thread.kill if @monitor_thread
  check_out
  @server_thread.kill if @server_thread
  Process.exit
end

#start ⇒ Object

Starting up a Node registers with the central server and begins to listen for incoming WorkUnits.



# File 'lib/cloud_crowd/node.rb', line 92

def start
  FileUtils.mkdir_p(CloudCrowd.log_path) if @daemon && !File.exist?(CloudCrowd.log_path)
  @server          = Thin::Server.new('0.0.0.0', @port, self, :signals => false)
  @server.tag      = 'cloud-crowd-node'
  @server.pid_file = CloudCrowd.pid_path('node.pid')
  @server.log_file = CloudCrowd.log_path('node.log')
  @server.daemonize if @daemon
  trap_signals
  asset_store
  @server_thread   = Thread.new { @server.start }
  check_in(true)
  check_in_periodically
  monitor_system if @max_load || @min_memory
  @server_thread.join
end

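#track_work(id, thread) ⇒ Object

Starts tracking a WorkUnit: records the thread watching its worker along with the start time, so #check_on_workers can detect and report workers that die.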

# File 'lib/cloud_crowd/node.rb', line 161

def track_work(id, thread)
  @work[id] = { thread: thread, start: Time.now }
end

#trap_signals ⇒ Object (private)

Trap exit signals in order to shut down cleanly.



# File 'lib/cloud_crowd/node.rb', line 236

def trap_signals
  Signal.trap('QUIT') { shut_down }
  Signal.trap('INT')  { shut_down }
  Signal.trap('TERM') { shut_down }
end