Class: Bumbleworks::Worker
- Inherits:
-
Ruote::Worker
- Object
- Ruote::Worker
- Bumbleworks::Worker
show all
- Defined in:
- lib/bumbleworks/worker.rb
Defined Under Namespace
Classes: Info, WorkerStateNotChanged
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(*args, &block) ⇒ Worker
Returns a new instance of Worker.
92
93
94
95
96
97
98
99
|
# File 'lib/bumbleworks/worker.rb', line 92
# Builds the worker through the superclass constructor (all arguments and
# the block are forwarded unchanged), then assigns it a random UUID as @id.
#
# When the superclass left a truthy @info behind, it is replaced with this
# class's own Info wrapper and persisted immediately via save_info.
# NOTE(review): @info is presumably set by Ruote::Worker#initialize — confirm.
def initialize(*args, &block)
  super
  @id = SecureRandom.uuid
  return unless @info

  @info = Info.new(self)
  save_info
end
|
Instance Attribute Details
#id ⇒ Object
Returns the value of attribute id.
6
7
8
|
# File 'lib/bumbleworks/worker.rb', line 6
# Reader for this worker's identifier.
#
# @return [Object] the current value of @id
def id
@id
end
|
Class Method Details
.change_worker_state(new_state, options = {}) ⇒ Object
.forget_worker(id_to_delete) ⇒ Object
39
40
41
42
43
|
# File 'lib/bumbleworks/worker.rb', line 39
# Drops the entry for one specific worker from the stored worker info.
# Delegates to purge_worker_info with a predicate that matches only the
# given id.
#
# @param id_to_delete [Object] id of the worker entry to remove
# @return [Object] whatever purge_worker_info returns
def forget_worker(id_to_delete)
  purge_worker_info { |worker_id, _worker_info| worker_id == id_to_delete }
end
|
.info ⇒ Object
9
10
11
|
# File 'lib/bumbleworks/worker.rb', line 9
# Returns the worker info reported by the Bumbleworks dashboard.
#
# @return [Object] the dashboard's worker_info, or an empty Hash when the
#   dashboard reports nil/false
def info
  reported = Bumbleworks.dashboard.worker_info
  reported ? reported : {}
end
|
.pause_all(options = {}) ⇒ Object
21
22
23
|
# File 'lib/bumbleworks/worker.rb', line 21
# Requests the 'paused' state for all workers by delegating to
# change_worker_state.
#
# @param options [Hash] passed through untouched to change_worker_state
# @return [Object] whatever change_worker_state returns
def pause_all(options = {})
  target_state = 'paused'
  change_worker_state(target_state, options)
end
|
.purge_stale_worker_info ⇒ Object
45
46
47
48
49
|
# File 'lib/bumbleworks/worker.rb', line 45
# Removes every stored worker entry that has no 'state' at all or whose
# 'state' is 'stopped'. Delegates the actual removal to purge_worker_info.
#
# @return [Object] whatever purge_worker_info returns
def purge_stale_worker_info
  purge_worker_info do |_worker_id, worker_info|
    state = worker_info['state']
    state.nil? || state == 'stopped'
  end
end
|
.purge_worker_info(&block) ⇒ Object
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/bumbleworks/worker.rb', line 51
# Removes entries from the stored 'workers' document for which the given
# block returns a truthy value, then writes the document back.
#
# If the write reports a truthy result, the purge is retried with the SAME
# predicate. Previously the retry always fell back to
# purge_stale_worker_info, so e.g. a forget_worker call that hit a failed
# write would never remove the requested worker.
# NOTE(review): a truthy result from storage.put is treated as "write did
# not go through", exactly as the original retry condition did — presumably
# Ruote's convention (nil on success); confirm against the storage in use.
#
# @yieldparam id [Object] key of the worker entry
# @yieldparam worker_info [Object] stored value for that worker
# @yieldreturn [Boolean] truthy to remove the entry
# @return [Object, nil] the current worker info (see .info), or nil when
#   no 'workers' document exists
def purge_worker_info(&block)
  doc = Bumbleworks.dashboard.storage.get('variables', 'workers')
  return unless doc

  doc['workers'] = doc['workers'].reject { |id, worker_info|
    block.call(id, worker_info)
  }
  result = Bumbleworks.dashboard.storage.put(doc)
  # Retry with the caller's own predicate rather than the stale-only one.
  purge_worker_info(&block) if result
  info
end
|
.refresh_worker_info(options = {}) ⇒ Object
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/bumbleworks/worker.rb', line 74
# Waits (via Bumbleworks::Support.wait_until, inside
# with_worker_state_enabled) until every worker entry in .info is either
# 'stopped' or has a 'put_at' timestamp newer than one second ago.
#
# @param options [Hash] passed through to Bumbleworks::Support.wait_until
# @return [Object] whatever with_worker_state_enabled returns
def refresh_worker_info(options = {})
  with_worker_state_enabled do
    Bumbleworks::Support.wait_until(options) do
      info.all? do |_worker_id, worker_info|
        # Stopped workers count as up to date regardless of their timestamp.
        next true if worker_info['state'] == 'stopped'

        Time.parse(worker_info['put_at']) > Time.now - 1
      end
    end
  end
