Class: CloudCrowd::Node

Inherits:
Sinatra::Base
  • Object
Defined in:
lib/cloud_crowd/node.rb

Overview

A Node is a Sinatra/Thin application that runs as a single instance per machine. It registers with the central server, receives WorkUnits, and forks off Workers to process them. The actions are:

get /heartbeat

Returns 200 OK to let monitoring tools know the server's up.

post /work

The central server hits /work to dispatch a WorkUnit to this Node.
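
As a rough illustration of how a monitoring tool might use the heartbeat action, the sketch below builds the heartbeat URL and treats a 200 response as "up". The helper names `heartbeat_uri` and `node_up?` are hypothetical and not part of CloudCrowd; only the `/heartbeat` path and the default port 9063 come from this page.

```ruby
require 'net/http'
require 'uri'

# Hypothetical helper (not part of CloudCrowd): builds the heartbeat URI
# for a node, defaulting to the documented DEFAULT_PORT of 9063.
def heartbeat_uri(host, port = 9063)
  URI::HTTP.build(host: host, port: port, path: '/heartbeat')
end

# Hypothetical helper: true when the node answers 200 OK.
# Connection failures and timeouts are treated as "node is down".
def node_up?(host, port = 9063)
  Net::HTTP.get_response(heartbeat_uri(host, port)).code == '200'
rescue SystemCallError, SocketError, Timeout::Error
  false
end
```

For example, `heartbeat_uri('node1.example.com').to_s` yields `http://node1.example.com:9063/heartbeat`, where the host name is a placeholder.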

Constant Summary

DEFAULT_PORT = 9063

A Node's default port. Only a single node runs per machine, so every node can use the same port without conflict.

SCRAPE_UPTIME = /\d+\.\d+/

A list of regex scrapers, which let us extract the one-minute load average and the amount of free memory on different flavors of UNIX.

SCRAPE_LINUX_MEMORY = /MemFree:\s+(\d+) kB/
SCRAPE_MAC_MEMORY = /Pages free:\s+(\d+)./
SCRAPE_MAC_PAGE = /page size of (\d+) bytes/

MONITOR_INTERVAL = 3

The interval, in seconds, at which the node monitors the machine's load and memory use (if configured to do so in config.yml).

CHECK_IN_INTERVAL = 300

The interval, in seconds, at which the node regularly checks in with central (5 min).

OVERLOADED_MESSAGE = 'Node Overloaded'

The response sent back when this node is overloaded.

Constructor Details

#initialize(options = {}) ⇒ Node

When creating a node, specify the port it should run on.



# File 'lib/cloud_crowd/node.rb', line 68

def initialize(options={})
  super()
  require 'json'
  CloudCrowd.identity = :node
  @central          = CloudCrowd.central_server
  @host             = Socket.gethostname
  @enabled_actions  = CloudCrowd.actions.keys - (CloudCrowd.config[:disabled_actions] || [])
  @port             = options[:port] || DEFAULT_PORT
  @id               = "#{@host}:#{@port}"
  @daemon           = !!options[:daemonize]
  @tag              = options[:tag]
  @overloaded       = false
  @max_load         = CloudCrowd.config[:max_load]
  @min_memory       = CloudCrowd.config[:min_free_memory]
  @work             = {}
  start unless ENV['RACK_ENV'] == 'test'
end
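
The option handling in the constructor can be sketched in isolation. The function below is an illustration only: `node_settings`, `all_actions`, and `disabled` are hypothetical names standing in for the instance variables, `CloudCrowd.actions.keys`, and `CloudCrowd.config[:disabled_actions]`.

```ruby
# Standalone sketch of how Node#initialize derives its settings.
# DEFAULT_PORT mirrors the constant documented above.
DEFAULT_PORT = 9063

def node_settings(host, options = {}, all_actions: [], disabled: nil)
  port = options[:port] || DEFAULT_PORT
  {
    id:              "#{host}:#{port}",             # unique per host/port pair
    port:            port,
    daemon:          !!options[:daemonize],         # coerce to true/false
    tag:             options[:tag],
    enabled_actions: all_actions - (disabled || []) # drop disabled actions
  }
end
```

With no options, a host named `worker-1` (a placeholder) gets the id `worker-1:9063`; passing `{ port: 9100 }` changes it to `worker-1:9100`.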

Instance Attribute Details

#central ⇒ Object (readonly)

Returns the value of attribute central



# File 'lib/cloud_crowd/node.rb', line 34

def central
  @central
end

#enabled_actions ⇒ Object (readonly)

Returns the value of attribute enabled_actions



# File 'lib/cloud_crowd/node.rb', line 34

def enabled_actions
  @enabled_actions
end

#host ⇒ Object (readonly)

Returns the value of attribute host



# File 'lib/cloud_crowd/node.rb', line 34

def host
  @host
end

#port ⇒ Object (readonly)

Returns the value of attribute port



# File 'lib/cloud_crowd/node.rb', line 34

def port
  @port
end

#tag ⇒ Object (readonly)

Returns the value of attribute tag



# File 'lib/cloud_crowd/node.rb', line 34

def tag
  @tag
end

Instance Method Details

#asset_store ⇒ Object

Lazy-initialize the asset_store, preferably after the Node has launched.



# File 'lib/cloud_crowd/node.rb', line 126

def asset_store
  @asset_store ||= AssetStore.new
end

#check_in(critical = false) ⇒ Object

Checking in with the central server informs it of the location and configuration of this Node. If the Node can't check in at startup, there's no point in starting: when critical is true, a failed check-in raises SystemExit.



# File 'lib/cloud_crowd/node.rb', line 107

def check_in(critical=false)
  @central["/node/#{@id}"].put(
    :busy             => @overloaded,
    :tag              => @tag,
    :max_workers      => CloudCrowd.config[:max_workers],
    :enabled_actions  => @enabled_actions.join(',')
  )
rescue RestClient::Exception, Errno::ECONNREFUSED
  CloudCrowd.log "Failed to connect to the central server (#{@central.to_s})."
  raise SystemExit if critical
end
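
The check-in payload and the critical/non-critical failure handling can be sketched without a network connection. This is an approximation under stated assumptions: `check_in_params` and `attempt_check_in` are hypothetical names, the block stands in for the PUT request, and `IOError`/`SystemCallError` stand in for the exceptions rescued above. Unlike the original, the sketch returns true or false so that the outcome is easy to inspect.

```ruby
# Sketch of the form parameters that check_in sends to /node/<id>.
def check_in_params(overloaded:, tag:, max_workers:, enabled_actions:)
  {
    busy:            overloaded,
    tag:             tag,
    max_workers:     max_workers,
    enabled_actions: enabled_actions.join(',') # sent as a comma-separated list
  }
end

# Sketch of the failure handling: the block performs the request.
# A failure is fatal only when `critical` is true.
def attempt_check_in(critical = false)
  yield
  true
rescue IOError, SystemCallError # Errno::ECONNREFUSED is a SystemCallError
  raise SystemExit if critical
  false
end
```

In this sketch, `attempt_check_in { raise Errno::ECONNREFUSED }` returns false, while `attempt_check_in(true) { raise Errno::ECONNREFUSED }` raises SystemExit, matching the way `start` calls `check_in(true)`.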

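#check_on_workers ⇒ Object

Checks every tracked WorkUnit. If a unit's worker is no longer alive, reports the unit to the central server as failed and stops tracking it.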



# File 'lib/cloud_crowd/node.rb', line 161

def check_on_workers
  # TODO: this isn't really thread safe.
  # A job can complete and exit successfully while this iteration is
  # in progress. However, the interleaving is such that the work unit
  # should already be complete and cleaned up, even if its thread is
  # flagged as dead here.
  @work.each do |unit_id, work|
    unless work[:thread].alive?
      CloudCrowd.log "Notifying central server that worker #{work[:thread].pid} for unit #{unit_id} mysteriously died."
      data = {
        id:     unit_id,
        pid:    work[:thread].pid,
        status: 'failed',
        output: { output: "Worker thread #{work[:thread].pid} died on #{host} prior to #{Time.now}" }.to_json,
        time:   Time.now - work[:start] # this is time until failure was noticed
      }
      @central["/work/#{unit_id}"].put(data)
      resolve_work(unit_id)
    end
  end
end
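
The dead-worker check can be illustrated with plain Ruby threads. The sketch below is a simplified stand-in, not the library's code: `failure_reports` is a hypothetical name, it only builds the report hashes instead of sending them, and it omits the `pid` field because plain `Thread` objects have no `pid` method.

```ruby
require 'json'

# Sketch: build a failure report for every tracked entry whose thread
# has died. `work` has the shape { unit_id => { thread:, start: } }.
def failure_reports(work, host, now = Time.now)
  dead = work.reject { |_unit_id, entry| entry[:thread].alive? }
  dead.map do |unit_id, entry|
    {
      id:     unit_id,
      status: 'failed',
      output: { output: "Worker thread died on #{host} prior to #{now}" }.to_json,
      time:   now - entry[:start] # elapsed time until the failure was noticed
    }
  end
end
```

Collecting the dead entries before reporting keeps the bookkeeping separate from the iteration, which is one way to make the logic easier to reason about.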

#check_out ⇒ Object

Before exiting, the Node checks out with the central server, releasing all of its WorkUnits for other Nodes to handle.



# File 'lib/cloud_crowd/node.rb', line 121

def check_out
  @central["/node/#{@id}"].delete
end

#free_memory ⇒ Object

The current amount of free memory in megabytes.



# File 'lib/cloud_crowd/node.rb', line 144

def free_memory
  case RUBY_PLATFORM
  when /darwin/
    stats = `vm_stat`
    @mac_page_size ||= stats.match(SCRAPE_MAC_PAGE)[1].to_f / 1048576.0
    stats.match(SCRAPE_MAC_MEMORY)[1].to_f * @mac_page_size
  when /linux/
    `cat /proc/meminfo`.match(SCRAPE_LINUX_MEMORY)[1].to_f / 1024.0
  else
    raise NotImplementedError, "'min_free_memory' is not yet implemented on your platform"
  end
end
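
To see how the memory scrapers convert raw output to megabytes, the sketch below applies them to sample text. The sample strings are made up for illustration and were not captured from a real machine; `linux_free_mb` and `mac_free_mb` are hypothetical helper names.

```ruby
SCRAPE_LINUX_MEMORY = /MemFree:\s+(\d+) kB/
SCRAPE_MAC_MEMORY   = /Pages free:\s+(\d+)./
SCRAPE_MAC_PAGE     = /page size of (\d+) bytes/

# Illustrative samples shaped like /proc/meminfo and vm_stat output.
LINUX_SAMPLE = "MemTotal:        8048276 kB\nMemFree:         2097152 kB\n"
MAC_SAMPLE   = "Mach Virtual Memory Statistics: (page size of 4096 bytes)\n" \
               "Pages free:                          262144.\n"

# Linux reports kilobytes, so divide by 1024 to get megabytes.
def linux_free_mb(meminfo)
  meminfo.match(SCRAPE_LINUX_MEMORY)[1].to_f / 1024.0
end

# macOS reports free pages, so multiply by the page size in megabytes.
def mac_free_mb(vm_stat)
  page_mb = vm_stat.match(SCRAPE_MAC_PAGE)[1].to_f / 1048576.0
  vm_stat.match(SCRAPE_MAC_MEMORY)[1].to_f * page_mb
end
```

With these samples, `linux_free_mb(LINUX_SAMPLE)` is 2048.0 and `mac_free_mb(MAC_SAMPLE)` is 1024.0.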

#load_average ⇒ Object

The current one-minute load average.



# File 'lib/cloud_crowd/node.rb', line 139

def load_average
  `uptime`.match(SCRAPE_UPTIME).to_s.to_f
end
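
The scraper takes the first decimal number in the `uptime` output, which is the one-minute load average in the usual layout. The sample line below is illustrative, since real output varies by platform, and `one_minute_load` is a hypothetical helper name.

```ruby
SCRAPE_UPTIME = /\d+\.\d+/

# Illustrative uptime output; the first "digits.digits" token is the
# one-minute load average.
UPTIME_SAMPLE = ' 14:02:11 up 10 days,  3:04,  2 users,  load average: 0.42, 0.36, 0.30'

def one_minute_load(uptime_output)
  # MatchData#to_s gives the matched text; nil.to_s is "", which to_f
  # turns into 0.0 when nothing matches.
  uptime_output.match(SCRAPE_UPTIME).to_s.to_f
end
```

Here `one_minute_load(UPTIME_SAMPLE)` is 0.42, and input without any decimal number falls back to 0.0 rather than raising.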

#overloaded? ⇒ Boolean

Is the node overloaded? If configured, checks if the load average is greater than 'max_load', or if the available RAM is less than 'min_free_memory'.

Returns:

  • (Boolean)


# File 'lib/cloud_crowd/node.rb', line 133

def overloaded?
  (@max_load && load_average > @max_load) ||
  (@min_memory && free_memory < @min_memory)
end
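
The threshold logic can be tried out as a pure function. In this sketch the measurements and limits are passed in as keyword arguments (hypothetical parameter names) instead of being read from instance variables and the system, and the result is coerced to true or false.

```ruby
# Sketch of the overload check. A threshold left as nil is ignored,
# mirroring a config.yml that does not set it.
def overloaded?(load_average:, free_memory:, max_load: nil, min_memory: nil)
  !!((max_load && load_average > max_load) ||
     (min_memory && free_memory < min_memory))
end
```

A node with load 6.0 and `max_load: 5.0` counts as overloaded, as does one with 100 MB free and `min_memory: 150`. With neither threshold set, the sketch always returns false.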

#resolve_work(unit_id) ⇒ Object

Stops tracking the WorkUnit with the given id.



# File 'lib/cloud_crowd/node.rb', line 183

def resolve_work(unit_id)
  @work.delete(unit_id)
end

#start ⇒ Object

Starting up a Node registers it with the central server and begins listening for incoming WorkUnits.



# File 'lib/cloud_crowd/node.rb', line 88

def start
  FileUtils.mkdir_p(CloudCrowd.log_path) if @daemon && !File.exist?(CloudCrowd.log_path)
  @server          = Thin::Server.new('0.0.0.0', @port, self, :signals => false)
  @server.tag      = 'cloud-crowd-node'
  @server.pid_file = CloudCrowd.pid_path('node.pid')
  @server.log_file = CloudCrowd.log_path('node.log')
  @server.daemonize if @daemon
  trap_signals
  asset_store
  @server_thread   = Thread.new { @server.start }
  check_in(true)
  check_in_periodically
  monitor_system if @max_load || @min_memory
  @server_thread.join
end

#track_work(id, thread) ⇒ Object

Records the worker thread and start time for a WorkUnit so that check_on_workers can monitor it.



# File 'lib/cloud_crowd/node.rb', line 157

def track_work(id, thread)
  @work[id] = { thread: thread, start: Time.now }
end
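
The bookkeeping done by track_work and resolve_work amounts to a small registry keyed by unit id. The class below is a hypothetical sketch, not part of CloudCrowd. It adds a Mutex as one possible response to the thread-safety note in check_on_workers, which the original code does not do.

```ruby
# Hypothetical WorkRegistry: same data shape as @work in the Node,
# with every access guarded by a Mutex.
class WorkRegistry
  def initialize
    @work  = {}
    @mutex = Mutex.new
  end

  # Remember which thread handles a unit and when it started.
  def track(id, thread)
    @mutex.synchronize { @work[id] = { thread: thread, start: Time.now } }
  end

  # Forget a unit; returns its entry, or nil if it was not tracked.
  def resolve(id)
    @mutex.synchronize { @work.delete(id) }
  end

  def size
    @mutex.synchronize { @work.size }
  end
end
```

Resolving an id twice is harmless in this sketch: the second call simply returns nil, just as `Hash#delete` does in resolve_work.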