Class: Bumbleworks::Worker

Inherits:
Ruote::Worker
  • Object
show all
Defined in:
lib/bumbleworks/worker.rb

Defined Under Namespace

Classes: Info, WorkerStateNotChanged

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args, &block) ⇒ Worker

Returns a new instance of Worker.



92
93
94
95
96
97
98
99
# File 'lib/bumbleworks/worker.rb', line 92

def initialize(*args, &block)
  super
  @id = SecureRandom.uuid
  if @info
    @info = Info.new(self)
    save_info
  end
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



6
7
8
# File 'lib/bumbleworks/worker.rb', line 6

def id
  @id
end

Class Method Details

.change_worker_state(new_state, options = {}) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
# File 'lib/bumbleworks/worker.rb', line 62

def change_worker_state(new_state, options = {})
  with_worker_state_enabled do
    Bumbleworks.dashboard.worker_state = new_state
    Bumbleworks::Support.wait_until(options) do
      worker_states.values.all? { |state| state == new_state }
    end
  end
  return true
rescue Bumbleworks::Support::WaitTimeout
  raise WorkerStateNotChanged, "Worker states: #{worker_states.inspect}"
end

.forget_worker(id_to_delete) ⇒ Object



39
40
41
42
43
# File 'lib/bumbleworks/worker.rb', line 39

def forget_worker(id_to_delete)
  purge_worker_info do |id, info|
    id == id_to_delete
  end
end

.infoObject



9
10
11
# File 'lib/bumbleworks/worker.rb', line 9

def info
  Bumbleworks.dashboard.worker_info || {}
end

.pause_all(options = {}) ⇒ Object



21
22
23
# File 'lib/bumbleworks/worker.rb', line 21

def pause_all(options = {})
  change_worker_state('paused', options)
end

.purge_stale_worker_infoObject



45
46
47
48
49
# File 'lib/bumbleworks/worker.rb', line 45

def purge_stale_worker_info
  purge_worker_info do |id, info|
    info['state'].nil? || info['state'] == 'stopped'
  end
end

.purge_worker_info(&block) ⇒ Object



51
52
53
54
55
56
57
58
59
60
# File 'lib/bumbleworks/worker.rb', line 51

def purge_worker_info(&block)
  doc = Bumbleworks.dashboard.storage.get('variables', 'workers')
  return unless doc
  doc['workers'] = doc['workers'].reject { |id, info|
    block.call(id, info)
  }
  result = Bumbleworks.dashboard.storage.put(doc)
  purge_stale_worker_info if result
  info
end

.refresh_worker_info(options = {}) ⇒ Object



74
75
76
77
78
79
80
81
82
83
# File 'lib/bumbleworks/worker.rb', line 74

def refresh_worker_info(options = {})
  with_worker_state_enabled do
    Bumbleworks::Support.wait_until(options) do
      info.all? { |id, worker_info|
        worker_info['state'] == 'stopped' ||
          Time.parse(worker_info['put_at']) > Time.now - 1
      }
    end
  end
end

.shutdown_all(options = {}) ⇒ Object



13
14
15
16
17
18
19
# File 'lib/bumbleworks/worker.rb', line 13

def shutdown_all(options = {})
  # First, send all running workers a message to stop
  change_worker_state('stopped', options)
  # Now ensure that future started workers will be started
  # in "running" mode instead of automatically stopped
  change_worker_state('running', options)
end

.unpause_all(options = {}) ⇒ Object



25
26
27
# File 'lib/bumbleworks/worker.rb', line 25

def unpause_all(options = {})
  change_worker_state('running', options)
end

.with_worker_state_enabledObject



85
86
87
88
89
# File 'lib/bumbleworks/worker.rb', line 85

def with_worker_state_enabled
  Bumbleworks.dashboard.context['worker_state_enabled'] = true
  yield
  Bumbleworks.dashboard.context['worker_state_enabled'] = false
end

.worker_statesObject



29
30
31
32
33
34
35
36
37
# File 'lib/bumbleworks/worker.rb', line 29

def worker_states
  info.inject({}) { |hsh, info|
    id, state = info[0], info[1]['state']
    if state && state != 'stopped'
      hsh[id] = state
    end
    hsh
  }
end

Instance Method Details

#determine_stateObject



110
111
112
113
114
115
# File 'lib/bumbleworks/worker.rb', line 110

def determine_state
  if @context['worker_state_enabled']
    super
    save_info
  end
end

#infoObject



117
118
119
# File 'lib/bumbleworks/worker.rb', line 117

def info
  self.class.info[id]
end

#save_infoObject



101
102
103
# File 'lib/bumbleworks/worker.rb', line 101

def save_info
  @info.save if @info
end

#shutdownObject



105
106
107
108
# File 'lib/bumbleworks/worker.rb', line 105

def shutdown
  super
  save_info
end