Class: Chef::Expander::VNodeSupervisor

Inherits:
Object
  • Object
show all
Extended by:
Loggable
Includes:
Loggable
Defined in:
lib/chef/expander/vnode_supervisor.rb

Constant Summary collapse

COULD_NOT_CONNECT =
/Could not connect to server/.freeze

Constants included from Loggable

Loggable::LOGGER

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Loggable

log

Constructor Details

#initializeVNodeSupervisor

Returns a new instance of VNodeSupervisor.



146
147
148
149
150
151
# File 'lib/chef/expander/vnode_supervisor.rb', line 146

def initialize
  @vnodes = {}
  @vnode_table = VNodeTable.new(self)
  @local_node  = Node.local_node
  @queue_name, @guid = nil, nil
end

Instance Attribute Details

#local_nodeObject (readonly)

Returns the value of attribute local_node.



144
145
146
# File 'lib/chef/expander/vnode_supervisor.rb', line 144

def local_node
  @local_node
end

#vnode_tableObject (readonly)

Returns the value of attribute vnode_table.



142
143
144
# File 'lib/chef/expander/vnode_supervisor.rb', line 142

def vnode_table
  @vnode_table
end

Class Method Details

.await_parent_deathObject



64
65
66
67
68
69
70
71
# File 'lib/chef/expander/vnode_supervisor.rb', line 64

def self.await_parent_death
  @awaiting_parent_death = EM.add_periodic_timer(1) do
    unless Process.ppid == @original_ppid
      @awaiting_parent_death.cancel
      stop_immediately("master process death")
    end
  end
end

.startObject



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/chef/expander/vnode_supervisor.rb', line 73

def self.start
  @vnode_supervisor = new
  trap_signals

  Expander.init_config(ARGV)

  log.info("Chef Search Expander #{Expander.version} starting up.")

  begin
    AMQP.start(Expander.config.amqp_config) do
      start_consumers
    end
  rescue AMQP::Error => e
    if e.message =~ COULD_NOT_CONNECT
      log.error { "Could not connect to rabbitmq. Make sure it is running and correctly configured." }
      log.error { e.message }

      AMQP.hard_reset!

      sleep 5
      retry
    else
      raise
    end
  end
end

.start_cluster_workerObject



49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/chef/expander/vnode_supervisor.rb', line 49

def self.start_cluster_worker
  @vnode_supervisor = new
  @original_ppid = Process.ppid
  trap_signals

  vnodes = Expander.config.vnode_numbers

  $0 = "chef-expander#{Expander.config.ps_tag} worker ##{Expander.config.index} (vnodes #{vnodes.min}-#{vnodes.max})"
  
  AMQP.start(Expander.config.amqp_config) do
    start_consumers
    await_parent_death
  end
end

.start_consumersObject



100
101
102
103
104
105
106
107
# File 'lib/chef/expander/vnode_supervisor.rb', line 100

def self.start_consumers
  log.debug { "Setting prefetch count to 1"}
  MQ.prefetch(1)

  vnodes = Expander.config.vnode_numbers
  log.info("Starting Consumers for vnodes #{vnodes.min}-#{vnodes.max}")
  @vnode_supervisor.start(vnodes)
end

.stop_gracefully(signal) ⇒ Object



123
124
125
126
127
# File 'lib/chef/expander/vnode_supervisor.rb', line 123

def self.stop_gracefully(signal)
  log.info { "Initiating graceful shutdown on signal (#{signal})" }
  @vnode_supervisor.stop
  wait_for_http_requests_to_complete
end

.stop_immediately(signal) ⇒ Object



114
115
116
117
118
119
120
121
# File 'lib/chef/expander/vnode_supervisor.rb', line 114

def self.stop_immediately(signal)
  log.info { "Initiating immediate shutdown on signal (#{signal})" }
  @vnode_supervisor.stop
  EM.add_timer(1) do
    AMQP.stop
    EM.stop
  end
end

.trap_signalsObject



109
110
111
112
# File 'lib/chef/expander/vnode_supervisor.rb', line 109

def self.trap_signals
  Kernel.trap(:INT)  { stop_immediately(:INT) }
  Kernel.trap(:TERM) { stop_gracefully(:TERM) }
end

.wait_for_http_requests_to_completeObject



129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/chef/expander/vnode_supervisor.rb', line 129

def self.wait_for_http_requests_to_complete
  if Expander::Solrizer.http_requests_active?
    log.info { "waiting for in progress HTTP Requests to complete"}
    EM.add_timer(1) do
      wait_for_http_requests_to_complete
    end
  else
    log.info { "HTTP requests completed, shutting down"}
    AMQP.stop
    EM.stop
  end
end

Instance Method Details

#parse_symbolic(message) ⇒ Object



259
260
261
# File 'lib/chef/expander/vnode_supervisor.rb', line 259

def parse_symbolic(message)
  Yajl::Parser.new(:symbolize_keys => true).parse(message)
