Class: Chef::Expander::VNode

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/chef/expander/vnode.rb

Constant Summary

Constants included from Loggable

Loggable::LOGGER

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#log

Constructor Details

#initialize(vnode_number, supervisor, opts = {}) ⇒ VNode

Returns a new instance of VNode.



37
38
39
40
41
42
43
# File 'lib/chef/expander/vnode.rb', line 37

# Builds a vnode consumer for a single numbered queue.
#
# @param vnode_number [#to_i] identifier of the vnode; coerced with +to_i+
# @param supervisor [Object] collaborator that is later sent +vnode_added+
#   and +vnode_removed+
# @param opts [Hash] optional settings
# @option opts [Numeric] :supervise_interval (30) interval handed to the
#   periodic consumer-count check (presumably seconds -- confirm against
#   the timer API in use)
def initialize(vnode_number, supervisor, opts={})
  @supervisor   = supervisor
  @vnode_number = vnode_number.to_i

  # The queue is declared lazily on first use; a fresh vnode is not stopped.
  @queue, @stopped = nil, false

  # A missing (or nil/false) option falls back to the default of 30.
  requested_interval = opts[:supervise_interval]
  @supervise_interval = requested_interval || 30
end

Instance Attribute Details

#supervise_interval ⇒ Object (readonly)

Returns the value of attribute supervise_interval.



35
36
37
# File 'lib/chef/expander/vnode.rb', line 35

# Reader for the interval used by the periodic consumer-count check.
#
# @return [Object] the value currently held in +@supervise_interval+
def supervise_interval
  @supervise_interval
end

#vnode_number ⇒ Object (readonly)

Returns the value of attribute vnode_number.



33
34
35
# File 'lib/chef/expander/vnode.rb', line 33

# Reader for this vnode's number.
#
# @return [Object] the value currently held in +@vnode_number+
def vnode_number
  @vnode_number
end

Instance Method Details

#abort_on_multiple_subscribe ⇒ Object



69
70
71
72
73
74
75
76
# File 'lib/chef/expander/vnode.rb', line 69

# Requests the queue's status and, when more than one subscriber is
# reported, logs an error and stops this vnode.
#
# The subscriber count is coerced with +to_i+, so a nil or string count is
# handled the same way as an integer.
#
# @return [Object] whatever +queue.status+ returns
def abort_on_multiple_subscribe
  queue.status do |_message_count, consumer_count|
    # One consumer (this vnode) or none is the healthy case; nothing to do.
    next unless consumer_count.to_i > 1

    log.error do
      "Detected extra consumers (#{consumer_count} total) on queue #{queue_name}, cancelling subscription"
    end
    stop
  end
end

#control_queue_name ⇒ Object



100
101
102
# File 'lib/chef/expander/vnode.rb', line 100

# Name of the control queue that accompanies this vnode's queue: the
# queue name with a "-control" suffix.
#
# @return [String]
def control_queue_name
  format("%s-control", queue_name)
end

#queue ⇒ Object



89
90
91
92
93
94
# File 'lib/chef/expander/vnode.rb', line 89

# Returns the queue for this vnode, declaring it on first use.
#
# The queue is declared through +MQ.queue+ under +queue_name+ with
# +:passive => false+ and +:durable => true+, and the result is cached in
# +@queue+ so later calls reuse it.
#
# @return [Object] the value returned by +MQ.queue+
def queue
  return @queue if @queue

  log.debug { "declaring queue #{queue_name}" }
  @queue = MQ.queue(queue_name, :passive => false, :durable => true)
end

#queue_name ⇒ Object



96
97
98
# File 'lib/chef/expander/vnode.rb', line 96

# Name of the queue this vnode consumes from: "vnode-" followed by the
# vnode number.
#
# @return [String]
def queue_name
  format("vnode-%s", @vnode_number)
end

#start ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/chef/expander/vnode.rb', line 45

# Registers this vnode with the supervisor and subscribes to its queue.
#
# Every delivered payload is handed to a +Solrizer+, whose block
# acknowledges the message via +headers.ack+. The +:confirm+ callback
# given to +subscribe+ performs an immediate extra-consumer check and then
# starts the periodic one.
#
# An +MQ::Error+ raised anywhere in the method is logged rather than
# propagated.
#
# @return [Object] the result of +queue.subscribe+, or of the error log
#   call when an +MQ::Error+ was rescued
def start
  @supervisor.vnode_added(self)

  on_confirm = Proc.new do
    abort_on_multiple_subscribe
    supervise_consumer_count
  end

  queue.subscribe(:ack => true, :confirm => on_confirm) do |header, message|
    log.debug do
      "got #{message} size(#{message.size} bytes) on queue #{queue_name}"
    end
    Solrizer.new(message) { header.ack }.run
  end
rescue MQ::Error => e
  log.error do
    "Failed to start subscriber on #{queue_name} #{e.class.name}: #{e.message}"
  end
end

#stop ⇒ Object



78
79
80
81
82
83
# File 'lib/chef/expander/vnode.rb', line 78

# Cancels the queue subscription (when one is active), tells the
# supervisor this vnode was removed, and marks the vnode as stopped.
#
# The supervisor is notified before +@stopped+ is set.
#
# @return [true]
def stop
  log.debug do
    "Cancelling subscription on queue #{queue_name.inspect}"
  end

  if queue.subscribed?
    queue.unsubscribe
  end

  @supervisor.vnode_removed(self)
  @stopped = true
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


85
86
87
# File 'lib/chef/expander/vnode.rb', line 85

# Whether this vnode has been stopped.
#
# @return [Boolean] the value currently held in +@stopped+
def stopped?
  @stopped
end

#supervise_consumer_count ⇒ Object



63
64
65
66
67
# File 'lib/chef/expander/vnode.rb', line 63

# Starts a periodic check, every +supervise_interval+, that this vnode is
# the only consumer of its queue.
#
# Once the vnode reports +stopped?+, the timer cancels itself instead of
# running the check. Without this the timer would outlive the vnode and
# keep requesting queue status on every tick, potentially calling +stop+
# again.
#
# @return [Object] the timer returned by +EM.add_periodic_timer+
def supervise_consumer_count
  timer = EM.add_periodic_timer(supervise_interval) do
    if stopped?
      # Guarded so a timer object without #cancel merely skips the check.
      timer.cancel if timer.respond_to?(:cancel)
    else
      abort_on_multiple_subscribe
    end
  end
end