Class: CorosyncCommander
- Inherits: Object
  - Object
  - CorosyncCommander
- Defined in: lib/corosync_commander.rb, lib/corosync_commander/execution.rb, lib/corosync_commander/execution/message.rb
Overview
This provides a simplified interface into Corosync::CPG. The main use case is for sending commands to a remote server, and waiting for the responses.
This library takes care of:
- Ensuring a consistent message format.
- Sending messages to all, or just specific nodes.
- Invoking the appropriate callback (and passing parameters) based on the command sent.
- Responding with the return value of the callback.
- Handling exceptions and sending them back to the sender.
- Knowing exactly how many responses should be coming back.
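The responsibilities listed above can be illustrated with a toy, in-process dispatcher. This is a minimal sketch and not the gem's implementation: `ToyDispatcher`, `register`, and `handle` are hypothetical names, and no Corosync connection is involved. It shows a callback being selected by command name, its return value becoming the response, and an exception being captured so it could be sent back to the sender.

```ruby
# Toy illustration only: ToyDispatcher and its methods are hypothetical
# and do not reflect the gem's internals.
class ToyDispatcher
  def initialize
    @callbacks = {}
  end

  # Associate a block with a command name.
  def register(command, &block)
    @callbacks[command] = block
  end

  # Look up the callback for the command and build a [type, payload] pair.
  def handle(sender, command, args)
    callback = @callbacks.fetch(command) do
      raise NotImplementedError, "no callback registered for #{command.inspect}"
    end
    ['response', callback.call(sender, *args)]
  rescue StandardError, NotImplementedError => e
    # Exceptions are captured so they can travel back to the sender.
    ['exception', [e.class.name, e.message]]
  end
end

dispatcher = ToyDispatcher.new
dispatcher.register('add') { |_sender, a, b| a + b }

ok      = dispatcher.handle('node1', 'add', [1, 2])
missing = dispatcher.handle('node1', 'unknown', [])
```

In this sketch an unregistered command yields an exception payload rather than crashing the receiver, which mirrors why callbacks should be registered before joining a group.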
IMPORTANT: Will not work without tuning Ruby.
You cannot use this with MRI Ruby older than 2.0, and even with 2.0 you must tune Ruby. This is because Corosync CPG (as of 1.4.3) allocates a 1 MB buffer on the stack, while Ruby 2.0 only allocates a 512 KB stack for threads. This gem uses a thread for handling incoming messages, so an older Ruby, or an untuned Ruby 2.0, will segfault.
Ruby 2.0 allows increasing the thread stack size through the RUBY_THREAD_MACHINE_STACK_SIZE environment variable. The advised value is 1.5 MB (1572864 bytes).
RUBY_THREAD_MACHINE_STACK_SIZE=1572864 ruby yourscript.rb
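The requirement above can be checked before starting a script. The following is a minimal sketch that mirrors the condition described in this section; `stack_size_sufficient?` and `REQUIRED_STACK_BYTES` are hypothetical names used for illustration and are not part of the gem.

```ruby
require 'rubygems'

# Hypothetical helper, not part of the gem.
REQUIRED_STACK_BYTES = 1_572_864 # 1.5 MB, i.e. 1.5 * 1024 * 1024

def stack_size_sufficient?(engine, ruby_version, env_value)
  # Only MRI (RUBY_ENGINE == 'ruby') is subject to the check.
  return true unless engine == 'ruby'
  return false if Gem::Version.new(ruby_version) < Gem::Version.new('2.0.0')

  # An unset variable is nil, and nil.to_i is 0, so it fails the check.
  env_value.to_i >= REQUIRED_STACK_BYTES
end

ok = stack_size_sufficient?(RUBY_ENGINE, RUBY_VERSION,
                            ENV['RUBY_THREAD_MACHINE_STACK_SIZE'])
warn 'Thread stack too small for Corosync CPG' unless ok
```

The environment variable is read when the interpreter starts, so it has to be set in the environment that launches Ruby, as in the command line above.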
Defined Under Namespace
Modules: RemoteException
Classes: CallbackList, Execution
Instance Attribute Summary
- #commands ⇒ CorosyncCommander::CallbackList (readonly)
  List of command callbacks.
- #cpg ⇒ Object (readonly)
  Returns the value of attribute cpg.
- #execution_queues ⇒ Object (readonly)
  Returns the value of attribute execution_queues.
Instance Method Summary
- #execute(recipients, command, *args) ⇒ CorosyncCommander::Execution
  Execute a remote command.
- #initialize(group_name = nil) ⇒ CorosyncCommander (constructor)
  Creates a new instance and connects to CPG.
- #join(group_name) ⇒ void
  Joins the specified group.
- #leader? ⇒ Boolean
  Indicates whether we are the group leader.
- #leader_position ⇒ Integer
  Gets the member’s position in the leadership queue.
- #leave ⇒ void
  Leave the active CPG group.
- #members ⇒ Array<Corosync::CPG::Member>
  List of current members.
- #on_confchg {|member_list, left_list, join_list| ... } ⇒ Object
  Callback to execute when the CPG configuration changes.
- #on_quorumchg {|quorate, member_list| ... } ⇒ Object
  Callback to execute when the quorum state changes.
- #quorate? ⇒ Boolean
  Indicates whether cluster is quorate.
- #start ⇒ void
  Starts watching for notifications.
- #stop ⇒ void
  Shuts down the dispatch thread and disconnects CPG.
Constructor Details
#initialize(group_name = nil) ⇒ CorosyncCommander
Creates a new instance and connects to CPG. If a group name is provided, it also joins that group; otherwise it only connects. This lets you register the command callbacks before joining, and so avoid NotImplementedError exceptions.
    # File 'lib/corosync_commander.rb', line 61
    def initialize(group_name = nil)
      @cpg = Corosync::CPG.new
      @cpg. {|*args| (*args)}
      @cpg.on_confchg {|*args| cpg_confchg(*args)}
      @cpg.connect
      @cpg.fd.close_on_exec = true

      @quorum = Corosync::Quorum.new
      @quorum.on_notify {|*args| quorum_notify(*args)}
      @quorum.connect
      @quorum.fd.close_on_exec = true

      @cpg_members = nil

      @leader_pool = []
      @leader_pool.extend(Sync_m)

      # We can either share the msgid counter across all threads, or have a msgid counter on each thread and send the thread ID with each message. I prefer the former.
      @next_execution_id = 0
      @next_execution_id_mutex = Mutex.new

      @execution_queues = {}
      @execution_queues.extend(Sync_m)

      @command_callbacks = CorosyncCommander::CallbackList.new

      if RUBY_ENGINE == 'ruby' and (Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.0.0') or ENV['RUBY_THREAD_MACHINE_STACK_SIZE'].to_i < 1572864) then
        abort "MRI Ruby must be >= 2.0 and RUBY_THREAD_MACHINE_STACK_SIZE must be >= 1572864"
      end

      join(group_name) if group_name
    end
Instance Attribute Details
#commands ⇒ CorosyncCommander::CallbackList (readonly)
Returns List of command callbacks.
    # File 'lib/corosync_commander.rb', line 307
    def commands
      @command_callbacks
    end
#cpg ⇒ Object (readonly)
Returns the value of attribute cpg.
    # File 'lib/corosync_commander.rb', line 51
    def cpg
      @cpg
    end
#execution_queues ⇒ Object (readonly)
Returns the value of attribute execution_queues.
    # File 'lib/corosync_commander.rb', line 53
    def execution_queues
      @execution_queues
    end
Instance Method Details
#execute(recipients, command, *args) ⇒ CorosyncCommander::Execution
Execute a remote command.
    # File 'lib/corosync_commander.rb', line 316
    def execute(recipients, command, *args)
      execution = CorosyncCommander::Execution.new(self, next_execution_id, recipients, command, args)

      = CorosyncCommander::Execution::Message.new(:recipients => recipients, :execution_id => execution.id, :type => 'command', :content => [command, args])

      @execution_queues.synchronize(:EX) do
        @execution_queues[execution.id] = execution.queue
      end
      # Technique stolen from http://www.mikeperham.com/2010/02/24/the-trouble-with-ruby-finalizers/
      # TODO: We definitely need a spec test to validate the execution object gets garbage collected
      ObjectSpace.define_finalizer(execution, execution_queue_finalizer(execution.id))

      @cpg.send()

      execution
    end
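The finalizer technique referenced in the listing above can be sketched in isolation. This is an assumption-laden illustration, not the gem's code: `QueueRegistry`, `track`, and `finalizer_for` are hypothetical names, and a plain Hash replaces the Sync_m-protected one. The point is that the finalizer proc is built in a separate method that receives only the id, so it does not close over the tracked object and keep it alive.

```ruby
require 'thread'

# Hypothetical stand-in for the execution queue bookkeeping.
class QueueRegistry
  attr_reader :queues

  def initialize
    @queues = {}
  end

  # Create a queue for the object and arrange for its removal once the
  # object has been garbage collected.
  def track(object, id)
    @queues[id] = Queue.new
    ObjectSpace.define_finalizer(object, self.class.finalizer_for(@queues, id))
    @queues[id]
  end

  # Only the hash and the id are visible here, so the returned proc
  # holds no reference to the tracked object.
  def self.finalizer_for(queues, id)
    proc { |_object_id| queues.delete(id) }
  end
end

registry = QueueRegistry.new
execution = Object.new
registry.track(execution, 7)
```

If the proc were created inline next to the `execution` local variable, it would capture that variable and the object could never be collected, so the finalizer would never run.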
#join(group_name) ⇒ void
This method returns an undefined value.
Joins the specified group. This is provided separately from initialization so that callbacks can be registered before joining the group, which avoids NotImplementedError exceptions.
    # File 'lib/corosync_commander.rb', line 122
    def join(group_name)
      start unless @dispatch_thread

      @cpg.join(group_name)
    end
#leader? ⇒ Boolean
Indicates whether we are the group leader. Being the leader means we are the oldest member of the group. This differs slightly from checking `leader_position == 0`: if the position is -1 (meaning we haven't received the CPG confchg callback yet), this method waits for the CPG join to complete.
    # File 'lib/corosync_commander.rb', line 355
    def leader?
      # The way leadership works is that we record the members that were present when we joined the group in @leader_pool. Each time a node leaves the group, we remove them from @leader_pool. Once we become the only member in @leader_pool, we are the leader.
      # Now in the event that the cluster splits, this becomes complicated. Each side will see the members of the other side leaving. So each side will end up with their own leader. Since they can't talk to each other, having a leader in each group is perfectly fine. However when the 2 sides re-join, each side will see the members of the other side joining as new nodes, and both leaders will remain as leaders.
      # We solve this by using the quorum status. When we go from inquorate to quorate, we give up our position. We send a 'leader reset' command to the cluster which tells everyone to remove us from their @leader_pool. When we receive the message ourself, we set @leader_pool to the group members at that moment.
      # It doesn't matter if multiple members end up doing a 'leader reset' at the same time. It basically simulates the node leaving and then joining. Whoever performs the action first will move to the front. It will capture @leader_pool as the current members when it receives its own message, and as the other resets come in, it will remove those members. Leaving itself in front of the ones that just joined (and reset after). But it will still remain after all the members that didn't do a reset.

      position = nil
      loop do
        position = leader_position
        break if position != -1
        Thread.pass # let the dispatch thread run so we can get our join message
        # This isn't ideal as if the dispatch thread doesn't immediately complete the join, we start spinning.
        # But the only other way is to use condition variables, which combined with Sync_m, would just be really messy and stupidly complex (and I don't want to go to a plain mutex and lose the ability to use shared locks).
      end

      position == 0
    end
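The leader pool bookkeeping described in the comments above can be modelled without a cluster. This is a toy sketch under stated assumptions: `LeaderTracker` and its `confchg` method are hypothetical, node names are plain strings rather than Corosync::CPG::Member objects, and the quorum-driven 'leader reset' is left out.

```ruby
# Toy model only; LeaderTracker is hypothetical and omits locking,
# the quorum-driven reset, and the wait loop in #leader?.
class LeaderTracker
  def initialize(me)
    @me = me
    @leader_pool = []
  end

  # Handle a membership change as seen by this node.
  def confchg(members, left, joined)
    if joined.include?(@me)
      # Our own join: everyone present now, including us, is in the pool.
      @leader_pool = members.dup
    else
      # Nodes that joined after us never enter the pool; leavers drop out.
      @leader_pool -= left
    end
  end

  # -1 until our join is seen, 0 once only we remain in the pool.
  def leader_position
    @leader_pool.size - 1
  end

  def leader?
    leader_position == 0
  end
end

node = LeaderTracker.new('c')
before_join = node.leader_position
node.confchg(%w[a b c], [], %w[c])   # we join a group that already has a and b
after_join = node.leader_position
node.confchg(%w[a b c d], [], %w[d]) # d joins after us: position unchanged
node.confchg(%w[b c d], %w[a], [])   # a leaves
node.confchg(%w[c d], %w[b], [])     # b leaves: only we remain in the pool
```

Because later joiners are never added to the pool, a node's position can only decrease over time, which is what makes the oldest surviving member the leader.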
#leader_position ⇒ Integer
Gets the member’s position in the leadership queue. The leadership position is simply how many nodes currently in the group were in the group before we joined.
    # File 'lib/corosync_commander.rb', line 345
    def leader_position
      @leader_pool.synchronize(:SH) do
        @leader_pool.size - 1
      end
    end
#leave ⇒ void
This method returns an undefined value.
Leave the active CPG group. Will not stop quorum notifications. If you wish to stop quorum as well you should use #stop instead.
    # File 'lib/corosync_commander.rb', line 131
    def leave
      @cpg.leave
    end
#members ⇒ Array<Corosync::CPG::Member>
List of current members.
    # File 'lib/corosync_commander.rb', line 381
    def members
      @cpg_members
    end
#on_confchg {|member_list, left_list, join_list| ... } ⇒ Object
Callback to execute when the CPG configuration changes.
    # File 'lib/corosync_commander.rb', line 281
    def on_confchg(&block)
      @confchg_callback = block
    end
#on_quorumchg {|quorate, member_list| ... } ⇒ Object
Callback to execute when the quorum state changes.
    # File 'lib/corosync_commander.rb', line 301
    def on_quorumchg(&block)
      @quorumchg_callback = block
    end
#quorate? ⇒ Boolean
Indicates whether cluster is quorate.
    # File 'lib/corosync_commander.rb', line 375
    def quorate?
      @quorate
    end
#start ⇒ void
This method returns an undefined value.
Starts watching for notifications.
    # File 'lib/corosync_commander.rb', line 96
    def start
      @quorum.start

      @dispatch_thread = Thread.new do
        begin
          loop do
            select_ready = select([@cpg.fd, @quorum.fd], [], [])
            if select_ready[0].include?(@quorum.fd) then
              @quorum.dispatch
            end
            if select_ready[0].include?(@cpg.fd) then
              @cpg.dispatch
            end
          end
        rescue Exception => e
          # something happened that we don't know how to handle. We need to bail out.
          $stderr.write "Fatal exception: #{e.to_s} (#{e.class})\n#{e.backtrace.join("\n")}\n"
          exit(1)
        end
      end
    end
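The select-based dispatch loop above can be tried out without Corosync. In this minimal sketch, pipes stand in for the CPG and quorum descriptors, and the `handlers` hash is a hypothetical replacement for the `dispatch` calls; none of these names come from the gem.

```ruby
require 'thread'

# Pipes stand in for the CPG and quorum file descriptors.
cpg_r, cpg_w = IO.pipe
quorum_r, quorum_w = IO.pipe
events = Queue.new

# Hypothetical handlers playing the role of the #dispatch calls.
handlers = {
  cpg_r    => ->(io) { events << [:cpg, io.readpartial(1024)] },
  quorum_r => ->(io) { events << [:quorum, io.readpartial(1024)] }
}

dispatch_thread = Thread.new do
  loop do
    # Block until at least one descriptor is readable.
    readable, = IO.select(handlers.keys)
    readable.each { |io| handlers[io].call(io) }
  end
end

cpg_w.write('confchg')
first = events.pop      # wait for the dispatch thread to handle it
quorum_w.write('quorate')
second = events.pop

# Equivalent in spirit to #stop: kill the thread, then release descriptors.
dispatch_thread.kill
dispatch_thread.join
[cpg_r, cpg_w, quorum_r, quorum_w].each(&:close)
```

A single thread waiting on both descriptors keeps callbacks serialized, so a quorum notification and a CPG message are never handled concurrently.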
#stop ⇒ void
This method returns an undefined value.
Shuts down the dispatch thread and disconnects CPG.
    # File 'lib/corosync_commander.rb', line 137
    def stop
      @dispatch_thread.kill if !@dispatch_thread.nil?
      @dispatch_thread = nil

      @cpg.close if !@cpg.nil?
      @cpg = nil
      @cpg_members = nil

      @quorum.finalize if !@quorum.nil?
      @quorum = nil
    end