Class: CloudCrowd::NodeRecord
- Inherits: ActiveRecord::Base
- Ancestry: Object → ActiveRecord::Base → CloudCrowd::NodeRecord
- Defined in: lib/cloud_crowd/models/node_record.rb
Overview
A NodeRecord is the central server’s record of a Node running remotely. We can use it to assign WorkUnits to the Node, and keep track of its status. When a Node exits, it destroys this record.
Defined Under Namespace
Classes: Serializer
Constant Summary
- PORT = /:(\d+)\Z/
  Extract the port number from the host id.
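As a minimal sketch of how this pattern behaves, the snippet below applies the same regular expression to a made-up host id. The helper name `extract_port` and the host string are assumptions for illustration only; they are not part of CloudCrowd.

```ruby
# Hypothetical helper (not part of CloudCrowd): applies the PORT pattern,
# a colon followed by digits at the end of the string, to a host id.
def extract_port(host_id, pattern = /:(\d+)\Z/)
  match = host_id.match(pattern)
  # Return nil instead of raising when the host id carries no port.
  match && match[1].to_i
end

# Illustrative host id; any "name:port" string works the same way.
puts extract_port("node1.example.com:9063")
```

Note that `.check_in` calls `match(PORT)[1]` directly, so a host id without a port would raise there rather than return nil.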
Class Method Summary
- .available_actions ⇒ Object
- .check_in(params, request) ⇒ Object
  Register a Node with the central server.
Instance Method Summary
- #actions ⇒ Object
  What Actions is this Node able to run?
- #active_model_serializer ⇒ Object
- #busy? ⇒ Boolean
  Is this Node too busy for more work? Determined by number of workers, or the Node’s load average, as configured in config.yml.
- #display_status ⇒ Object
  The printable status of the Node.
- #node ⇒ Object
  Keep a RestClient::Resource handy for contacting the Node, including HTTP authentication, if configured.
- #redistribute_work_units ⇒ Object (private)
  When a Node exits, release its WorkUnits and redistribute them to others.
- #release_work_units ⇒ Object
  Release all of this Node’s WorkUnits for other nodes to take.
- #send_work_unit(unit) ⇒ Object
  Dispatch a WorkUnit to this node.
- #to_json ⇒ Object
- #url ⇒ Object
  The URL at which this Node may be reached.
- #worker_pids ⇒ Object
  A list of the process ids of the workers currently being run by the Node.
Class Method Details
.available_actions ⇒ Object
# File 'lib/cloud_crowd/models/node_record.rb', line 44
def self.available_actions
  available.map(&:actions).flatten.uniq - BlackListedAction.all.pluck(:action)
end
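The set arithmetic in this method can be sketched with plain arrays, without ActiveRecord. The helper name `available_actions_sketch` and the action names are assumptions chosen for illustration; the real method reads them from the `available` scope and the `BlackListedAction` table.

```ruby
# Hypothetical stand-in: each inner array is the action list of one available
# node, and `blacklisted` plays the role of BlackListedAction.all.pluck(:action).
def available_actions_sketch(per_node_actions, blacklisted)
  # Merge every node's actions, drop duplicates, then subtract blacklisted ones.
  per_node_actions.flatten.uniq - blacklisted
end

nodes = [["graphics_magick", "word_count"], ["word_count", "process_pdfs"]]
p available_actions_sketch(nodes, ["process_pdfs"])
```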
.check_in(params, request) ⇒ Object
Register a Node with the central server. This happens periodically (once every `Node::CHECK_IN_INTERVAL` seconds). Nodes will be de-registered if they have not checked in within a reasonable interval.
# File 'lib/cloud_crowd/models/node_record.rb', line 26
def self.check_in(params, request)
  attrs = {
    :ip_address => request.ip,
    :port => params[:host].match(PORT)[1].to_i,
    :busy => params[:busy],
    :tag => params[:tag],
    :max_workers => params[:max_workers],
    :enabled_actions => params[:enabled_actions]
  }
  host_attr = {:host => params[:host]}
  if (record = where(host_attr).first)
    record.update_attributes!(attrs)
    record
  else
    create!(attrs.merge(host_attr))
  end
end
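The update-or-create flow above can be sketched against an in-memory Hash instead of a database table. Everything here is an assumption for illustration: `check_in_sketch`, the `registry` Hash keyed by host, and the sample parameter values are not part of CloudCrowd.

```ruby
# Hypothetical in-memory version of the check-in flow: `registry` maps a
# host id to a Hash of attributes, standing in for the node_records table.
def check_in_sketch(registry, params, ip)
  attrs = {
    :ip_address      => ip,
    :port            => params[:host].match(/:(\d+)\Z/)[1].to_i,
    :busy            => params[:busy],
    :tag             => params[:tag],
    :max_workers     => params[:max_workers],
    :enabled_actions => params[:enabled_actions]
  }
  if (record = registry[params[:host]])
    record.merge!(attrs)   # known host: refresh its attributes in place
  else
    registry[params[:host]] = attrs.merge(:host => params[:host])  # new host
  end
end

registry = {}
params = {:host => "node1.example.com:9063", :busy => false,
          :max_workers => 5, :enabled_actions => "word_count"}
check_in_sketch(registry, params, "10.0.0.5")
check_in_sketch(registry, params.merge(:busy => true), "10.0.0.5")
puts registry.size
```

A repeated check-in from the same host refreshes the existing entry rather than adding a second one, which mirrors the `where(host_attr).first` branch.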
Instance Method Details
#actions ⇒ Object
What Actions is this Node able to run?
# File 'lib/cloud_crowd/models/node_record.rb', line 71
def actions
  @actions ||= enabled_actions.split(',')
end
#active_model_serializer ⇒ Object
# File 'lib/cloud_crowd/models/node_record.rb', line 122
def active_model_serializer; Serializer; end
#busy? ⇒ Boolean
Is this Node too busy for more work? Determined by number of workers, or the Node’s load average, as configured in config.yml.
# File 'lib/cloud_crowd/models/node_record.rb', line 77
def busy?
  busy || (max_workers && work_units.count >= max_workers)
end
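The decision in the listing above can be sketched as a standalone predicate. The helper `node_busy?` and its arguments are hypothetical: `busy_flag` stands in for the `busy` column, and `assigned_units` for `work_units.count`. Unlike the original, this sketch coerces the result to a strict true or false.

```ruby
# Hypothetical predicate mirroring the listing: a node is busy when it
# reported itself busy, or when it has a worker cap and has reached it.
def node_busy?(busy_flag, max_workers, assigned_units)
  !!(busy_flag || (max_workers && assigned_units >= max_workers))
end

puts node_busy?(false, 5, 5)    # cap reached
puts node_busy?(false, nil, 40) # no cap configured, flag not set
```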
#display_status ⇒ Object
The printable status of the Node.
# File 'lib/cloud_crowd/models/node_record.rb', line 94
def display_status
  busy? ? 'busy' : 'available'
end
#node ⇒ Object
Keep a RestClient::Resource handy for contacting the Node, including HTTP authentication, if configured.
# File 'lib/cloud_crowd/models/node_record.rb', line 89
def node
  @node ||= RestClient::Resource.new(url, CloudCrowd.client_options)
end
#redistribute_work_units ⇒ Object (private)
When a Node exits, release its WorkUnits and redistribute them to others. Redistribute in a separate thread to avoid delaying shutdown.
# File 'lib/cloud_crowd/models/node_record.rb', line 132
def redistribute_work_units
  release_work_units
end
#release_work_units ⇒ Object
Release all of this Node’s WorkUnits for other nodes to take.
# File 'lib/cloud_crowd/models/node_record.rb', line 104
def release_work_units
  WorkUnit.where("node_record_id = #{id}").update_all('node_record_id = null, worker_pid = null')
end
#send_work_unit(unit) ⇒ Object
Dispatch a WorkUnit to this node. Places the node back at the end of the rotation. If we fail to send the WorkUnit, we consider the node to be down and remove this record, freeing up all of its checked-out work units. If the Node responds that it’s overloaded, we mark it as busy. Returns true if the WorkUnit was dispatched successfully.
# File 'lib/cloud_crowd/models/node_record.rb', line 53
def send_work_unit(unit)
  result = node['/work'].post(:work_unit => unit.to_json)
  unit.assign_to(self, JSON.parse(result.body)['pid'])
  touch && true
rescue RestClient::RequestTimeout
  # The node's gone away. Destroy it and it will check in when it comes back
  CloudCrowd.log "Node #{host} received RequestTimeout, removing it"
  destroy && false
rescue RestClient::RequestFailed => e
  raise e unless e.http_code == 503 && e.http_body == Node::OVERLOADED_MESSAGE
  update_attribute(:busy, true) && false
rescue RestClient::Exception, Errno::ECONNREFUSED, Timeout::Error, Errno::ECONNRESET => e
  # Couldn't post to node, assume it's gone away.
  CloudCrowd.log "Node #{host} received #{e.class} #{e}, removing it"
  destroy && false
end
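The three outcomes described above (dispatched, marked busy, removed) can be sketched without RestClient or a database. This is a simplified model under stated assumptions: `dispatch_outcome` is a hypothetical helper, `post` is any callable returning `[status, body]`, and the `"overloaded"` string is a placeholder rather than the real value of `Node::OVERLOADED_MESSAGE`.

```ruby
require 'timeout'

# Hypothetical model of the dispatch flow. Returns :dispatched, :busy, or
# :removed; the real method returns true or false and updates the record.
def dispatch_outcome(post, overloaded_message = "overloaded")
  status, body = post.call
  # A 503 carrying the overload message means "busy", not "gone".
  return :busy if status == 503 && body == overloaded_message
  # Any other failure response is re-raised, as in the listing.
  raise "unexpected response #{status}" unless status == 200
  :dispatched
rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Timeout::Error
  # Could not reach the node at all: treat it as gone.
  :removed
end

puts dispatch_outcome(lambda { [200, '{"pid": 4242}'] })
puts dispatch_outcome(lambda { [503, "overloaded"] })
puts dispatch_outcome(lambda { raise Errno::ECONNREFUSED })
```

Keeping "busy" distinct from "removed" matters because a busy node keeps its record and its checked-out work units, while a removed node gives its work units up.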
#to_json ⇒ Object
# File 'lib/cloud_crowd/models/node_record.rb', line 124
def to_json
  Serializer.new(self).to_json
end
#url ⇒ Object
The URL at which this Node may be reached. TODO: Make sure that the host actually has externally accessible DNS.
# File 'lib/cloud_crowd/models/node_record.rb', line 83
def url
  @url ||= "http://#{host}"
end
#worker_pids ⇒ Object
A list of the process ids of the workers currently being run by the Node.
# File 'lib/cloud_crowd/models/node_record.rb', line 99
def worker_pids
  work_units.pluck('worker_pid')
end