Class: Bumbleworks::Ruote

Inherits:
Object
  • Object
show all
Defined in:
lib/bumbleworks/ruote.rb

Defined Under Namespace

Classes: CancelTimeout, KillTimeout

Class Method Summary collapse

Class Method Details

.cancel_all_processes!(options = {}) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/bumbleworks/ruote.rb', line 73

# Cancels (or kills) every process reported by the dashboard and waits until
# the dashboard reports none.
#
# The hash supplied by the caller is never modified: the method works on a
# normalized copy, which is also what gets handed to
# Bumbleworks::Support.wait_until.
#
# @param options [Hash] +:method+ selects +:kill+; any other value is treated
#   as +:cancel+.  +:force+, when combined with +:kill+, clears the storage
#   instead of sending kill messages.  The whole (normalized) hash is passed
#   on to Bumbleworks::Support.wait_until.
# @return [Object] whatever Bumbleworks::Support.wait_until returns
# @raise [CancelTimeout, KillTimeout] when wait_until raises
#   Bumbleworks::Support::WaitTimeout; the class depends on the method used
def cancel_all_processes!(options = {})
  # Anything other than :kill is normalized to :cancel.  Merging builds a new
  # hash, so the caller's options are left untouched.
  options = options.merge(:method => :cancel) unless options[:method] == :kill
  clear_storage = options[:method] == :kill && options[:force]

  notified_process_wfids = []

  Bumbleworks::Support.wait_until(options) do
    # Only message processes that were not already notified on an earlier pass.
    new_process_wfids = dashboard.process_wfids - notified_process_wfids
    if clear_storage
      storage.clear
    else
      send_cancellation_message(options[:method], new_process_wfids)
    end
    notified_process_wfids += new_process_wfids
    dashboard.process_wfids.empty?
  end
rescue Bumbleworks::Support::WaitTimeout
  error_type = options[:method] == :cancel ? CancelTimeout : KillTimeout
  raise error_type, "Process #{options[:method]} taking too long - #{dashboard.process_wfids.count} processes remain.  Errors: #{dashboard.errors}"
end

.cancel_process!(wfid, options = {}) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/bumbleworks/ruote.rb', line 50

# Cancels (or kills) a single process and waits until the dashboard no longer
# returns it.
#
# The hash supplied by the caller is never modified: the method works on a
# normalized copy, which is also what gets handed to
# Bumbleworks::Support.wait_until.
#
# @param wfid [Object] identifier of the process, passed through to the
#   dashboard and storage calls
# @param options [Hash] +:method+ selects +:kill+; any other value is treated
#   as +:cancel+.  +:force+, when combined with +:kill+, removes the process
#   from storage instead of sending a kill message.  The whole (normalized)
#   hash is passed on to Bumbleworks::Support.wait_until.
# @return [Object] whatever Bumbleworks::Support.wait_until returns
# @raise [CancelTimeout, KillTimeout] when wait_until raises
#   Bumbleworks::Support::WaitTimeout; the class depends on the method used
def cancel_process!(wfid, options = {})
  # Anything other than :kill is normalized to :cancel.  Merging builds a new
  # hash, so the caller's options are left untouched.
  options = options.merge(:method => :cancel) unless options[:method] == :kill

  if options[:method] == :kill && options[:force]
    storage.remove_process(wfid)
  else
    dashboard.send(options[:method], wfid)
  end

  Bumbleworks::Support.wait_until(options) do
    dashboard.process(wfid).nil?
  end
rescue Bumbleworks::Support::WaitTimeout
  error_type = options[:method] == :cancel ? CancelTimeout : KillTimeout
  raise error_type, "Process #{options[:method]} for wfid '#{wfid}' taking too long.  Errors: #{dashboard.errors(wfid)}"
end

.dashboard(options = {}) ⇒ Object



12
13
14
# File 'lib/bumbleworks/ruote.rb', line 12

# Returns the memoized dashboard, building a ::Ruote::Dashboard on top of
# +storage+ the first time it is requested.
#
# @param options [Hash] accepted but not used by this method
# @return [::Ruote::Dashboard] the same instance on every call until the
#   @dashboard instance variable is reset
def dashboard(options = {})
  return @dashboard if @dashboard

  @dashboard = ::Ruote::Dashboard.new(storage)
end

.kill_all_processes!(options = {}) ⇒ Object



101
102
103
# File 'lib/bumbleworks/ruote.rb', line 101