end

#process_control_message(message) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/chef/expander/vnode_supervisor.rb', line 199

def process_control_message(message)
  control_message = parse_symbolic(message)
  case control_message[:action]
  when "claim_vnode"
    spawn_vnode(control_message[:vnode_id])
  when "recover_vnode"
    recover_vnode(control_message[:vnode_id])
  when "release_vnodes"
    raise "todo"
    release_vnode()
  when "update_vnode_table"
    @vnode_table.update_table(control_message[:data])
  when "vnode_table_publish"
    publish_vnode_table
  when "status"
    publish_status_to(control_message[:rsvp])
  when "set_log_level"
    set_log_level(control_message[:level], control_message[:rsvp])
  else
    log.error { "invalid control message #{control_message.inspect}" }
  end
rescue Exception => e
  log.error { "Error processing a control message."}
  log.error { "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}" }
end

#publish_status_to(return_queue) ⇒ Object



237
238
239
240
241
# File 'lib/chef/expander/vnode_supervisor.rb', line 237

def publish_status_to(return_queue)
  status_update = @local_node.to_hash
  status_update[:vnodes] = vnodes
  MQ.queue(return_queue).publish(Yajl::Encoder.encode(status_update))
end

#publish_vnode_tableObject



230
231
232
233
234
235
# File 'lib/chef/expander/vnode_supervisor.rb', line 230

def publish_vnode_table
  status_update = @local_node.to_hash
  status_update[:vnodes] = vnodes
  status_update[:update] = :add
  @local_node.broadcast_message(Yajl::Encoder.encode({:action => :update_vnode_table, :data => status_update}))
end

#recover_vnode(vnode_id) ⇒ Object



250
251
252
253
254
255
256
257
# File 'lib/chef/expander/vnode_supervisor.rb', line 250

def recover_vnode(vnode_id)
  if @vnode_table.local_node_is_leader?
    log.debug { "Recovering vnode: #{vnode_id}" }
    @local_node.shared_message(Yajl::Encoder.encode({:action => :claim_vnode, :vnode_id => vnode_id}))
  else
    log.debug { "Ignoring :recover_vnode message because this node is not the leader" }
  end
end

#release_vnodeObject



195
196
197
# File 'lib/chef/expander/vnode_supervisor.rb', line 195

def release_vnode
  # TODO
end

#set_log_level(level, rsvp_to) ⇒ Object



243
244
245
246
247
248
# File 'lib/chef/expander/vnode_supervisor.rb', line 243

def set_log_level(level, rsvp_to)
  log.info { "setting log level to #{level} due to command from #{rsvp_to}" }
  new_log_level = (Expander.config.log_level = level.to_sym)
  reply = {:level => new_log_level, :node => @local_node.to_hash}
  MQ.queue(rsvp_to).publish(Yajl::Encoder.encode(reply))
end

#spawn_vnode(vnode_number) ⇒ Object



191
192
193
# File 'lib/chef/expander/vnode_supervisor.rb', line 191

def spawn_vnode(vnode_number)
  VNode.new(vnode_number, self).start
end

#start(vnode_ids) ⇒ Object



153
154
155
156
157
158
159
160
161
# File 'lib/chef/expander/vnode_supervisor.rb', line 153

def start(vnode_ids)
  @local_node.start do |message|
    process_control_message(message)
  end

  #start_vnode_table_publisher

  Array(vnode_ids).each { |vnode_id| spawn_vnode(vnode_id) }
end

#start_vnode_table_publisherObject



226
227
228
# File 'lib/chef/expander/vnode_supervisor.rb', line 226

def start_vnode_table_publisher
  @vnode_table_publisher = EM.add_periodic_timer(10) { publish_vnode_table }
end

#stopObject



163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/chef/expander/vnode_supervisor.rb', line 163

def stop
  @local_node.stop

  #log.debug { "stopping vnode table updater" }
  #@vnode_table_publisher.cancel

  log.info { "Stopping VNode queue subscribers"}
  @vnodes.each do |vnode_number, vnode|
    log.debug { "Stopping consumer on VNode #{vnode_number}"}
    vnode.stop
  end
  
end

#vnode_added(vnode) ⇒ Object



177
178
179
180
# File 'lib/chef/expander/vnode_supervisor.rb', line 177

def vnode_added(vnode)
  log.debug { "vnode #{vnode.vnode_number} registered with supervisor" }
  @vnodes[vnode.vnode_number.to_i] = vnode
end

#vnode_removed(vnode) ⇒ Object



182
183
184
185
# File 'lib/chef/expander/vnode_supervisor.rb', line 182

def vnode_removed(vnode)
  log.debug { "vnode #{vnode.vnode_number} unregistered from supervisor" }
  @vnodes.delete(vnode.vnode_number.to_i)
end

#vnodesObject



187
188
189
# File 'lib/chef/expander/vnode_supervisor.rb', line 187

def vnodes
  @vnodes.keys.sort
end