Class: Bumbleworks::Ruote

Inherits:
Object
  • Object
show all
Defined in:
lib/bumbleworks/ruote.rb

Defined Under Namespace

Classes: CancelTimeout, KillTimeout

Class Method Summary collapse

Class Method Details

.cancel_all_processes!(options = {}) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/bumbleworks/ruote.rb', line 69

# Cancels (or kills) every process known to the dashboard and waits until
# the dashboard reports no remaining processes.
#
# Each process is sent the cancellation message once (tracked by wfid); the
# dashboard is then re-polled every 0.1 seconds, picking up any processes
# that appear in the meantime, until none remain or the timeout elapses.
#
# @param options [Hash] cancellation options; this hash is never modified
# @option options [Numeric] :timeout seconds to wait before giving up
#   (falls back to Bumbleworks.timeout)
# @option options [Symbol] :method :kill to kill the processes; any other
#   value (or none) cancels them
# @return [nil]
# @raise [CancelTimeout] if cancelling takes longer than the timeout
# @raise [KillTimeout] if killing takes longer than the timeout
def cancel_all_processes!(options = {})
  # Resolve the settings into locals so the caller's hash is left untouched
  # (and so frozen hashes are accepted).
  timeout = options[:timeout] || Bumbleworks.timeout
  method = options[:method] == :kill ? :kill : :cancel

  # Remember notified processes by wfid instead of by status object, so the
  # bookkeeping does not depend on status objects implementing equality.
  notified_wfids = []

  start_time = Time.now
  until (processes = dashboard.processes).empty?
    new_processes = processes.reject { |ps| notified_wfids.include?(ps.wfid) }
    send_cancellation_message(method, new_processes)
    notified_wfids.concat(new_processes.map { |ps| ps.wfid })

    if (Time.now - start_time) > timeout
      error_type = method == :cancel ? CancelTimeout : KillTimeout
      raise error_type, "Process #{method} taking too long - #{dashboard.processes.count} processes remain.  Errors: #{dashboard.errors}"
    end
    sleep 0.1
  end
end

.cancel_process!(wfid, options = {}) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/bumbleworks/ruote.rb', line 48

# Cancels (or kills) a single process and waits until the dashboard no
# longer reports it.
#
# The cancellation message is sent once; the dashboard is then polled every
# 0.1 seconds until the process is gone or the timeout elapses.
#
# @param wfid [Object] identifier of the process, passed through to the
#   dashboard's cancel/kill and process lookups
# @param options [Hash] cancellation options; this hash is never modified
# @option options [Numeric] :timeout seconds to wait before giving up
#   (falls back to Bumbleworks.timeout)
# @option options [Symbol] :method :kill to kill the process; any other
#   value (or none) cancels it
# @return [nil]
# @raise [CancelTimeout] if cancelling takes longer than the timeout
# @raise [KillTimeout] if killing takes longer than the timeout
def cancel_process!(wfid, options = {})
  # Resolve the settings into locals so the caller's hash is left untouched
  # (and so frozen hashes are accepted).
  timeout = options[:timeout] || Bumbleworks.timeout
  method = options[:method] == :kill ? :kill : :cancel

  dashboard.send(method, wfid)
  start_time = Time.now
  while dashboard.process(wfid)
    if (Time.now - start_time) > timeout
      error_type = method == :cancel ? CancelTimeout : KillTimeout
      raise error_type, "Process #{method} taking too long - #{dashboard.processes.count} processes remain.  Errors: #{dashboard.errors}"
    end
    sleep 0.1
  end
end

.dashboard(options = {}) ⇒ Object



11
12
13
# File 'lib/bumbleworks/ruote.rb', line 11

# Returns the memoized Ruote dashboard, building it on top of #storage the
# first time it is requested.
#
# @param options [Hash] accepted but not used by this method
# @return [::Ruote::Dashboard]
def dashboard(options = {})
  return @dashboard if @dashboard

  @dashboard = ::Ruote::Dashboard.new(storage)