# Kills every process by delegating to cancel_all_processes! with the
# :method option forced to :kill.
#
# @param options [Hash] forwarded (as a merged copy) to cancel_all_processes!
# @return [Object] whatever cancel_all_processes! returns
def kill_all_processes!(options = {})
  kill_options = options.merge(:method => :kill)
  cancel_all_processes!(kill_options)
end

.kill_process!(wfid, options = {}) ⇒ Object



69
70
71
# File 'lib/bumbleworks/ruote.rb', line 69

# Kills a single process by delegating to cancel_process! with the :method
# option forced to :kill.
#
# @param wfid [Object] identifier of the process, passed through unchanged
# @param options [Hash] forwarded (as a merged copy) to cancel_process!
# @return [Object] whatever cancel_process! returns
def kill_process!(wfid, options = {})
  kill_options = options.merge(:method => :kill)
  cancel_process!(wfid, kill_options)
end

.launch(name, *args) ⇒ Object



44
45
46
47
48
# File 'lib/bumbleworks/ruote.rb', line 44

# Launches the process definition registered under +name+.
#
# Calls set_catchall_if_needed first, then looks the definition up through
# Bumbleworks::ProcessDefinition.find_by_name and hands its tree to the
# dashboard.
#
# @param name [Object] name used to look up the process definition
# @param args [Array] extra arguments forwarded to the dashboard's launch
# @return [Object] whatever the dashboard's launch returns
def launch(name, *args)
  set_catchall_if_needed
  process_definition = Bumbleworks::ProcessDefinition.find_by_name(name)
  dashboard.launch(process_definition.tree, *args)
end

.register_entity_interactor ⇒ Object



113
114
115
116
117
118
# File 'lib/bumbleworks/ruote.rb', line 113

# Makes sure a participant entry matching ask_entity / tell_entity, handled by
# Bumbleworks::EntityInteractor, is present in the dashboard's participant
# list.  The entry is put at the front of the list; nothing is done when an
# entry with the same regex already exists.
#
# @return [Object, nil] the updated participant list when an entry was added,
#   nil when one was already present
def register_entity_interactor
  pattern = "^(ask|tell)_entity$"
  return if dashboard.participant_list.any? { |entry| entry.regex == pattern }

  interactor_entry = ::Ruote::ParticipantEntry.new([pattern, ["Bumbleworks::EntityInteractor", {}]])
  dashboard.participant_list = dashboard.participant_list.unshift(interactor_entry)
end

.register_error_dispatcher ⇒ Object



120
121
122
123
124
125
126
# File 'lib/bumbleworks/ruote.rb', line 120

# Makes sure an error_dispatcher participant entry, handled by
# Bumbleworks::ErrorDispatcher, is present in the dashboard's participant
# list, and points the dashboard's on_error setting at it.  The entry is put
# at the front of the list; nothing is done (on_error included) when an entry
# with the same regex already exists.
#
# @return [String, nil] 'error_dispatcher' when the entry was added, nil when
#   one was already present
def register_error_dispatcher
  return if dashboard.participant_list.any? { |entry| entry.regex == '^error_dispatcher$' }

  dispatcher_entry = ::Ruote::ParticipantEntry.new(['^error_dispatcher$', ["Bumbleworks::ErrorDispatcher", {}]])
  dashboard.participant_list = dashboard.participant_list.unshift(dispatcher_entry)
  dashboard.on_error = 'error_dispatcher'
end

.register_participants(&block) ⇒ Object



105
106
107
108
109
110
111
# File 'lib/bumbleworks/ruote.rb', line 105

# Registers participants with the dashboard and makes sure the standard
# entries are in place.
#
# When a block is given it is handed to the dashboard's register; afterwards
# the entity interactor, the error dispatcher and the catchall participant are
# ensured, in that order.
#
# @yield optional block forwarded to the dashboard's register
# @return [Object] the dashboard's participant list
def register_participants(&block)
  dashboard.register(&block) unless block.nil?

  register_entity_interactor
  register_error_dispatcher
  set_catchall_if_needed

  dashboard.participant_list
end

.reset! ⇒ Object



147
148
149
150
151
152
153
154
155
# File 'lib/bumbleworks/ruote.rb', line 147

