Class: Corosync::CPG

Inherits:
Object
Defined in:
lib/corosync/cpg.rb,
lib/corosync/cpg/member.rb,
lib/corosync/cpg/member_list.rb

Overview

CPG is used for sending messages between processes (usually on multiple servers). The benefit CPG offers over normal IPC is guaranteed message order: if you have 3 nodes, and both node 1 and node 2 send a message at exactly the same time, all 3 nodes will receive the messages in the same order. A key detail is that a node also receives its own message. You can also be notified whenever nodes join or leave the group, and the order of these notifications is preserved as well.
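The ordering guarantee can be pictured with a toy model. The sketch below is not how Corosync implements it: ToyGroup and ToyNode are hypothetical names, and a single in-process log stands in for the order the cluster agrees on.

```ruby
# Toy model of agreed (total) ordering. Hypothetical classes, no Corosync involved.
class ToyNode
  attr_reader :id, :received

  def initialize(id)
    @id = id
    @received = []
  end

  # Record a delivered message together with its sender.
  def deliver(sender_id, text)
    @received << [sender_id, text]
  end
end

class ToyGroup
  def initialize
    @nodes = []
    @log = [] # the single agreed order of messages
  end

  def join(node)
    @nodes << node
    node
  end

  # Append to the shared log, then deliver to every member, sender included.
  def mcast(sender_id, text)
    entry = [sender_id, text]
    @log << entry
    @nodes.each { |n| n.deliver(*entry) }
  end
end

group = ToyGroup.new
nodes = [1, 2, 3].map { |id| group.join(ToyNode.new(id)) }

group.mcast(1, "from node 1")
group.mcast(2, "from node 2")

# Every node saw the same sequence, and node 1 received its own message.
same_order = nodes.map(&:received).uniq.size == 1
puts "same order on all nodes: #{same_order}"
```

In a real cluster the two sends may race, so which message comes first is not known in advance; the point is only that every member observes the same outcome.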

This is all done through callbacks. You define a block of code to execute, and whenever a message is received, it is passed to that block.

After registering the callbacks, you call #dispatch to check for any pending messages, upon which the appropriate callbacks will be executed.
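The register-then-dispatch flow can be tried without a running cluster. The following is a minimal sketch using a hypothetical stand-in class, FakeCPG, that only mimics the shape of the API described here (an on_message block, and a dispatch call that handles one pending event); it is not the library's implementation.

```ruby
# Hypothetical stand-in that mimics the register-then-dispatch flow.
class FakeCPG
  def initialize
    @pending = []           # events waiting to be dispatched
    @callback_deliver = nil
  end

  # Store the block; it is only invoked from #dispatch.
  def on_message(&block)
    @callback_deliver = block
  end

  # Queue a message. Nothing runs until #dispatch is called.
  def send(message)
    @pending << [:self, message.to_s]
    true
  end

  # Handle at most one pending event. Returns true if one was handled.
  def dispatch
    return false if @pending.empty?
    sender, message = @pending.shift
    @callback_deliver.call(sender, message) if @callback_deliver
    true
  end
end

seen = []
cpg = FakeCPG.new
cpg.on_message { |_sender, message| seen << message }
cpg.send "hello"
cpg.send "world"

# The callback has not fired yet; each dispatch call delivers one event.
handled = 0
handled += 1 while cpg.dispatch
puts "handled #{handled} events: #{seen.inspect}"
```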

The simplest usage of this library is to call Corosync::CPG.new('groupname'). This will connect to CPG and join the specified group.

Threading notice

With MRI Ruby 1.9.3 and older, you cannot call #dispatch from within a thread. Attempting to do so will result in a segfault.

This is because the Corosync library allocates a very large buffer on the stack, and these versions of Ruby do not allocate enough memory to the thread stack.

With MRI Ruby 2.0.0 the behavior is a bit different: without a workaround, calling #dispatch from within a thread results in the thread hanging. The workaround is to set the environment variable RUBY_THREAD_MACHINE_STACK_SIZE to increase the size of the thread stack. The recommended size is 1572864 (bytes, i.e. 1.5 MiB).
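The variable generally needs to be present in the environment before the interpreter starts, so it is set by whatever launches the process. As a sketch, a parent Ruby script could launch the worker with the variable set; the child script here is a placeholder that only echoes the value from inside a thread, where a real one would require 'corosync/cpg' and call #dispatch.

```ruby
require 'rbconfig'
require 'open3'

# Environment for the child process; the value is the size recommended above.
env = { 'RUBY_THREAD_MACHINE_STACK_SIZE' => '1572864' }

# Placeholder child script (an assumption for illustration): it only proves
# the variable is visible inside a thread of the child interpreter.
child_script = "Thread.new { print ENV['RUBY_THREAD_MACHINE_STACK_SIZE'] }.join"

# Run a fresh interpreter with the extra environment and capture its stdout.
output, status = Open3.capture2(env, RbConfig.ruby, '-e', child_script)
puts "child saw stack size #{output} (exit ok: #{status.success?})"
```

From a shell, the equivalent is to prefix the command with the variable assignment.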


Examples:

require 'corosync/cpg'
cpg = Corosync::CPG.new('mygroup')
cpg.on_message do |sender, message|
  puts "Received #{message}"
end
puts "Member node IDs: #{cpg.members.map {|m| m.nodeid}.join(" ")}"
cpg.send "hello"
loop do
  cpg.dispatch
end

Defined Under Namespace

Classes: Member, MemberList

Constructor Details

#initialize(group = nil) ⇒ void

Creates a new CPG connection to the CPG service.

You can spawn as many connections as you like in a single process, but each connection can only belong to a single group.

If you get an ERR_LIBRARY error, corosync is likely not running.

If you get an EACCESS error, you're likely not running as root (or haven't set a uidgid directive in the config).

Parameters:

  • group (String) (defaults to: nil)

    The name of the group to join. If not provided, you must call #join later.



# File 'lib/corosync/cpg.rb', line 54

def initialize(group = nil)
	# The model has to be preserved so it doesn't get garbage collected.
	# Apparently CPG needs to reference it long after initialization :-(
	#  (cpg.c:423)
	@model = Corosync::CpgModelV1DataT.new
	@model[:cpg_deliver_fn] = self.method(:callback_deliver)
	@model[:cpg_confchg_fn] = self.method(:callback_confchg)
	@model[:cpg_totem_confchg_fn] = self.method(:callback_totem_confchg)

	@group = nil
	@fd = nil
	@handle = nil

	join group if group
end

Instance Attribute Details

#fd ⇒ IO (readonly)

The IO object wrapping the file descriptor over which events and messages arrive. You can use it to check for activity, but do not read anything from it.

Returns:

  • (IO)


# File 'lib/corosync/cpg.rb', line 40

def fd
  @fd
end
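Checking for activity without reading can be done with IO.select. In this sketch a pipe stands in for the object returned by #fd so that it runs without Corosync; with a real connection, a readable descriptor would be the cue to call #dispatch.

```ruby
# A pipe stands in for cpg.fd here so the sketch runs without Corosync.
reader, writer = IO.pipe

# Timeout 0 makes IO.select a pure poll: nil means nothing is pending.
idle = IO.select([reader], nil, nil, 0)
puts "before activity: #{idle.inspect}"

# Simulate an event arriving on the descriptor.
writer.write("x")
writer.flush

# Now the descriptor is readable. The data itself is deliberately not read;
# with a real connection that is left to the library.
ready = IO.select([reader], nil, nil, 1)
puts "after activity: readable=#{!ready.nil?}"
```

The same pattern extends to watching several descriptors at once, for example one CPG connection alongside other sockets in an event loop.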

#group ⇒ String (readonly)

Name of the currently joined group

Returns:

  • (String)


# File 'lib/corosync/cpg.rb', line 44

def group
  @group
end

#nodeid ⇒ Integer (readonly)

The node ID of the local node.

Returns:

  • (Integer)


# File 'lib/corosync/cpg.rb', line 219

def nodeid
	nodeid_p = FFI::MemoryPointer.new(:uint)
	Corosync.cs_send(:cpg_local_get, @handle, nodeid_p)
	nodeid_p.read_uint
end

Instance Method Details

#connect ⇒ void

This method returns an undefined value.

Connect to the CPG service.



# File 'lib/corosync/cpg.rb', line 72

def connect
	handle_ptr = FFI::MemoryPointer.new(Corosync.find_type(:cpg_handle_t))
	model_cast = Corosync::CpgModelDataT.new(@model.to_ptr)
	Corosync.cs_send(:cpg_model_initialize, handle_ptr, Corosync::CPG_MODEL_V1, model_cast, nil)
	@handle = handle_ptr.read_uint64

	fd_ptr = FFI::MemoryPointer.new(:int)
	Corosync.cs_send(:cpg_fd_get, @handle, fd_ptr)
	@fd = IO.new(fd_ptr.read_int)
end

#dispatch(timeout = -1) ⇒ Boolean

Checks for a single pending event and triggers the appropriate callback if one is found.

Parameters:

  • timeout (Integer) (defaults to: -1)

    How long to wait for an event.

    • -1: Indefinite. Wait forever

    • 0: Non-blocking. If there isn’t a pending event, return immediately

    • >0: Wait the specified number of seconds.

Returns:

  • (Boolean)

    Returns true if an event was triggered, otherwise false.



# File 'lib/corosync/cpg.rb', line 135

def dispatch(timeout = -1)
	if timeout != 0 then
		timeout = nil if timeout == -1
		select([@fd], [], [], timeout)
	end

	begin
		Corosync.cs_send!(:cpg_dispatch, @handle, Corosync::CS_DISPATCH_ONE_NONBLOCKING)
	rescue Corosync::TryAgainError => e
		raise e if e.depth > 1 # this exception is from a nested corosync function, not our cpg_dispatch we just called
		return false
	end

	return true
end
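The three timeout modes map onto the timeout argument of IO.select, where nil blocks indefinitely and a number is a wait in seconds. The helper below, select_timeout, is a hypothetical name used only for this sketch, and an idle pipe stands in for the connection's descriptor.

```ruby
# Hypothetical helper: translate a #dispatch-style timeout into the value
# IO.select expects (nil blocks forever, a number is seconds to wait).
def select_timeout(timeout)
  timeout == -1 ? nil : timeout
end

# Idle descriptor standing in for cpg.fd; the write end stays open and unused.
reader, _writer = IO.pipe

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
result = IO.select([reader], [], [], select_timeout(0.2))
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

puts "timeout -1 maps to #{select_timeout(-1).inspect}"
puts "timeout 0 maps to #{select_timeout(0).inspect}"
puts "waited about #{elapsed.round(1)}s, got #{result.inspect}"
```

A common use of the non-blocking mode is to drain everything pending in one go, calling #dispatch with a timeout of 0 in a loop until it returns false.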

#finalize ⇒ void Also known as: close

This method returns an undefined value.

Shuts down the connection to the CPG service.



# File 'lib/corosync/cpg.rb', line 85

def finalize
	return if @handle.nil?

	Corosync.cs_send(:cpg_finalize, @handle)

	@group = nil
	@fd = nil
	@model = nil
	@handle = nil

	true
end

#join(name) ⇒ void

This method returns an undefined value.

Join the specified closed process group.

Parameters:

  • name (String)

    Name of the group. Maximum length of 128 characters.



# File 'lib/corosync/cpg.rb', line 102

def join(name)
	connect if @handle.nil?

	cpg_name = Corosync::CpgName.new
	cpg_name[:value] = name
	cpg_name[:length] = name.size
	Corosync.cs_send(:cpg_join, @handle, cpg_name)

	@group = name

	self
end
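Since the name has a maximum length, a caller may want to validate it before joining. The guard below is a hypothetical helper, not part of the library, and it assumes the 128 limit is counted in bytes.

```ruby
# Limit quoted in the #join documentation; counting bytes is an assumption.
MAX_GROUP_NAME_BYTES = 128

# Hypothetical guard: returns the name unchanged or raises ArgumentError.
def check_group_name(name)
  if name.bytesize > MAX_GROUP_NAME_BYTES
    raise ArgumentError,
          "group name is #{name.bytesize} bytes, limit is #{MAX_GROUP_NAME_BYTES}"
  end
  name
end

puts check_group_name('mygroup')

begin
  check_group_name('x' * 129)
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```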

#leave ⇒ void

This method returns an undefined value.

Leave the current closed process group.



# File 'lib/corosync/cpg.rb', line 117

def leave()
	return if !@group
	cpg_name = Corosync::CpgName.new
	cpg_name[:value] = @group
	cpg_name[:length] = @group.size

	# we can't join multiple groups, so I don't know why corosync requires you to specify the group name
	Corosync.cs_send(:cpg_leave, @handle, cpg_name)

	@group = nil
end

#member ⇒ Corosync::CPG::Member

Returns the member object describing the local process.



# File 'lib/corosync/cpg.rb', line 257

def member
	Corosync::CPG::Member.new(self.nodeid, $$)
end

#members ⇒ Corosync::CPG::MemberList

Gets a list of members currently in the group



# File 'lib/corosync/cpg.rb', line 227

def members
	members = Corosync::CPG::MemberList.new

	cpg_name = Corosync::CpgName.new
	cpg_name[:value] = @group
	cpg_name[:length] = @group.size

	iteration_handle_ptr = FFI::MemoryPointer.new(Corosync.find_type(:cpg_iteration_handle_t))
	Corosync.cs_send(:cpg_iteration_initialize, @handle, Corosync::CPG_ITERATION_ONE_GROUP, cpg_name, iteration_handle_ptr)
	iteration_handle = iteration_handle_ptr.read_uint64

	begin
		iteration_description = Corosync::CpgIterationDescriptionT.new
		begin
			loop do
				Corosync.cs_send(:cpg_iteration_next, iteration_handle, iteration_description)
				members << Corosync::CPG::Member.new(iteration_description)
			end
		rescue Corosync::NoSectionsError
			# signals end of iteration
		end
	ensure
		Corosync.cs_send(:cpg_iteration_finalize, iteration_handle)
	end

	members
end

#on_confchg(&block) {|member_list, left_list, joined_list| ... } ⇒ void

This method returns an undefined value.

Proc to call when a node joins/leaves the group. If this is set before calling #join, it will be called when joining the group.

Parameters:

  • block (Proc)

    Proc to call when a node joins/leaves the group. Pass Nil to disable the callback.

Yield Parameters:



# File 'lib/corosync/cpg.rb', line 173

def on_confchg(&block)
	@callback_confchg = block
end

#on_message(&block) {|member, message| ... } ⇒ void

This method returns an undefined value.

Proc to call when a message is received.

Parameters:

  • block (Proc)

    Proc to call when a message is received. Pass Nil to disable the callback.

Yield Parameters:

  • member (Corosync::CPG::Member)

    Sender from which the message came

  • message (String)

    Message content.



# File 'lib/corosync/cpg.rb', line 156

def on_message(&block)
	@callback_deliver = block
end

#on_totem_confchg(&block) {|ring_id, member_list| ... } ⇒ void

This method returns an undefined value.

Proc to call when a node joins/leaves the cluster. If this is set before calling #connect or #join, it will be called when connecting to the cluster.

Parameters:

  • block (Proc)

    Proc to call when a node joins/leaves the cluster.

Yield Parameters:

  • ring_id (Integer)

    Ring ID change occurred on.

  • member_list (Array<Integer>)

    Node ID of members in the cluster after the change completed.



# File 'lib/corosync/cpg.rb', line 204

def on_totem_confchg(&block)
	@callback_totem_confchg = block
end

#send(messages) ⇒ void

This method returns an undefined value.

Send one or more messages to the group. Sending multiple messages through a single call to #send ensures that the messages will be delivered consecutively without another message in the middle.

Parameters:

  • messages (Array<String>, String)

    The message(s) to send.



# File 'lib/corosync/cpg.rb', line 265

def send(messages)
	messages = [messages] if !messages.is_a?(Array)
	
	iovec_list_p = FFI::MemoryPointer.new(Corosync::Iovec, messages.size)
	iovec_list = messages.size.times.collect do |i|
		iovec = Corosync::Iovec.new(iovec_list_p + i * Corosync::Iovec.size)
		message = messages[i].to_s
		iovec[:iov_base] = FFI::MemoryPointer.from_string(message)
		iovec[:iov_len] = message.bytesize
		iovec
	end
	iovec_len = messages.size

	Corosync.cs_send(:cpg_mcast_joined, @handle, Corosync::CPG_TYPE_AGREED, iovec_list_p, iovec_len)

	true
end
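The normalization step in #send (wrapping a lone String in an Array and calling to_s on each element) can be tried in isolation. The sketch below uses a hypothetical helper, normalize, and also reports both character and byte counts, since the two differ for multibyte text and native buffers are sized in bytes.

```ruby
# Hypothetical helper mirroring the argument handling at the top of #send.
# Returns an array of [text, byte_length] pairs.
def normalize(messages)
  messages = [messages] unless messages.is_a?(Array)
  messages.map do |m|
    text = m.to_s
    [text, text.bytesize]
  end
end

# A single String becomes a one-element list.
single = normalize("h\u00e9llo")
# Non-String elements are converted with to_s.
mixed = normalize(["a", 42])

text, bytes = single.first
puts "#{text.inspect}: #{text.size} characters, #{bytes} bytes"
puts "mixed input: #{mixed.inspect}"
```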