end

.kill_all_processes!(options = {}) ⇒ Object



97
98
99
# File 'lib/bumbleworks/ruote.rb', line 97

# Kills every process by delegating to #cancel_all_processes! with the
# :method option forced to :kill.
#
# @param options [Hash] options forwarded to #cancel_all_processes!; any
#   :method entry is overridden in the forwarded copy
# @return [Object] whatever #cancel_all_processes! returns
def kill_all_processes!(options = {})
  kill_options = options.merge(:method => :kill)
  cancel_all_processes!(kill_options)
end

.kill_process!(wfid, options = {}) ⇒ Object



65
66
67
# File 'lib/bumbleworks/ruote.rb', line 65

# Kills a single process by delegating to #cancel_process! with the
# :method option forced to :kill.
#
# @param wfid [Object] identifier of the process, forwarded unchanged
# @param options [Hash] options forwarded to #cancel_process!; any :method
#   entry is overridden in the forwarded copy
# @return [Object] whatever #cancel_process! returns
def kill_process!(wfid, options = {})
  kill_options = options.merge(:method => :kill)
  cancel_process!(wfid, kill_options)
end

.launch(name, *args) ⇒ Object



43
44
45
46
# File 'lib/bumbleworks/ruote.rb', line 43

# Launches the process definition stored under +name+ in the dashboard's
# variables, after making sure the catchall participant is in place.
#
# @param name [Object] key used to look up the definition in
#   dashboard.variables
# @param args [Array] extra arguments passed straight to dashboard.launch
# @return [Object] whatever dashboard.launch returns
def launch(name, *args)
  set_catchall_if_needed

  definition = dashboard.variables[name]
  dashboard.launch(definition, *args)
end

.register_error_handler ⇒ Object



108
109
110
111
112
113
114
# File 'lib/bumbleworks/ruote.rb', line 108

# Ensures the error handler participant is registered with the dashboard.
#
# If no participant entry with the error handler regex exists yet, an entry
# for Bumbleworks::ErrorHandlerParticipant is put at the front of the
# participant list and the dashboard's on_error is pointed at it. When the
# entry already exists nothing is changed.
#
# @return [String, nil] the on_error value when the handler was registered,
#   nil when it was already present
def register_error_handler
  handler_regex = '^error_handler_participant$'
  return if dashboard.participant_list.any? { |entry| entry.regex == handler_regex }

  handler_entry = ::Ruote::ParticipantEntry.new(
    [handler_regex, ["Bumbleworks::ErrorHandlerParticipant", {}]]
  )
  dashboard.participant_list = dashboard.participant_list.unshift(handler_entry)
  dashboard.on_error = 'error_handler_participant'
end

.register_participants(&block) ⇒ Object



101
102
103
104
105
106
# File 'lib/bumbleworks/ruote.rb', line 101

# Registers participants with the dashboard, then makes sure the error
# handler and the catchall participant are present.
#
# @yield optional registration block, handed to dashboard.register
# @return [Object] the dashboard's participant list after registration
def register_participants(&block)
  if block_given?
    dashboard.register(&block)
  end

  register_error_handler
  set_catchall_if_needed

  dashboard.participant_list
end

.reset! ⇒ Object



135
136
137
138
139
140
141
142
143
# File 'lib/bumbleworks/ruote.rb', line 135

# Tears down the memoized storage and dashboard and forgets both.
#
# The storage (if any) is purged and shut down first; the dashboard is then
# shut down when it responds to #shutdown. Both references are cleared
# afterwards so they are rebuilt on next use.
#
# @return [nil]
def reset!
  if (active_storage = @storage)
    active_storage.purge!
    active_storage.shutdown
  end

  active_dashboard = @dashboard
  if active_dashboard && active_dashboard.respond_to?(:shutdown)
    active_dashboard.shutdown
  end

  @storage = @dashboard = nil
end

.send_cancellation_message(method, processes) ⇒ Object



