Class: Bumbleworks::Ruote
- Inherits:
-
Object
- Object
- Bumbleworks::Ruote
show all
- Defined in:
- lib/bumbleworks/ruote.rb
Defined Under Namespace
Classes: CancelTimeout, KillTimeout
Class Method Summary
collapse
Class Method Details
.cancel_all_processes!(options = {}) ⇒ Object
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
# File 'lib/bumbleworks/ruote.rb', line 73
# Cancels (or kills) every process the dashboard reports and waits until
# the dashboard reports none.
#
# The caller's +options+ hash is never modified; a normalized copy is used
# internally and handed to Bumbleworks::Support.wait_until.
#
# @param options [Hash]
#   :method - :kill to kill; any other value (or none) means :cancel.
#   :force  - only consulted when killing: if truthy, the storage is cleared
#             instead of sending a kill message per process.
#   The whole (normalized) hash is also passed to wait_until; which keys
#   wait_until reads is not visible here.
# @return [Object] whatever Bumbleworks::Support.wait_until returns
# @raise [CancelTimeout, KillTimeout] when wait_until raises WaitTimeout
def cancel_all_processes!(options = {})
  # Anything that is not exactly :kill is treated as :cancel.
  action = options[:method] == :kill ? :kill : :cancel
  # merge returns a new hash, so the caller's hash (possibly frozen) is untouched.
  options = options.merge(:method => action)
  clear_storage = action == :kill && options[:force]
  notified_process_wfids = []
  Bumbleworks::Support.wait_until(options) do
    # Only message processes that have not been messaged on an earlier pass.
    new_process_wfids = dashboard.process_wfids - notified_process_wfids
    if clear_storage
      storage.clear
    else
      send_cancellation_message(action, new_process_wfids)
    end
    notified_process_wfids += new_process_wfids
    dashboard.process_wfids.count == 0
  end
rescue Bumbleworks::Support::WaitTimeout
  error_type = action == :cancel ? CancelTimeout : KillTimeout
  raise error_type, "Process #{action} taking too long - #{dashboard.process_wfids.count} processes remain. Errors: #{dashboard.errors}"
end
|
.cancel_process!(wfid, options = {}) ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/bumbleworks/ruote.rb', line 50
# Cancels (or kills) a single process and waits until the dashboard no
# longer returns it.
#
# The caller's +options+ hash is never modified; a normalized copy is used
# internally and handed to Bumbleworks::Support.wait_until.
#
# @param wfid [Object] workflow id of the process to stop
# @param options [Hash]
#   :method - :kill to kill; any other value (or none) means :cancel.
#   :force  - only consulted when killing: if truthy, the process is removed
#             from the storage instead of sending a kill message.
# @return [Object] whatever Bumbleworks::Support.wait_until returns
# @raise [CancelTimeout, KillTimeout] when wait_until raises WaitTimeout
def cancel_process!(wfid, options = {})
  # Anything that is not exactly :kill is treated as :cancel.
  action = options[:method] == :kill ? :kill : :cancel
  # merge returns a new hash, so the caller's hash (possibly frozen) is untouched.
  options = options.merge(:method => action)
  if action == :kill && options[:force]
    storage.remove_process(wfid)
  else
    dashboard.send(action, wfid)
  end
  Bumbleworks::Support.wait_until(options) do
    dashboard.process(wfid).nil?
  end
rescue Bumbleworks::Support::WaitTimeout
  error_type = action == :cancel ? CancelTimeout : KillTimeout
  raise error_type, "Process #{action} for wfid '#{wfid}' taking too long. Errors: #{dashboard.errors(wfid)}"
end
|
.dashboard(options = {}) ⇒ Object
12
13
14
|
# File 'lib/bumbleworks/ruote.rb', line 12
# Returns the memoized Ruote dashboard, building it from +storage+ on the
# first call.
#
# @param options [Hash] accepted but not read by this method
# @return [::Ruote::Dashboard]
def dashboard(options = {})
  @dashboard = ::Ruote::Dashboard.new(storage) unless @dashboard
  @dashboard
