Class: Corosync::CPG
- Inherits:
-
Object
- Object
- Corosync::CPG
- Defined in:
- lib/corosync/cpg.rb,
lib/corosync/cpg/member.rb,
lib/corosync/cpg/member_list.rb
Overview
CPG is used for sending messages between processes (usually on multiple servers). The benefits offered by CPG over normal IPC is that message order is guaranteed. If you have 3 nodes, and both node 1 and node 2 send a message at the exact same time, all 3 nodes will receive the messages in the same order. One of the key details in this is that a node will also receive it’s own message. You can also be notified whenever nodes join or leave the group. The order of these messages is preserved as well.
This is all done through callbacks. You define a block of code to execute, and whenever a message is received, it is passed to that block.
After registering the callbacks, you call #dispatch to check for any pending messages, upon which the appropriate callbacks will be executed.
The simplest usage of this library is to call ‘Corosync::CPG.new(’groupname’)‘. This will connect to CPG and join the specified group.
Threading notice
With MRI Ruby 1.9.3 and older, you cannot call #dispatch from within a thread. Attempting to do so will result in a segfault.
This is because the Corosync library allocates a very large buffer on the stack, and these versions of Ruby do not allocate enough memory to the thread stack.
With MRI Ruby 2.0.0 the behavior is a bit different. There is a workaround, but without it, calling #dispatch will result in the thread hanging. The workaround is that you you can pass the environment variable RUBY_THREAD_MACHINE_STACK_SIZE to increase the size of the thread stack. The recommended size is 1572864.
Defined Under Namespace
Classes: Member, MemberList
Instance Attribute Summary collapse
-
#fd ⇒ IO
readonly
The IO object containing the file descriptor events and messages come across.
-
#group ⇒ String
readonly
Name of the currently joined group.
-
#nodeid ⇒ Integer
readonly
The node ID of ourself.
Instance Method Summary collapse
-
#connect ⇒ void
Connect to the CPG service.
-
#dispatch(timeout = -1)) ⇒ Boolean
Checks for a single pending events and triggers the appropriate callback if found.
-
#finalize ⇒ void
(also: #close)
Shuts down the connection to the CPG service.
-
#initialize(group = nil) ⇒ void
constructor
Creates a new CPG connection to the CPG service.
-
#join(name) ⇒ void
Join the specified closed process group.
-
#leave ⇒ void
Leave the current closed process group.
-
#member ⇒ Corosync::CPG::Member
Returns the member object describing ourself.
-
#members ⇒ Corosync::CPG::MemberList
Gets a list of members currently in the group.
-
#on_confchg(&block) {|member_list, left_list, joined_list| ... } ⇒ void
Proc to call when a node joins/leaves the group.
-
#on_message(&block) {|member, message| ... } ⇒ void
Proc to call when a message is received.
-
#on_totem_confchg(&block) {|ring_id, member_list| ... } ⇒ void
Proc to call when a node joins/leaves the cluster.
-
#send(messages) ⇒ void
Send one or more messages to the group.
Constructor Details
#initialize(group = nil) ⇒ void
Creates a new CPG connection to the CPG service.
You can spawn as many connections as you like in a single process, but each connection can only belong to a single group.
If you get an ERR_LIBRARY error, corosync is likely not running.
If you get an EACCESS error, you’re likely not running as root (or havent set a ‘uidgid` directive in the config).
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/corosync/cpg.rb', line 54 def initialize(group = nil) # The model has to be preserved so it doesn't get garbage collected. # Apparently CPG needs to reference it long after initialization :-( # (cpg.c:423) @model = Corosync::CpgModelV1DataT.new @model[:cpg_deliver_fn] = self.method(:callback_deliver) @model[:cpg_confchg_fn] = self.method(:callback_confchg) @model[:cpg_totem_confchg_fn] = self.method(:callback_totem_confchg) @group = nil @fd = nil @handle = nil join group if group end |
Instance Attribute Details
#fd ⇒ IO (readonly)
The IO object containing the file descriptor events and messages come across. You can use this to check for activity, but do not read anything from it.
40 41 42 |
# File 'lib/corosync/cpg.rb', line 40 def fd @fd end |
#group ⇒ String (readonly)
Name of the currently joined group
44 45 46 |
# File 'lib/corosync/cpg.rb', line 44 def group @group end |
Instance Method Details
#connect ⇒ void
This method returns an undefined value.
Connect to the CPG service.
72 73 74 75 76 77 78 79 80 81 |
# File 'lib/corosync/cpg.rb', line 72 def connect handle_ptr = FFI::MemoryPointer.new(Corosync.find_type(:cpg_handle_t)) model_cast = Corosync::CpgModelDataT.new(@model.to_ptr) Corosync.cs_send(:cpg_model_initialize, handle_ptr, Corosync::CPG_MODEL_V1, model_cast, nil) @handle = handle_ptr.read_uint64 fd_ptr = FFI::MemoryPointer.new(:int) Corosync.cs_send(:cpg_fd_get, @handle, fd_ptr) @fd = IO.new(fd_ptr.read_int) end |
#dispatch(timeout = -1)) ⇒ Boolean
Checks for a single pending events and triggers the appropriate callback if found.
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/corosync/cpg.rb', line 135 def dispatch(timeout = -1) if !timeout != 0 then timeout = nil if timeout == -1 select([@fd], [], [], timeout) end begin Corosync.cs_send!(:cpg_dispatch, @handle, Corosync::CS_DISPATCH_ONE_NONBLOCKING) rescue Corosync::TryAgainError => e raise e if e.depth > 1 # this exception is from a nested corosync function, not our cpg_dispatch we just called return false end return true end |
#finalize ⇒ void Also known as: close
This method returns an undefined value.
Shuts down the connection to the CPG service.
85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/corosync/cpg.rb', line 85 def finalize return if @handle.nil? Corosync.cs_send(:cpg_finalize, @handle) @group = nil @fd = nil @model = nil @handle = nil true end |
#join(name) ⇒ void
This method returns an undefined value.
Join the specified closed process group.
102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/corosync/cpg.rb', line 102 def join(name) connect if @handle.nil? cpg_name = Corosync::CpgName.new cpg_name[:value] = name cpg_name[:length] = name.size Corosync.cs_send(:cpg_join, @handle, cpg_name) @group = name self end |
#leave ⇒ void
This method returns an undefined value.
Leave the current closed process group.
117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/corosync/cpg.rb', line 117 def leave() return if !@group cpg_name = Corosync::CpgName.new cpg_name[:value] = @group cpg_name[:length] = @group.size # we can't join multiple groups, so I dont know why corosync requires you to specify the group name Corosync.cs_send(:cpg_leave, @handle, cpg_name) @group = nil end |
#member ⇒ Corosync::CPG::Member
Returns the member object describing ourself.
257 258 259 |
# File 'lib/corosync/cpg.rb', line 257 def member Corosync::CPG::Member.new(self.nodeid, $$) end |
#members ⇒ Corosync::CPG::MemberList
Gets a list of members currently in the group
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/corosync/cpg.rb', line 227 def members members = Corosync::CPG::MemberList.new cpg_name = Corosync::CpgName.new cpg_name[:value] = @group cpg_name[:length] = @group.size iteration_handle_ptr = FFI::MemoryPointer.new(Corosync.find_type(:cpg_iteration_handle_t)) Corosync.cs_send(:cpg_iteration_initialize, @handle, Corosync::CPG_ITERATION_ONE_GROUP, cpg_name, iteration_handle_ptr) iteration_handle = iteration_handle_ptr.read_uint64 begin iteration_description = Corosync::CpgIterationDescriptionT.new begin loop do Corosync.cs_send(:cpg_iteration_next, iteration_handle, iteration_description) members << Corosync::CPG::Member.new(iteration_description) end rescue Corosync::NoSectionsError # signals end of iteration end ensure Corosync.cs_send(:cpg_iteration_finalize, iteration_handle) end members end |
#on_confchg(&block) {|member_list, left_list, joined_list| ... } ⇒ void
This method returns an undefined value.
Proc to call when a node joins/leaves the group. If this is set before calling #join, it will be called when joining the group.
173 174 175 |
# File 'lib/corosync/cpg.rb', line 173 def on_confchg(&block) @callback_confchg = block end |
#on_message(&block) {|member, message| ... } ⇒ void
This method returns an undefined value.
Proc to call when a message is received.
156 157 158 |
# File 'lib/corosync/cpg.rb', line 156 def (&block) @callback_deliver = block end |
#on_totem_confchg(&block) {|ring_id, member_list| ... } ⇒ void
204 205 206 |
# File 'lib/corosync/cpg.rb', line 204 def on_totem_confchg(&block) @callback_totem_confchg = block end |
#send(messages) ⇒ void
This method returns an undefined value.
Send one or more messages to the group. Sending multiple messages through a single call to #send ensures that the messages will be delivered consecutively without another message in the middle.
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 |
# File 'lib/corosync/cpg.rb', line 265 def send() = [] if !.is_a?(Array) iovec_list_p = FFI::MemoryPointer.new(Corosync::Iovec, .size) iovec_list = .size.times.collect do |i| iovec = Corosync::Iovec.new(iovec_list_p + i * Corosync::Iovec.size) = [i].to_s iovec[:iov_base] = FFI::MemoryPointer.from_string() iovec[:iov_len] = .size iovec end iovec_len = .size Corosync.cs_send(:cpg_mcast_joined, @handle, Corosync::CPG_TYPE_AGREED, iovec_list_p, iovec_len) true end |