91
92
93
94
95
# File 'lib/bumbleworks/ruote.rb', line 91

# Sends the given dashboard message (e.g. :cancel or :kill) for each of the
# supplied processes, using each process's wfid.
#
# @param method [Symbol] name of the dashboard method to invoke
# @param processes [Array] objects responding to #wfid
# @return [Array] the +processes+ collection that was passed in
def send_cancellation_message(method, processes)
  processes.each { |process_status| dashboard.send(method, process_status.wfid) }
end

.set_catchall_if_needed ⇒ Object



122
123
124
125
126
127
128
129
# File 'lib/bumbleworks/ruote.rb', line 122

# Makes sure the participant list ends with a catchall storage participant.
#
# Nothing is changed when the last entry already matches "^.+$" and is one
# of the two storage participant classes; otherwise a
# Bumbleworks::StorageParticipant catchall entry is appended.
#
# @return [Object, nil] the value assigned to the participant list when a
#   catchall was appended, nil when one was already in place
def set_catchall_if_needed
  tail_entry = dashboard.participant_list.last
  storage_classnames = ["Ruote::StorageParticipant", "Bumbleworks::StorageParticipant"]

  return if tail_entry && tail_entry.regex == "^.+$" &&
    storage_classnames.include?(tail_entry.classname)

  catchall_entry = ::Ruote::ParticipantEntry.new(["^.+$", ["Bumbleworks::StorageParticipant", {}]])
  dashboard.participant_list = dashboard.participant_list.push(catchall_entry)
end

.set_up_storage_history ⇒ Object



116
117
118
119
120
# File 'lib/bumbleworks/ruote.rb', line 116

# Adds the storage history service to the dashboard, but only when the
# configured storage adapter allows history storage.
#
# @return [Object, nil] the result of dashboard.add_service, or nil when
#   history storage is not allowed
def set_up_storage_history
  return unless Bumbleworks.storage_adapter.allow_history_storage?

  dashboard.add_service('history', 'ruote/log/storage_history', 'Ruote::StorageHistory')
end

.start_worker!(options = {}) ⇒ Object

Start a worker, which will begin polling for messages in the workflow storage. You can run multiple workers if you are using a storage that supports them (such as Sequel or Redis, but not Hash) - they all just have to be connected to the same storage, and be able to instantiate participants in the participant list.

Parameters:

  • options (Hash) (defaults to: {})

    startup options for the worker

Options Hash (options):

  • :verbose (Boolean)

    whether or not to spin up a “noisy” worker, which will output all messages picked up

  • :join (Boolean)

    whether or not to join the worker thread; if false, this method will return, and the worker thread will be disconnected, and killed if the calling process exits.



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/bumbleworks/ruote.rb', line 30

# Starts a Ruote worker against the dashboard's context.
#
# Storage history and the error handler are set up first, and the
# dashboard's noisy flag is set from the :verbose option.
#
# @param options [Hash] startup options for the worker
# @option options [Boolean] :verbose the dashboard is made noisy only when
#   this is exactly true
# @option options [Boolean] :join when exactly true the worker is started
#   with #run; otherwise it is started with #run_in_thread
# @return [::Ruote::Worker] the worker that was started
def start_worker!(options = {})
  set_up_storage_history
  register_error_handler

  noisy = (options[:verbose] == true)
  dashboard.noisy = noisy

  ::Ruote::Worker.new(dashboard.context).tap do |new_worker|
    options[:join] == true ? new_worker.run : new_worker.run_in_thread
  end
end

.storage ⇒ Object



131
132
133
# File 'lib/bumbleworks/ruote.rb', line 131

# Returns the memoized workflow storage, asking the configured storage
# adapter to build it from Bumbleworks.storage on first use.
#
# @return [Object] whatever the adapter's new_storage returned
def storage
  return @storage if @storage

  @storage = Bumbleworks.storage_adapter.new_storage(Bumbleworks.storage)
end