end
|
.kill_all_processes!(options = {}) ⇒ Object
101
102
103
|
# File 'lib/bumbleworks/ruote.rb', line 101
# Same as cancel_all_processes!, but with :method forced to :kill.
#
# @param options [Hash] forwarded (as a merged copy) to cancel_all_processes!
# @return [Object] the result of cancel_all_processes!
def kill_all_processes!(options = {})
  kill_options = options.merge(:method => :kill)
  cancel_all_processes!(kill_options)
end
|
.kill_process!(wfid, options = {}) ⇒ Object
69
70
71
|
# File 'lib/bumbleworks/ruote.rb', line 69
# Same as cancel_process!, but with :method forced to :kill.
#
# @param wfid [Object] workflow id of the process to kill
# @param options [Hash] forwarded (as a merged copy) to cancel_process!
# @return [Object] the result of cancel_process!
def kill_process!(wfid, options = {})
  kill_options = options.merge(:method => :kill)
  cancel_process!(wfid, kill_options)
end
|
.launch(name, *args) ⇒ Object
44
45
46
47
48
|
# File 'lib/bumbleworks/ruote.rb', line 44
# Launches the process definition registered under +name+.
#
# The catchall participant is ensured first, then the definition is looked
# up and its tree is handed to the dashboard.
#
# @param name [Object] name passed to Bumbleworks::ProcessDefinition.find_by_name
# @param args [Array] extra arguments forwarded unchanged to dashboard.launch
# @return [Object] whatever dashboard.launch returns
def launch(name, *args)
  set_catchall_if_needed
  process_definition = Bumbleworks::ProcessDefinition.find_by_name(name)
  engine = dashboard
  engine.launch(process_definition.tree, *args)
end
|
.register_entity_interactor ⇒ Object
113
114
115
116
117
118
|
# File 'lib/bumbleworks/ruote.rb', line 113
# Puts a Bumbleworks::EntityInteractor participant entry at the front of the
# dashboard's participant list, unless an entry with the same regex is
# already present.
#
# @return [Object, nil] the value assigned to participant_list, or nil when
#   the entry already existed
def register_entity_interactor
  pattern = "^(ask|tell)_entity$"
  return if dashboard.participant_list.any? { |entry| entry.regex == pattern }

  interactor = ::Ruote::ParticipantEntry.new([pattern, ["Bumbleworks::EntityInteractor", {}]])
  dashboard.participant_list = dashboard.participant_list.unshift(interactor)
end
|
.register_error_dispatcher ⇒ Object
120
121
122
123
124
125
126
|
# File 'lib/bumbleworks/ruote.rb', line 120
# Puts a Bumbleworks::ErrorDispatcher participant entry at the front of the
# dashboard's participant list and sets the dashboard's on_error to
# 'error_dispatcher', unless an entry with the same regex already exists.
#
# @return [String, nil] 'error_dispatcher' when registered, nil when the
#   entry was already present (on_error is left untouched in that case)
def register_error_dispatcher
  pattern = '^error_dispatcher$'
  return if dashboard.participant_list.any? { |entry| entry.regex == pattern }

  dispatcher = ::Ruote::ParticipantEntry.new([pattern, ["Bumbleworks::ErrorDispatcher", {}]])
  dashboard.participant_list = dashboard.participant_list.unshift(dispatcher)
  dashboard.on_error = 'error_dispatcher'
end
|
.register_participants(&block) ⇒ Object
105
106
107
108
109
110
111
|
# File 'lib/bumbleworks/ruote.rb', line 105
# Registers participants with the dashboard and makes sure the standard
# Bumbleworks entries are in place.
#
# @yield optional block forwarded to dashboard.register; skipped when absent
# @return [Object] the dashboard's participant list after registration
def register_participants(&block)
  unless block.nil?
    dashboard.register(&block)
  end
  # Standard entries are (re)checked after any caller-supplied registrations.
  register_entity_interactor
  register_error_dispatcher
  set_catchall_if_needed
  dashboard.participant_list
