Class: Bumbleworks::Worker
- Inherits:
-
Ruote::Worker
- Object
- Ruote::Worker
- Bumbleworks::Worker
show all
- Defined in:
- lib/bumbleworks/worker.rb,
lib/bumbleworks/worker/info.rb,
lib/bumbleworks/worker/proxy.rb
Defined Under Namespace
Classes: Info, Proxy, WorkerStateNotChanged
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(*args, &block) ⇒ Worker
Returns a new instance of Worker.
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/bumbleworks/worker.rb', line 86
def initialize(*args, &block)
super
@pid = Process.pid
@id = SecureRandom.uuid
@launched_at = Time.now
@ip = Ruote.local_ip
@hostname = Socket.gethostname
@system = `uname -a`.strip rescue nil
if @info
@info = Info.new(self)
save_info
end
end
|
Instance Attribute Details
#hostname ⇒ Object
Returns the value of attribute hostname.
7
8
9
|
# File 'lib/bumbleworks/worker.rb', line 7
def hostname
@hostname
end
|
#id ⇒ Object
Returns the value of attribute id.
7
8
9
|
# File 'lib/bumbleworks/worker.rb', line 7
def id
@id
end
|
#ip ⇒ Object
Returns the value of attribute ip.
7
8
9
|
# File 'lib/bumbleworks/worker.rb', line 7
def ip
@ip
end
|
#launched_at ⇒ Object
Returns the value of attribute launched_at.
7
8
9
|
# File 'lib/bumbleworks/worker.rb', line 7
def launched_at
@launched_at
end
|
#pid ⇒ Object
Returns the value of attribute pid.
7
8
9
|
# File 'lib/bumbleworks/worker.rb', line 7
def pid
@pid
end
|
#system ⇒ Object
Returns the value of attribute system.
7
8
9
|
# File 'lib/bumbleworks/worker.rb', line 7
def system
@system
end
|
Class Method Details
.active_worker_states ⇒ Object
31
32
33
34
35
36
37
38
39
|
# File 'lib/bumbleworks/worker.rb', line 31
def active_worker_states
info.inject({}) { |hsh, info|
id, state = info.id, info.state
unless info.state.nil? || info.in_stopped_state?
hsh[id] = state
end
hsh
}
end
|
.change_worker_state(new_state, options = {}) ⇒ Object
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/bumbleworks/worker.rb', line 41
def change_worker_state(new_state, options = {})
with_worker_state_enabled do
Bumbleworks.dashboard.worker_state = new_state
Bumbleworks::Support.wait_until(options) do
active_worker_states.values.all? { |state| state == new_state }
end
end
return true
rescue Bumbleworks::Support::WaitTimeout
raise WorkerStateNotChanged, "Worker states: #{active_worker_states.inspect}"
end
|
.control_document ⇒ Object
69
70
71
72
73
74
75
|
# File 'lib/bumbleworks/worker.rb', line 69
def control_document
doc = Bumbleworks.dashboard.storage.get('variables', 'worker_control') || {}
doc['type'] = 'variables'
doc['_id'] = 'worker_control'
doc['workers'] ||= {}
doc
end
|
.info_document ⇒ Object
77
78
79
80
81
82
83
|
# File 'lib/bumbleworks/worker.rb', line 77
def info_document
doc = Bumbleworks.dashboard.storage.get('variables', 'workers') || {}
doc['type'] = 'variables'
doc['_id'] = 'workers'
doc['workers'] ||= {}
doc
end
|
.pause_all(options = {}) ⇒ Object
23
24
25
|
# File 'lib/bumbleworks/worker.rb', line 23
def pause_all(options = {})
change_worker_state('paused', options)
end
|
.refresh_worker_info(options = {}) ⇒ Object
53
54
55
56
57
58
59
60
61
|
# File 'lib/bumbleworks/worker.rb', line 53
def refresh_worker_info(options = {})
with_worker_state_enabled do
info.each do |worker_info|
if !worker_info.in_stopped_state? && worker_info.stalling?
worker_info.record_new_state("stalled")
end
end
end
end
|
.shutdown_all(options = {}) ⇒ Object
14
15
16
17
18
19
20
21
|
# File 'lib/bumbleworks/worker.rb', line 14
def shutdown_all(options = {})
change_worker_state('stopped', options)
ensure
change_worker_state('running', options)
end
|
.unpause_all(options = {}) ⇒ Object
27
28
29
|
# File 'lib/bumbleworks/worker.rb', line 27
def unpause_all(options = {})
change_worker_state('running', options)
end
|
.with_worker_state_enabled ⇒ Object
63
64
65
66
67
|
# File 'lib/bumbleworks/worker.rb', line 63
def with_worker_state_enabled
Bumbleworks.dashboard.context['worker_state_enabled'] = true
yield
Bumbleworks.dashboard.context['worker_state_enabled'] = false
end
|
Instance Method Details
#class_name ⇒ Object
102
103
104
|
# File 'lib/bumbleworks/worker.rb', line 102
def class_name
self.class.to_s
end
|
#desired_state ⇒ Object
119
120
121
122
123
124
|
# File 'lib/bumbleworks/worker.rb', line 119
def desired_state
control_hash = worker_control_variable ||
@storage.get("variables", "worker") ||
{ "state" => "running" }
control_hash["state"]
end
|
#determine_state ⇒ Object
126
127
128
129
130
131
132
133
|
# File 'lib/bumbleworks/worker.rb', line 126
def determine_state
@state_mutex.synchronize do
if @state != "stopped" && @context["worker_state_enabled"]
@state = desired_state
save_info
end
end
end
|
#info ⇒ Object
135
136
137
|
# File 'lib/bumbleworks/worker.rb', line 135
def info
self.class.info[id]
end
|
#save_info ⇒ Object
106
107
108
|
# File 'lib/bumbleworks/worker.rb', line 106
def save_info
@info.save if @info
end
|
#shutdown ⇒ Object
110
111
112
113
|
# File 'lib/bumbleworks/worker.rb', line 110
def shutdown
super
save_info
end
|
#worker_control_variable ⇒ Object
115
116
117
|
# File 'lib/bumbleworks/worker.rb', line 115
def worker_control_variable
self.class.control_document["workers"][id]
end
|