Class: CorosyncCommander

Inherits:
Object
Defined in:
lib/corosync_commander.rb,
lib/corosync_commander/execution.rb,
lib/corosync_commander/execution/message.rb

Overview

This provides a simplified interface to Corosync::CPG. The main use case is sending commands to a remote server and waiting for the responses.

This library takes care of:

  • Ensuring a consistent message format.

  • Sending messages to all, or just specific nodes.

  • Invoking the appropriate callback (and passing parameters) based on the command sent.

  • Responding with the return value of the callback.

  • Handling exceptions and sending them back to the sender.

  • Knowing exactly how many responses should be coming back.

IMPORTANT: Will not work without tuning Ruby.

You cannot use this with MRI Ruby older than 2.0, and even with 2.0 you must tune Ruby. This is because Corosync CPG (as of 1.4.3) allocates a 1 MB buffer on the stack, while Ruby 2.0 only allocates a 512 KB stack for threads, and this gem uses a thread for handling incoming messages. Thus, if you use an older Ruby or leave the stack size untuned, you will get segfaults.

Ruby 2.0 allows increasing the thread stack size through the RUBY_THREAD_MACHINE_STACK_SIZE environment variable. The advised value is 1.5 MB (1572864 bytes):

RUBY_THREAD_MACHINE_STACK_SIZE=1572864 ruby yourscript.rb
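
The requirement above can also be checked by a script before it connects. The following is a minimal sketch and is not part of the gem; the helper `stack_size_ok?` and the constant `REQUIRED_STACK_BYTES` are hypothetical names chosen for illustration.

```ruby
# Hypothetical pre-flight check; these names are illustrative, not part of the gem.
REQUIRED_STACK_BYTES = 1_572_864 # 1.5 MB = 1.5 * 1024 * 1024

# Returns true when the given engine, version, and environment meet the
# requirements described above. Engines other than MRI are not checked.
def stack_size_ok?(engine: RUBY_ENGINE, version: RUBY_VERSION, env: ENV)
  return true unless engine == 'ruby'
  return false if Gem::Version.new(version) < Gem::Version.new('2.0.0')

  # A missing variable yields nil, and nil.to_i is 0, so it fails the check.
  env['RUBY_THREAD_MACHINE_STACK_SIZE'].to_i >= REQUIRED_STACK_BYTES
end

unless stack_size_ok?
  warn "Set RUBY_THREAD_MACHINE_STACK_SIZE=#{REQUIRED_STACK_BYTES} before starting Ruby"
end
```

The variable has to be present in the environment when the interpreter starts, as in the command line above, so a check like this can only report the problem, not repair it.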

Examples:

cc = CorosyncCommander.new
cc.commands.register('shell command') do |sender, shellcmd|
  %x{#{shellcmd}}
end
cc.join('my group')

exe = cc.execute([], 'shell command', 'hostname')

enum = exe.to_enum
hostnames = []
begin
  enum.each do |sender, response|
    hostnames << response
  end
rescue CorosyncCommander::RemoteException => e
  puts "Caught remote exception: #{e}"
  retry
end

puts "Hostnames: #{hostnames.join(' ')}"

Defined Under Namespace

Modules: RemoteException

Classes: CallbackList, Execution

Constructor Details

#initialize(group_name = nil) ⇒ CorosyncCommander

Creates a new instance and connects to CPG. If a group name is provided, it also joins that group; otherwise it only connects. Connecting without joining lets you establish the command callbacks first and avoid NotImplementedError exceptions.

Parameters:

  • group_name (String) (defaults to: nil)

    Name of the group to join



# File 'lib/corosync_commander.rb', line 61

def initialize(group_name = nil)
	@cpg = Corosync::CPG.new
	@cpg.on_message {|*args| cpg_message(*args)}
	@cpg.on_confchg {|*args| cpg_confchg(*args)}
	@cpg.connect
	@cpg.fd.close_on_exec = true

	@quorum = Corosync::Quorum.new
	@quorum.on_notify {|*args| quorum_notify(*args)}
	@quorum.connect
	@quorum.fd.close_on_exec = true

	@cpg_members = nil

	@leader_pool = []
	@leader_pool.extend(Sync_m)

	# we can either share the msgid counter across all threads, or have a msgid counter on each thread and send the thread ID with each message. I prefer the former
	@next_execution_id = 0
	@next_execution_id_mutex = Mutex.new

	@execution_queues = {}
	@execution_queues.extend(Sync_m)

	@command_callbacks = CorosyncCommander::CallbackList.new

	# MRI needs a tunable thread stack (2.0+) that is at least 1.5 MB; see the overview above.
	if RUBY_ENGINE == 'ruby' && (Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.0.0') || ENV['RUBY_THREAD_MACHINE_STACK_SIZE'].to_i < 1572864)
		abort "MRI Ruby must be >= 2.0 and RUBY_THREAD_MACHINE_STACK_SIZE must be >= 1572864"
	end

	join(group_name) if group_name
end
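
The comment in the constructor describes sharing a single message ID counter across all threads, guarded by a mutex. A minimal sketch of that pattern follows. The class `ExecutionIdCounter` is a hypothetical stand-in; the gem's own `next_execution_id` method is not shown on this page and may be written differently.

```ruby
# Hypothetical stand-in for the shared counter; not the gem's actual code.
class ExecutionIdCounter
  def initialize
    @next_id = 0
    @mutex = Mutex.new
  end

  # Increments and returns the counter under the lock, so two threads
  # can never be handed the same ID.
  def next_id
    @mutex.synchronize { @next_id += 1 }
  end
end

counter = ExecutionIdCounter.new
# Four threads each request 100 IDs concurrently.
threads = 4.times.map { Thread.new { Array.new(100) { counter.next_id } } }
ids = threads.flat_map(&:value)
puts "#{ids.size} IDs handed out, #{ids.uniq.size} distinct"
```

The alternative mentioned in the comment, a per-thread counter plus a thread ID in every message, would avoid the lock at the cost of a larger message format.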

Instance Attribute Details

#commands ⇒ CorosyncCommander::CallbackList (readonly)

Returns the list of command callbacks.

Returns:

  • (CorosyncCommander::CallbackList)



# File 'lib/corosync_commander.rb', line 307

def commands
	@command_callbacks
end

#cpg ⇒ Object (readonly)

Returns the value of attribute cpg.



# File 'lib/corosync_commander.rb', line 51

def cpg
  @cpg
end

#execution_queues ⇒ Object (readonly)

Returns the value of attribute execution_queues.



# File 'lib/corosync_commander.rb', line 53

def execution_queues
  @execution_queues
end

Instance Method Details

#execute(recipients, command, *args) ⇒ CorosyncCommander::Execution

Execute a remote command.

Parameters:

  • recipients (Array<Corosync::CPG::Member>)

    List of recipients to send to, or an empty array to broadcast to all members of the group.

  • command (String)

    The name of the remote command to execute. If no such command exists on the remote node, a NotImplementedError exception will be raised when enumerating the results.

  • args

    Any further arguments will be passed to the command callback on the remote host.

Returns:

  • (CorosyncCommander::Execution)



# File 'lib/corosync_commander.rb', line 316

def execute(recipients, command, *args)
	execution = CorosyncCommander::Execution.new(self, next_execution_id, recipients, command, args)

	message = CorosyncCommander::Execution::Message.new(:recipients => recipients, :execution_id => execution.id, :type => 'command', :content => [command, args])

	@execution_queues.synchronize(:EX) do
		@execution_queues[execution.id] = execution.queue
	end
	# Technique stolen from http://www.mikeperham.com/2010/02/24/the-trouble-with-ruby-finalizers/
	#TODO We definitely need a spec test to validate that the execution object gets garbage collected
	ObjectSpace.define_finalizer(execution, execution_queue_finalizer(execution.id))

	@cpg.send(message)

	execution
end
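
`execute` registers a finalizer so that the queue entry is dropped once the Execution object is garbage collected. The technique depends on the finalizer proc not capturing the tracked object itself, because such a reference would keep the object alive forever. The sketch below illustrates the idea with a plain Hash. `QueueRegistry`, `track`, and `finalizer_for` are hypothetical names, and the gem's `execution_queue_finalizer` (not shown on this page) may differ; locking is left out here for brevity, whereas the gem synchronizes access to `@execution_queues`.

```ruby
require 'thread' # Queue; a no-op on recent Rubies

# Hypothetical registry illustrating the finalizer technique; not the gem's code.
class QueueRegistry
  attr_reader :queues

  def initialize
    @queues = {}
  end

  # The proc is built in its own method so that its closure holds only the
  # ID and the registry, never the object being tracked.
  def finalizer_for(id)
    proc { @queues.delete(id) }
  end

  # Creates a queue for the object and arranges for it to be removed
  # when the object is garbage collected.
  def track(object, id)
    @queues[id] = Queue.new
    ObjectSpace.define_finalizer(object, finalizer_for(id))
    object
  end
end

registry = QueueRegistry.new
registry.track(Object.new, 1)
# Garbage collection timing is not deterministic, so invoke the proc directly
# to show what the finalizer does when it eventually runs.
registry.finalizer_for(1).call
puts registry.queues.key?(1)
```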

#join(group_name) ⇒ void

This method returns an undefined value.

Joins the specified group. This is provided separately from initialization so that callbacks can be registered before joining the group, so that you won't get NotImplementedError exceptions.

Parameters:

  • group_name (String)

    Name of group to join



# File 'lib/corosync_commander.rb', line 122

def join(group_name)
	start unless @dispatch_thread

	@cpg.join(group_name)
end

#leader? ⇒ Boolean

Indicates whether we are the group leader. Being the leader means that we are the oldest member of the group. This differs slightly from calling `leader_position == 0`: if the position is -1 (meaning we haven't received the CPG confchg callback yet), this method waits for the CPG join to complete.

Returns:

  • (Boolean)


# File 'lib/corosync_commander.rb', line 355

def leader?

	# The way leadership works is that we record the members that were present when we joined the group in @leader_pool. Each time a node leaves the group, we remove it from @leader_pool. Once we become the only member in @leader_pool, we are the leader.
	# Now in the event that the cluster splits, this becomes complicated. Each side will see the members of the other side leaving, so each side will end up with its own leader. Since they can't talk to each other, having a leader in each group is perfectly fine. However, when the two sides re-join, each side will see the members of the other side joining as new nodes, and both leaders will remain as leaders.
	# We solve this by using the quorum status. When we go from inquorate to quorate, we give up our position. We send a 'leader reset' command to the cluster which tells everyone to remove us from their @leader_pool. When we receive the message ourselves, we set @leader_pool to the group members at that moment.
	# It doesn't matter if multiple members end up doing a 'leader reset' at the same time. It basically simulates the node leaving and then joining. Whoever performs the action first will move to the front. It will capture @leader_pool as the current members when it receives its own message, and as the other resets come in, it will remove those members, leaving itself in front of the ones that just joined (and reset after). But it will still remain after all the members that didn't do a reset.

	position = nil
	loop do
		position = leader_position
		break if position != -1
		Thread.pass # let the dispatch thread run so we can get our join message
		# This isn't ideal, as if the dispatch thread doesn't immediately complete the join, we start spinning.
		# But the only other way is to use condition variables, which combined with Sync_m, would just be really messy and stupidly complex (and I don't want to go to a plain mutex and lose the ability to use shared locks).
	end
	position == 0
end

#leader_position ⇒ Integer

Gets the member’s position in the leadership queue. The leadership position is simply how many nodes currently in the group were in the group before we joined.

Returns:

  • (Integer)


# File 'lib/corosync_commander.rb', line 345

def leader_position
	@leader_pool.synchronize(:SH) do
		@leader_pool.size - 1
	end
end
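
The leadership rules described in the comments of #leader? can be modelled without Corosync. The sketch below is a simplified illustration only: `LeaderPoolSketch` and its method names are hypothetical, it ignores the quorum-based 'leader reset', and unlike #leader? it does not wait for the join to complete.

```ruby
# Hypothetical model of the leadership queue; not the gem's actual code.
class LeaderPoolSketch
  def initialize
    @pool = [] # stays empty until our own join is observed
  end

  # Called when we see our own join: snapshot everyone present, us included.
  def joined(members_at_join)
    @pool = members_at_join.dup
  end

  # Called when any node leaves the group. Nodes that joined after us were
  # never in the pool, so removing them changes nothing.
  def left(member)
    @pool.delete(member)
  end

  # -1 before the join completes, 0 once every older member has gone.
  def position
    @pool.size - 1
  end

  def leader?
    position.zero?
  end
end

pool = LeaderPoolSketch.new
before_join = pool.position            # join not seen yet
pool.joined(%w[node-a node-b node-c])  # we are node-c, the newest member
after_join = pool.position
pool.left('node-a')
pool.left('node-b')
puts "before join: #{before_join}, after join: #{after_join}, leader now: #{pool.leader?}"
```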

#leave ⇒ void

This method returns an undefined value.

Leaves the active CPG group. This does not stop quorum notifications; if you wish to stop quorum as well, use #stop instead.



# File 'lib/corosync_commander.rb', line 131

def leave
	@cpg.leave
end

#members ⇒ Array<Corosync::CPG::Member>

List of current members

Returns:

  • (Array<Corosync::CPG::Member>)

    List of members currently in the group



# File 'lib/corosync_commander.rb', line 381

def members
	@cpg_members
end

#on_confchg {|member_list, left_list, join_list| ... } ⇒ Object

Callback to execute when the CPG configuration changes

Yield Parameters:

  • member_list (Array<Corosync::CPG::Member>)

    List of members in group after change

  • left_list (Array<Corosync::CPG::Member>)

    List of members which left the group

  • join_list (Array<Corosync::CPG::Member>)

    List of members which joined the group



# File 'lib/corosync_commander.rb', line 281

def on_confchg(&block)
	@confchg_callback = block
end

#on_quorumchg {|quorate, member_list| ... } ⇒ Object

Callback to execute when the quorum state changes

Yield Parameters:

  • quorate (Boolean)

    Whether cluster is quorate

  • member_list (Array)

    List of node IDs in the cluster after change



# File 'lib/corosync_commander.rb', line 301

def on_quorumchg(&block)
	@quorumchg_callback = block
end

#quorate? ⇒ Boolean

Indicates whether cluster is quorate.

Returns:

  • (Boolean)


# File 'lib/corosync_commander.rb', line 375

def quorate?
	@quorate
end

#start ⇒ void

This method returns an undefined value.

Starts watching for notifications



# File 'lib/corosync_commander.rb', line 96

def start
	@quorum.start

	@dispatch_thread = Thread.new do
		begin
			loop do
				select_ready = select([@cpg.fd, @quorum.fd], [], [])
				if select_ready[0].include?(@quorum.fd) then
					@quorum.dispatch
				end
				if select_ready[0].include?(@cpg.fd) then
					@cpg.dispatch
				end
			end
		rescue Exception => e
			# something happened that we don't know how to handle. We need to bail out.
			$stderr.write "Fatal exception: #{e.to_s} (#{e.class})\n#{e.backtrace.join("\n")}\n"
			exit(1)
		end
	end
end
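
The dispatch thread in #start blocks in `select` on two file descriptors and dispatches whichever is readable. The same pattern can be tried without Corosync, as in the sketch below, where two pipes stand in for `@cpg.fd` and `@quorum.fd`. All names in it are illustrative, and reading a line with `gets` is only a placeholder for the real `dispatch` calls.

```ruby
require 'thread' # Queue; a no-op on recent Rubies

# Illustrative only: two pipes stand in for the CPG and quorum descriptors.
cpg_r, cpg_w = IO.pipe
quorum_r, quorum_w = IO.pipe
events = Queue.new

dispatcher = Thread.new do
  loop do
    # Block until at least one descriptor is readable.
    readable, = IO.select([cpg_r, quorum_r])
    # Check quorum first, mirroring the order used in #start.
    events << [:quorum, quorum_r.gets.chomp] if readable.include?(quorum_r)
    events << [:cpg, cpg_r.gets.chomp] if readable.include?(cpg_r)
  end
end

quorum_w.puts 'quorate'
cpg_w.puts 'hello'

# Wait for both events, then shut the thread down the way #stop does.
received = [events.pop, events.pop]
dispatcher.kill
dispatcher.join
[cpg_r, cpg_w, quorum_r, quorum_w].each(&:close)

received.each { |source, payload| puts "#{source}: #{payload}" }
```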

#stop ⇒ void

This method returns an undefined value.

Shuts down the dispatch thread and disconnects from CPG and quorum.



# File 'lib/corosync_commander.rb', line 137

def stop
	@dispatch_thread.kill if !@dispatch_thread.nil?
	@dispatch_thread = nil

	@cpg.close if !@cpg.nil?
	@cpg = nil
	@cpg_members = nil

	@quorum.finalize if !@quorum.nil?
	@quorum = nil
end