end
|
.shutdown_all(options = {}) ⇒ Object
13
14
15
16
17
18
19
|
# File 'lib/bumbleworks/worker.rb', line 13
# Asks all workers to move to the 'stopped' state, then switches the
# requested state back to 'running'.
#
# The switch back to 'running' now happens in an ensure clause, so it is
# attempted even when the 'stopped' transition raises. Previously a failure
# in the first step skipped the reset entirely and left 'stopped' as the
# last requested state.
# NOTE(review): presumably the reset exists so that workers started later
# do not come up already stopped — confirm against change_worker_state.
#
# @param options [Hash] passed through to both change_worker_state calls
# @return [Object] the result of the final 'running' state change
# @raise whatever change_worker_state raises; if both calls raise, the
#   error from the 'running' reset is the one that propagates
def shutdown_all(options = {})
  begin
    change_worker_state('stopped', options)
  ensure
    # Always attempt the reset, even if stopping failed.
    reset_result = change_worker_state('running', options)
  end
  reset_result
end
|
.unpause_all(options = {}) ⇒ Object
25
26
27
|
# File 'lib/bumbleworks/worker.rb', line 25
# Requests the 'running' state for all workers by delegating to
# change_worker_state.
#
# @param options [Hash] passed through untouched to change_worker_state
# @return [Object] whatever change_worker_state returns
def unpause_all(options = {})
  target_state = 'running'
  change_worker_state(target_state, options)
end
|
.with_worker_state_enabled ⇒ Object
85
86
87
88
89
|
# File 'lib/bumbleworks/worker.rb', line 85
# Runs the given block with the dashboard context flag
# 'worker_state_enabled' set to true, and sets it back to false afterwards.
#
# The flag is now reset in an ensure clause, so it no longer stays true
# when the block raises (e.g. on a wait timeout).
#
# @yield the work to perform while the flag is true
# @return [false] unchanged from the previous implementation, whose last
#   expression was the `= false` assignment; capture the block's result
#   inside the block if it is needed
# @raise whatever the block raises (after the flag has been reset)
def with_worker_state_enabled
  Bumbleworks.dashboard.context['worker_state_enabled'] = true
  yield
  # Keep the historical return value for existing callers.
  false
ensure
  Bumbleworks.dashboard.context['worker_state_enabled'] = false
end
|
.worker_states ⇒ Object
29
30
31
32
33
34
35
36
37
|
# File 'lib/bumbleworks/worker.rb', line 29
# Builds a map of worker id => state from .info, leaving out workers whose
# state is missing (nil/false) or 'stopped'.
#
# @return [Hash] id => state for every worker that has a non-'stopped' state
def worker_states
  info.each_with_object({}) do |(worker_id, worker_info), active|
    state = worker_info['state']
    next unless state
    next if state == 'stopped'

    active[worker_id] = state
  end
end
|
Instance Method Details
#determine_state ⇒ Object
110
111
112
113
114
115
|
# File 'lib/bumbleworks/worker.rb', line 110
# Runs the superclass's determine_state and then persists the worker info,
# but only while @context['worker_state_enabled'] is truthy. Does nothing
# (and returns nil) otherwise.
# NOTE(review): @context is presumably provided by Ruote::Worker — confirm.
#
# @return [Object, nil] the result of save_info, or nil when the flag is off
def determine_state
  return unless @context['worker_state_enabled']

  super
  save_info
end
|
#info ⇒ Object
117
118
119
|
# File 'lib/bumbleworks/worker.rb', line 117
# Looks up this worker's own entry in the class-level worker info,
# keyed by the worker's id.
#
# @return [Object, nil] the entry stored under this worker's id, if any
def info
  all_workers = self.class.info
  all_workers[id]
end
|
#save_info ⇒ Object
101
102
103
|
# File 'lib/bumbleworks/worker.rb', line 101
# Persists this worker's info object by calling its #save, when one is set.
#
# @return [Object, nil] the result of @info.save, or nil when @info is
#   nil/false
def save_info
  return unless @info

  @info.save
end
|
#shutdown ⇒ Object
105
106
107
108
|
# File 'lib/bumbleworks/worker.rb', line 105
# Shuts the worker down through the superclass implementation, then
# persists the worker info via save_info.
#
# @return [Object, nil] the result of save_info
def shutdown
# Bare super: forwards the (empty) argument list to the superclass.
super
# Save after the superclass has run so the stored info is written last.
save_info
end
|