Class: Bumbleworks::Ruote

Inherits:
Object
  • Object
show all
Defined in:
lib/bumbleworks/ruote.rb

Defined Under Namespace

Classes: CancelTimeout, KillTimeout

Class Method Summary collapse

Class Method Details

.cancel_all_processes!(options = {}) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/bumbleworks/ruote.rb', line 67

# Cancels (or kills) every process the dashboard currently reports, then
# polls until the dashboard reports no processes or the timeout elapses.
#
# The options hash passed in is only read; it is never modified, so callers
# may safely pass a shared or frozen hash.
#
# @param options [Hash]
# @option options [Numeric] :timeout seconds to wait before raising
#   (defaults to 5 when missing or nil)
# @option options [Symbol] :method :kill to kill the processes; any other
#   value (or none) means :cancel
# @return [nil]
# @raise [CancelTimeout] when cancelling and processes remain after the timeout
# @raise [KillTimeout] when killing and processes remain after the timeout
def cancel_all_processes!(options = {})
  # Resolve settings into locals instead of writing defaults back into the
  # caller's hash.
  timeout = options[:timeout] || 5
  method_name = options[:method] == :kill ? :kill : :cancel

  dashboard.processes.each do |ps|
    dashboard.send(method_name, ps.wfid)
  end

  start_time = Time.now
  while dashboard.processes.count > 0
    if (Time.now - start_time) > timeout
      error_type = method_name == :cancel ? CancelTimeout : KillTimeout
      raise error_type, "Process #{method_name} taking too long - #{dashboard.processes.count} processes remain.  Errors: #{dashboard.errors}"
    end
    # Brief pause between polls of the dashboard.
    sleep 0.1
  end
end

.cancel_process!(wfid, options = {}) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/bumbleworks/ruote.rb', line 46

# Cancels (or kills) the process with the given wfid, then polls until the
# dashboard no longer returns that process or the timeout elapses.
#
# The options hash passed in is only read; it is never modified, so callers
# may safely pass a shared or frozen hash.
#
# @param wfid [Object] identifier handed to the dashboard's cancel/kill and
#   process lookups
# @param options [Hash]
# @option options [Numeric] :timeout seconds to wait before raising
#   (defaults to 5 when missing or nil)
# @option options [Symbol] :method :kill to kill the process; any other
#   value (or none) means :cancel
# @return [nil]
# @raise [CancelTimeout] when cancelling and the process is still present
#   after the timeout
# @raise [KillTimeout] when killing and the process is still present after
#   the timeout
def cancel_process!(wfid, options = {})
  # Resolve settings into locals instead of writing defaults back into the
  # caller's hash.
  timeout = options[:timeout] || 5
  method_name = options[:method] == :kill ? :kill : :cancel

  dashboard.send(method_name, wfid)
  start_time = Time.now
  while dashboard.process(wfid)
    if (Time.now - start_time) > timeout
      error_type = method_name == :cancel ? CancelTimeout : KillTimeout
      # The message reports the dashboard-wide process count, not just this wfid.
      raise error_type, "Process #{method_name} taking too long - #{dashboard.processes.count} processes remain.  Errors: #{dashboard.errors}"
    end
    # Brief pause between polls of the dashboard.
    sleep 0.1
  end
end

.dashboard(options = {}) ⇒ Object



11
12
13
# File 'lib/bumbleworks/ruote.rb', line 11

# Returns the memoized Ruote dashboard, building it from +storage+ on first use.
#
# @param options [Hash] accepted but not read by this method
# @return [::Ruote::Dashboard]
def dashboard(options = {})
  @dashboard ||= begin
    backing_storage = storage
    ::Ruote::Dashboard.new(backing_storage)
  end
end

.kill_all_processes!(options = {}) ⇒ Object



87
88
89
# File 'lib/bumbleworks/ruote.rb', line 87

# Delegates to +cancel_all_processes!+ with +:method+ forced to +:kill+.
# The given hash is merged into a new one, so the caller's hash is untouched
# by this method.
#
# @param options [Hash] forwarded (plus :method => :kill) to cancel_all_processes!
# @return [Object] whatever cancel_all_processes! returns
def kill_all_processes!(options = {})
  kill_options = options.merge(:method => :kill)
  cancel_all_processes!(kill_options)
end

.kill_process!(wfid, options = {}) ⇒ Object



63
64
65
# File 'lib/bumbleworks/ruote.rb', line 63

# Delegates to +cancel_process!+ for the given wfid with +:method+ forced to
# +:kill+. The given hash is merged into a new one, so the caller's hash is
# untouched by this method.
#
# @param wfid [Object] passed through to cancel_process!
# @param options [Hash] forwarded (plus :method => :kill) to cancel_process!
# @return [Object] whatever cancel_process! returns
def kill_process!(wfid, options = {})
  kill_options = options.merge(:method => :kill)
  cancel_process!(wfid, kill_options)
end

.launch(name, *args) ⇒ Object