# Tears down the current storage and dashboard and forgets both.
#
# The storage is purged and shut down only when Bumbleworks.storage is set and
# +storage+ returns something; the dashboard is shut down only when one has
# been built and it responds to shutdown.  Both memoized instance variables
# are cleared afterwards.
#
# @return [nil]
def reset!
  if Bumbleworks.storage
    if storage
      storage.purge!
      storage.shutdown
    end
  end

  current_dashboard = @dashboard
  current_dashboard.shutdown if current_dashboard && current_dashboard.respond_to?(:shutdown)

  @storage = @dashboard = nil
end

.send_cancellation_message(method, process_wfids) ⇒ Object



95
96
97
98
99
# File 'lib/bumbleworks/ruote.rb', line 95

# Invokes the given dashboard method once per process id.
#
# @param method [Symbol] name of the dashboard method to invoke (the callers
#   in this file pass :cancel or :kill)
# @param process_wfids [Array] ids of the processes to notify
# @return [Array] the +process_wfids+ collection that was iterated
def send_cancellation_message(method, process_wfids)
  process_wfids.each { |process_wfid| dashboard.send(method, process_wfid) }
end

.set_catchall_if_needed ⇒ Object



134
135
136
137
138
139
140
141
# File 'lib/bumbleworks/ruote.rb', line 134

# Makes sure the participant list ends with a catchall storage participant.
#
# The list is left alone when its last entry has the regex "^.+$" and its
# classname is one of the two storage participant classes; otherwise a
# Bumbleworks::StorageParticipant catchall entry is appended.
#
# @return [Object, nil] the updated participant list when an entry was
#   appended, nil when the catchall was already in place
def set_catchall_if_needed
  storage_participants = ["Ruote::StorageParticipant", "Bumbleworks::StorageParticipant"]
  tail = dashboard.participant_list.last
  return if tail && tail.regex == "^.+$" && storage_participants.include?(tail.classname)

  catchall_entry = ::Ruote::ParticipantEntry.new(["^.+$", ["Bumbleworks::StorageParticipant", {}]])
  dashboard.participant_list = dashboard.participant_list.push(catchall_entry)
end

.set_up_storage_history ⇒ Object



128
129
130
131
132
# File 'lib/bumbleworks/ruote.rb', line 128

# Adds the 'history' service (Ruote::StorageHistory) to the dashboard, but
# only when Bumbleworks is configured to store history and the storage adapter
# allows history storage.  The adapter is not consulted when history storage
# is switched off.
#
# @return [Object, nil] whatever the dashboard's add_service returns, or nil
#   when the service was not added
def set_up_storage_history
  return unless Bumbleworks.store_history?
  return unless Bumbleworks.storage_adapter.allow_history_storage?

  dashboard.add_service('history', 'ruote/log/storage_history', 'Ruote::StorageHistory')
end

.start_worker!(options = {}) ⇒ Object

Start a worker, which will begin polling for messages in the workflow storage. You can run multiple workers if you are using a storage that supports them (such as Sequel or Redis, but not Hash) - they all just have to be connected to the same storage, and be able to instantiate participants in the participant list.

Parameters:

  • options (Hash) (defaults to: {})

    startup options for the worker

Options Hash (options):

  • :verbose (Boolean)

    whether or not to spin up a “noisy” worker, which will output all messages picked up

  • :join (Boolean)

    whether or not to join the worker thread; if false, this method will return, and the worker thread will be disconnected, and killed if the calling process exits.



31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/bumbleworks/ruote.rb', line 31

# Starts a worker on the dashboard's context.
#
# Before the worker is created, storage history is set up, the error
# dispatcher is registered, and the dashboard's noisy flag is set from
# +:verbose+.
#
# @param options [Hash] startup options for the worker
# @option options [Boolean] :verbose the dashboard is made noisy only when
#   this is exactly true
# @option options [Boolean] :join when exactly true the worker is started with
#   run; otherwise it is started with run_in_thread
# @return [Bumbleworks::Worker] the worker that was started
def start_worker!(options = {})
  set_up_storage_history
  register_error_dispatcher
  dashboard.noisy = (options[:verbose] == true)

  Bumbleworks::Worker.new(dashboard.context).tap do |worker|
    options[:join] == true ? worker.run : worker.run_in_thread
  end
end

.storage ⇒ Object



143
144
145
# File 'lib/bumbleworks/ruote.rb', line 143

# Returns the memoized storage, obtaining it from initialize_storage_adapter
# the first time it is requested (and again on later calls for as long as that
# helper returns nil or false).
#
# @return [Object] whatever initialize_storage_adapter returned
def storage
  return @storage if @storage

  @storage = initialize_storage_adapter
end