end
|
.reset! ⇒ Object
147
148
149
150
151
152
153
154
155
|
# File 'lib/bumbleworks/ruote.rb', line 147
# Tears down the current storage and dashboard and forgets both.
#
# The storage is purged and shut down only when Bumbleworks.storage and
# +storage+ are both truthy. The dashboard is shut down only if one is
# memoized and it responds to #shutdown.
#
# @return [nil]
def reset!
  storage_available = Bumbleworks.storage && storage
  if storage_available
    storage.purge!
    storage.shutdown
  end
  if @dashboard && @dashboard.respond_to?(:shutdown)
    @dashboard.shutdown
  end
  @storage = nil
  @dashboard = nil
end
|
.send_cancellation_message(method, process_wfids) ⇒ Object
95
96
97
98
99
|
# File 'lib/bumbleworks/ruote.rb', line 95
# Invokes +method+ on the dashboard once per workflow id.
#
# @param method [Symbol] dashboard method to invoke for each wfid (the
#   callers visible in this file pass :cancel or :kill)
# @param process_wfids [Enumerable] workflow ids to message
# @return [Enumerable] +process_wfids+, as returned by #each
def send_cancellation_message(method, process_wfids)
  process_wfids.each { |process_id| dashboard.send(method, process_id) }
end
|
.set_catchall_if_needed ⇒ Object
134
135
136
137
138
139
140
141
|
# File 'lib/bumbleworks/ruote.rb', line 134
# Appends a "^.+$" => Bumbleworks::StorageParticipant entry to the end of the
# dashboard's participant list, unless the last entry already is a "^.+$"
# entry backed by one of the two known storage participant classes.
#
# @return [Object, nil] the value assigned to participant_list, or nil when
#   no entry had to be added
def set_catchall_if_needed
  storage_participants = ["Ruote::StorageParticipant", "Bumbleworks::StorageParticipant"]
  tail = dashboard.participant_list.last
  return if tail && tail.regex == "^.+$" && storage_participants.include?(tail.classname)

  catchall = ::Ruote::ParticipantEntry.new(["^.+$", ["Bumbleworks::StorageParticipant", {}]])
  dashboard.participant_list = dashboard.participant_list.push(catchall)
end
|
.set_up_storage_history ⇒ Object
128
129
130
131
132
|
# File 'lib/bumbleworks/ruote.rb', line 128
# Adds the 'history' service (Ruote::StorageHistory) to the dashboard, but
# only when Bumbleworks.store_history? is truthy and the storage adapter
# allows history storage.
#
# @return [Object, nil] the result of dashboard.add_service, or nil when the
#   service was not added
def set_up_storage_history
  # The adapter is only consulted when history storage is switched on.
  return unless Bumbleworks.store_history? && Bumbleworks.storage_adapter.allow_history_storage?

  dashboard.add_service('history', 'ruote/log/storage_history', 'Ruote::StorageHistory')
end
|
.start_worker!(options = {}) ⇒ Object
Start a worker, which will begin polling for messages in the workflow storage. You can run multiple workers if you are using a storage that supports them (such as Sequel or Redis, but not Hash) - they all just have to be connected to the same storage, and be able to instantiate participants in the participant list.
31
32
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/bumbleworks/ruote.rb', line 31
# Creates a Bumbleworks::Worker on the dashboard's context and runs it.
#
# Storage history and the error dispatcher are set up before the worker is
# created.
#
# @param options [Hash]
#   :verbose - the dashboard's noisy flag is set to true only when this is
#              exactly true, otherwise to false.
#   :join    - when exactly true the worker is started with #run, otherwise
#              with #run_in_thread.
# @return [Bumbleworks::Worker] the worker that was started
def start_worker!(options = {})
  set_up_storage_history
  register_error_dispatcher
  verbose = (options[:verbose] == true)
  dashboard.noisy = verbose
  Bumbleworks::Worker.new(dashboard.context).tap do |worker|
    options[:join] == true ? worker.run : worker.run_in_thread
  end
end
|
.storage ⇒ Object
143
144
145
|
# File 'lib/bumbleworks/ruote.rb', line 143
# Returns the memoized storage, calling initialize_storage_adapter the first
# time (and again on later calls for as long as the result is nil or false).
#
# @return [Object] whatever initialize_storage_adapter returned
def storage
  return @storage if @storage

  @storage = initialize_storage_adapter
end
|