41
42
43
44
# File 'lib/bumbleworks/ruote.rb', line 41

# Ensures the catchall participant is in place, then launches whatever is
# stored under +name+ in the dashboard's variables.
#
# @param name [Object] key looked up in dashboard.variables
# @param args [Array] extra arguments passed straight through to dashboard.launch
# @return [Object] the result of dashboard.launch
def launch(name, *args)
  set_catchall_if_needed
  definition = dashboard.variables[name]
  dashboard.launch(definition, *args)
end

.register_participants(&block) ⇒ Object



91
92
93
94
95
# File 'lib/bumbleworks/ruote.rb', line 91

# Passes the given block (if any) to dashboard.register, makes sure the
# catchall participant is present, and returns the dashboard's participant list.
#
# @yield forwarded unchanged to dashboard.register
# @return [Object] dashboard.participant_list
def register_participants(&block)
  if block_given?
    dashboard.register(&block)
  end
  set_catchall_if_needed
  dashboard.participant_list
end

.reset! ⇒ Object



117
118
119
120
121
122
123
124
125
# File 'lib/bumbleworks/ruote.rb', line 117

# Tears down the memoized storage and dashboard and forgets both.
#
# If a storage is set it is purged and then shut down. If a dashboard is set
# and responds to +shutdown+ it is shut down afterwards. Both instance
# variables end up nil.
#
# @return [nil]
def reset!
  current_storage = @storage
  if current_storage
    current_storage.purge!
    current_storage.shutdown
  end

  engine = @dashboard
  engine.shutdown if engine && engine.respond_to?(:shutdown)

  @storage = @dashboard = nil
end

.set_catchall_if_needed ⇒ Object



97
98
99
100
101
102
103
104
# File 'lib/bumbleworks/ruote.rb', line 97

# Appends a catchall participant entry (regex "^.+$", handled by
# "Bumbleworks::StorageParticipant") to the dashboard's participant list,
# unless the last entry already has that regex and one of the two storage
# participant class names.
#
# @return [nil, Object] nil when the catchall is already last; otherwise the
#   list that was assigned back to the dashboard
def set_catchall_if_needed
  storage_classnames = ["Ruote::StorageParticipant", "Bumbleworks::StorageParticipant"]
  tail = dashboard.participant_list.last
  has_catchall = tail && tail.regex == "^.+$" && storage_classnames.include?(tail.classname)
  return if has_catchall

  catchall = ::Ruote::ParticipantEntry.new(["^.+$", ["Bumbleworks::StorageParticipant", {}]])
  dashboard.participant_list = dashboard.participant_list.push(catchall)
end

.start_worker!(options = {}) ⇒ Object

Start a worker, which will begin polling for messages in the workflow storage. You can run multiple workers if you are using a storage that supports them (such as Sequel or Redis, but not Hash) - they all just have to be connected to the same storage, and be able to instantiate participants in the participant list.

Parameters:

  • options (Hash) (defaults to: {})

    startup options for the worker

Options Hash (options):

  • :verbose (Boolean)

    whether or not to spin up a “noisy” worker, which will output all messages picked up

  • :join (Boolean)

    whether or not to join the worker thread; if false, this method will return, and the worker thread will be disconnected, and killed if the calling process exits.



30
31
32
33
34
35
36
37
38
39
# File 'lib/bumbleworks/ruote.rb', line 30

# Builds a Ruote worker on the dashboard's context and starts it.
#
# The dashboard's +noisy+ flag is set to true only when +:verbose+ is exactly
# +true+. The worker is run inline only when +:join+ is exactly +true+;
# otherwise it is run in a thread.
#
# @param options [Hash] startup options for the worker
# @option options [Boolean] :verbose turn on the dashboard's noisy mode
# @option options [Boolean] :join call +run+ instead of +run_in_thread+
# @return [::Ruote::Worker] the worker that was started
def start_worker!(options = {})
  verbose = (options[:verbose] == true)
  dashboard.noisy = verbose

  ::Ruote::Worker.new(dashboard.context).tap do |worker|
    options[:join] == true ? worker.run : worker.run_in_thread
  end
end

.storage ⇒ Object



106
107
108
109
110
111
112
113
114
115
# File 'lib/bumbleworks/ruote.rb', line 106

# Returns the memoized storage driver instance, creating it on first use.
#
# The first configured adapter whose +use?+ accepts Bumbleworks.storage is
# chosen, and its driver class is instantiated with Bumbleworks.storage.
#
# @return [Object] an instance of the chosen adapter's driver
# @raise [UndefinedSetting] when no configured adapter accepts Bumbleworks.storage
def storage
  @storage ||= begin
    candidates = Bumbleworks.configuration.storage_adapters
    chosen = candidates.find { |candidate| candidate.use?(Bumbleworks.storage) }
    unless chosen
      supported = candidates.map(&:display_name).join(', ')
      raise UndefinedSetting, "Storage is missing or not supported.  Supported: #{supported}"
    end
    chosen.driver.new(Bumbleworks.storage)
  end
end