Class: Bumbleworks::Worker

Inherits:
Ruote::Worker
  • Object
show all
Defined in:
lib/bumbleworks/worker.rb,
lib/bumbleworks/worker/info.rb,
lib/bumbleworks/worker/proxy.rb

Defined Under Namespace

Classes: Info, Proxy, WorkerStateNotChanged

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args, &block) ⇒ Worker

Returns a new instance of Worker.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/bumbleworks/worker.rb', line 86

# Builds the worker and records identifying details about the process and
# host it runs on. The bare `super` forwards all arguments and the block,
# unchanged, to the superclass constructor.
def initialize(*args, &block)
  super
  # Identity of this worker: OS process id, a random UUID, and start time.
  @pid = Process.pid
  @id = SecureRandom.uuid
  @launched_at = Time.now

  # Where the worker is running.
  @ip = Ruote.local_ip
  @hostname = Socket.gethostname
  # Best effort: any StandardError raised while running uname leaves @system nil.
  @system = `uname -a`.strip rescue nil

  # @info is not assigned anywhere above, so it is presumably set (or left
  # nil) by the superclass constructor -- TODO confirm. When it is truthy it
  # is replaced with an Info built from this worker and saved immediately.
  if @info
    @info = Info.new(self)
    save_info
  end
end

Instance Attribute Details

#hostname ⇒ Object (readonly)

Returns the value of attribute hostname.



7
8
9
# File 'lib/bumbleworks/worker.rb', line 7

# Reader for @hostname, which the constructor fills from Socket.gethostname.
def hostname
  @hostname
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



7
8
9
# File 'lib/bumbleworks/worker.rb', line 7

# Reader for @id, the UUID the constructor generates with SecureRandom.uuid.
def id
  @id
end

#ip ⇒ Object (readonly)

Returns the value of attribute ip.



7
8
9
# File 'lib/bumbleworks/worker.rb', line 7

# Reader for @ip, which the constructor fills from Ruote.local_ip.
def ip
  @ip
end

#launched_at ⇒ Object (readonly)

Returns the value of attribute launched_at.



7
8
9
# File 'lib/bumbleworks/worker.rb', line 7

# Reader for @launched_at, the Time.now value captured by the constructor.
def launched_at
  @launched_at
end

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



7
8
9
# File 'lib/bumbleworks/worker.rb', line 7

# Reader for @pid, the Process.pid value captured by the constructor.
def pid
  @pid
end

#system ⇒ Object (readonly)

Returns the value of attribute system.



7
8
9
# File 'lib/bumbleworks/worker.rb', line 7

# Reader for @system: the stripped output of `uname -a` captured by the
# constructor, or nil when running that command raised.
# NOTE(review): the name hides Kernel#system for instances of this class, so
# shelling out from instance code needs Kernel.system(...) instead.
def system
  @system
end

Class Method Details

.active_worker_states ⇒ Object



31
32
33
34
35
36
37
38
39
# File 'lib/bumbleworks/worker.rb', line 31

# Builds a Hash of worker id => state from the entries yielded by `info`.
# Entries whose state is nil, or which answer true to in_stopped_state?,
# are left out.
def active_worker_states
  info.inject({}) do |states, worker_info|
    state = worker_info.state
    # Skip workers that have no recorded state or are already stopped.
    next states if state.nil? || worker_info.in_stopped_state?

    states[worker_info.id] = state
    states
  end
end

.change_worker_state(new_state, options = {}) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/bumbleworks/worker.rb', line 41

# Sets the dashboard's worker_state to new_state, then hands `options` to
# Bumbleworks::Support.wait_until together with a check that every entry in
# active_worker_states equals new_state.
#
# Returns true when the wait completes without raising.
# Raises WorkerStateNotChanged (listing the current active worker states)
# when wait_until raises Bumbleworks::Support::WaitTimeout.
def change_worker_state(new_state, options = {})
  begin
    with_worker_state_enabled do
      Bumbleworks.dashboard.worker_state = new_state
      Bumbleworks::Support.wait_until(options) do
        active_worker_states.all? { |_id, state| state == new_state }
      end
    end
  rescue Bumbleworks::Support::WaitTimeout
    raise WorkerStateNotChanged, "Worker states: #{active_worker_states.inspect}"
  end

  true
end

.control_document ⇒ Object



69
70
71
72
73
74
75
# File 'lib/bumbleworks/worker.rb', line 69

# Fetches the 'worker_control' document of type 'variables' from the
# dashboard storage, starting from an empty Hash when nothing is stored.
# The returned document always carries its 'type' and '_id', plus a
# 'workers' entry that defaults to an empty Hash.
def control_document
  stored = Bumbleworks.dashboard.storage.get('variables', 'worker_control')

  (stored || {}).tap do |document|
    document['type'] = 'variables'
    document['_id'] = 'worker_control'
    document['workers'] ||= {}
  end
end

.info ⇒ Object



10
11
12
# File 'lib/bumbleworks/worker.rb', line 10

# Returns the Bumbleworks::Worker::Info class, which the other class methods
# use as the collection of worker info records (they call each/inject/[] on
# the value returned here).
#
# The former `|| {}` fallback was removed because it could never be taken:
# a constant that resolves to a class is always truthy, and one that does
# not resolve raises NameError before `||` is evaluated.
def info
  Bumbleworks::Worker::Info
end

.info_document ⇒ Object



77
78
79
80
81
82
83
# File 'lib/bumbleworks/worker.rb', line 77

# Fetches the 'workers' document of type 'variables' from the dashboard
# storage, starting from an empty Hash when nothing is stored. The returned
# document always carries its 'type' and '_id', plus a 'workers' entry that
# defaults to an empty Hash.
def info_document
  stored = Bumbleworks.dashboard.storage.get('variables', 'workers')

  (stored || {}).tap do |document|
    document['type'] = 'variables'
    document['_id'] = 'workers'
    document['workers'] ||= {}
  end
end

.pause_all(options = {}) ⇒ Object



23
24
25
# File 'lib/bumbleworks/worker.rb', line 23

# Requests the 'paused' state for workers by delegating to
# change_worker_state; `options` is passed through untouched.
# Returns true, or raises WorkerStateNotChanged, exactly as
# change_worker_state does.
def pause_all(options = {})
  change_worker_state('paused', options)
end

.refresh_worker_info(options = {}) ⇒ Object



53
54
55
56
57
58
59
60
61
# File 'lib/bumbleworks/worker.rb', line 53

# Walks every entry yielded by `info` and records the "stalled" state on
# each one that is not in a stopped state and reports stalling?. The whole
# pass runs inside with_worker_state_enabled.
#
# `options` is accepted but not used by this method.
def refresh_worker_info(options = {})
  with_worker_state_enabled do
    info.each do |worker_info|
      next if worker_info.in_stopped_state?
      next unless worker_info.stalling?

      worker_info.record_new_state("stalled")
    end
  end
end

.shutdown_all(options = {}) ⇒ Object



14
15
16
17
18
19
20
21
# File 'lib/bumbleworks/worker.rb', line 14

# Asks workers to stop, then puts the requested state back to 'running'.
# `options` is passed through to both change_worker_state calls.
#
# The method's value is that of the first call (true on success). The second
# call sits in `ensure`, so it also runs when the first one raises; if the
# second call raises as well, its exception is the one that propagates.
def shutdown_all(options = {})
  # First, send all running workers a message to stop
  change_worker_state('stopped', options)
ensure
  # Now ensure that future started workers will be started
  # in "running" mode instead of automatically stopped
  change_worker_state('running', options)
end

.unpause_all(options = {}) ⇒ Object



27
28
29
# File 'lib/bumbleworks/worker.rb', line 27

# Requests the 'running' state for workers by delegating to
# change_worker_state; `options` is passed through untouched.
# Returns true, or raises WorkerStateNotChanged, exactly as
# change_worker_state does.
def unpause_all(options = {})
  change_worker_state('running', options)
end

.with_worker_state_enabled ⇒ Object



63
64
65
66
67
# File 'lib/bumbleworks/worker.rb', line 63

# Runs the given block with the dashboard context flag
# 'worker_state_enabled' set to true, and sets it back to false afterwards.
#
# The reset lives in an `ensure` clause so the flag is cleared even when the
# block raises; previously an exception skipped the reset and left the flag
# stuck at true.
#
# Returns false, the same value this method returned before on success (it
# used to be the value of the trailing assignment), so callers that pass the
# result along see no change. Exceptions from the block propagate unchanged.
def with_worker_state_enabled
  Bumbleworks.dashboard.context['worker_state_enabled'] = true
  yield
  false
ensure
  Bumbleworks.dashboard.context['worker_state_enabled'] = false
end

Instance Method Details

#class_name ⇒ Object



102
103
104
# File 'lib/bumbleworks/worker.rb', line 102

# Returns the receiver's class converted to a String via to_s.
def class_name
  self.class.to_s
end

#desired_state ⇒ Object



119
120
121
122
123
124
# File 'lib/bumbleworks/worker.rb', line 119

# Returns the "state" value this worker is being asked to be in.
#
# Sources are tried in order, each only when the previous one is nil/false:
#   1. this worker's own entry from worker_control_variable
#   2. the "worker" document of type "variables" in @storage
#   3. a built-in default of "running"
def desired_state
  settings = worker_control_variable
  settings ||= @storage.get("variables", "worker")
  settings ||= { "state" => "running" }
  settings["state"]
end

#determine_state ⇒ Object



126
127
128
129
130
131
132
133
# File 'lib/bumbleworks/worker.rb', line 126

# While holding @state_mutex, updates @state from desired_state and saves
# info. Nothing happens when @state is already "stopped" or when the
# "worker_state_enabled" entry of @context is nil/false.
#
# Returns the result of save_info when the state was refreshed, nil otherwise.
def determine_state
  @state_mutex.synchronize do
    next if @state == "stopped"
    next unless @context["worker_state_enabled"]

    @state = desired_state
    save_info
  end
end

#info ⇒ Object



135
136
137
# File 'lib/bumbleworks/worker.rb', line 135

# Looks up this worker's record by indexing the class-level `info` with the
# worker's id.
def info
  self.class.info[id]
end

#save_info ⇒ Object



106
107
108
# File 'lib/bumbleworks/worker.rb', line 106

# Saves @info when there is one. Returns whatever @info.save returns, or nil
# when @info is nil/false.
def save_info
  return unless @info

  @info.save
end

#shutdown ⇒ Object



110
111
112
113
# File 'lib/bumbleworks/worker.rb', line 110

# Runs the superclass shutdown, then saves this worker's info (save_info does
# nothing when @info is unset). Returns the result of save_info.
def shutdown
  super
  save_info
end

#worker_control_variable ⇒ Object



115
116
117
# File 'lib/bumbleworks/worker.rb', line 115

# Reads this worker's entry, keyed by its id, from the "workers" section of
# the class-level control document.
def worker_control_variable
  self.class.control_document["workers"][id]
end