Class: ZMQ::Socket
- Inherits:
-
Object
- Object
- ZMQ::Socket
- Defined in:
- lib/0mq/socket.rb,
lib/0mq/socket/options.rb
Constant Summary collapse
- @@get_options =
# Table of socket option names (symbols) mapped to a value-type symbol
# (:bool, :int, :int64, :uint64 or :string). Judging by the constant's name
# these are the options that can be read; how this table is folded into
# @@option_types is not visible here -- TODO confirm.
{ :RCVMORE => :bool, :RCVHWM => :int, :AFFINITY => :uint64, :IDENTITY => :string, :RATE => :int, :RECOVERY_IVL => :int, :SNDBUF => :int, :RCVBUF => :int, :LINGER => :int, :RECONNECT_IVL => :int, :RECONNECT_IVL_MAX => :int, :BACKLOG => :int, :MAXMSGSIZE => :int64, :MULTICAST_HOPS => :int, :RCVTIMEO => :int, :SNDTIMEO => :int, :IPV6 => :bool, :IPV4ONLY => :bool, :IMMEDIATE => :bool, :FD => :int, :EVENTS => :int, :LAST_ENDPOINT => :string, :TCP_KEEPALIVE => :int, :TCP_KEEPALIVE_IDLE => :int, :TCP_KEEPALIVE_CNT => :int, :TCP_KEEPALIVE_INTVL => :int, :MECHANISM => :int, :PLAIN_SERVER => :int, :PLAIN_USERNAME => :string, :PLAIN_PASSWORD => :string, :CURVE_PUBLICKEY => :string, :CURVE_SECRETKEY => :string, :CURVE_SERVERKEY => :string, :ZAP_DOMAIN => :string, }
- @@set_options =
# Table of socket option names (symbols) mapped to a value-type symbol
# (:bool, :int, :int64, :uint64 or :string). Judging by the constant's name
# these are the options that can be written; how this table is folded into
# @@option_types is not visible here -- TODO confirm.
# Each key appears exactly once: a repeated :RECONNECT_IVL entry would only
# overwrite the earlier one and make Ruby warn about a duplicated hash key.
{ :SNDHWM => :int, :RCVHWM => :int, :AFFINITY => :uint64, :SUBSCRIBE => :string, :UNSUBSCRIBE => :string, :IDENTITY => :string, :RATE => :int, :RECOVERY_IVL => :int, :SNDBUF => :int, :RCVBUF => :int, :LINGER => :int, :RECONNECT_IVL => :int, :RECONNECT_IVL_MAX => :int, :BACKLOG => :int, :MAXMSGSIZE => :int64, :MULTICAST_HOPS => :int, :RCVTIMEO => :int, :SNDTIMEO => :int, :IPV6 => :bool, :IPV4ONLY => :bool, :IMMEDIATE => :bool, :ROUTER_HANDOVER => :int, :ROUTER_MANDATORY => :int, :ROUTER_RAW => :int, :PROBE_ROUTER => :int, :XPUB_VERBOSE => :int, :REQ_CORRELATE => :int, :REQ_RELAXED => :int, :TCP_KEEPALIVE => :int, :TCP_KEEPALIVE_IDLE => :int, :TCP_KEEPALIVE_CNT => :int, :TCP_KEEPALIVE_INTVL => :int, :TCP_ACCEPT_FILTER => :string, :PLAIN_SERVER => :int, :PLAIN_USERNAME => :string, :PLAIN_PASSWORD => :string, :CURVE_SERVER => :int, :CURVE_PUBLICKEY => :string, :CURVE_SECRETKEY => :string, :CURVE_SERVERKEY => :string, :ZAP_DOMAIN => :string, :CONFLATE => :bool, }
- @@option_types =
Set up map of option codes to option types
{}
Instance Attribute Summary collapse
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#ptr ⇒ Object
readonly
Returns the value of attribute ptr.
-
#type ⇒ Object
readonly
Returns the value of attribute type.
Class Method Summary collapse
-
.finalizer(ptr, pid) ⇒ Object
Create a safe finalizer for the socket ptr to close on GC of the object.
Instance Method Summary collapse
-
#bind(endpoint) ⇒ Object
Bind to an endpoint.
-
#close ⇒ Object
Close the socket.
-
#connect(endpoint) ⇒ Object
Connect to an endpoint.
-
#disconnect(endpoint) ⇒ Object
Disconnect from an endpoint.
-
#get_opt(option) ⇒ Object
Get a socket option.
-
#initialize(type, opts = {}) ⇒ Socket
constructor
A new instance of Socket.
-
#recv_array ⇒ Object
Receive a multipart message as an array of strings.
-
#recv_string(flags = 0) ⇒ Object
Receive a string from the socket.
-
#recv_with_routing ⇒ Object
Receive a multipart message as a routing array and a body array. All parts before an empty part are considered routing parts, and all parts after the empty part are considered body parts.
-
#send_array(ary) ⇒ Object
Send a multipart message as an array of strings.
-
#send_string(string, flags = 0) ⇒ Object
Send a string to the socket.
-
#send_with_routing(routing, body) ⇒ Object
Send a multipart message as a routing array and a body array. All parts before an empty part are considered routing parts, and all parts after the empty part are considered body parts.
-
#set_opt(option, value) ⇒ Object
Set a socket option.
-
#type_sym ⇒ Object
Get the socket type name as a symbol.
-
#unbind(endpoint) ⇒ Object
Unbind from an endpoint.
Constructor Details
#initialize(type, opts = {}) ⇒ Socket
Returns a new instance of Socket.
11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/0mq/socket.rb', line 11
# Create a socket of the given type and register a GC finalizer for it.
#
# @param type [Object] socket type, passed unchanged to LibZMQ.zmq_socket
# @param opts [Hash] options; only :context is read here
# @option opts [Object] :context (ZMQ::DefaultContext) context whose #ptr
#   is handed to LibZMQ.zmq_socket
def initialize(type, opts = {})
  @context = opts.fetch :context, ZMQ::DefaultContext
  @type = type
  @ptr = LibZMQ.zmq_socket @context.ptr, @type
  ZMQ.error_check true if @ptr.null?

  # Message struct memory reused by send_string and recv_string.
  @msgptr = FFI::MemoryPointer.new LibZMQ::Message.size, 1, false

  # The finalizer must capture the actual socket pointer (@ptr). No @socket
  # variable is ever assigned, so passing it would hand the finalizer nil and
  # the socket would never be closed on GC.
  ObjectSpace.define_finalizer self, self.class.finalizer(@ptr, Process.pid)
end
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
8 9 10 |
# File 'lib/0mq/socket.rb', line 8
# Reader for the @context instance variable.
#
# @return [Object] the value currently stored in @context
def context
  @context
end
#ptr ⇒ Object (readonly)
Returns the value of attribute ptr.
7 8 9 |
# File 'lib/0mq/socket.rb', line 7
# Reader for the @ptr instance variable.
#
# @return [Object] the value currently stored in @ptr
def ptr
  @ptr
end
#type ⇒ Object (readonly)
Returns the value of attribute type.
9 10 11 |
# File 'lib/0mq/socket.rb', line 9
# Reader for the @type instance variable.
#
# @return [Object] the value currently stored in @type
def type
  @type
end
Class Method Details
.finalizer(ptr, pid) ⇒ Object
Create a safe finalizer for the socket ptr to close on GC of the object
37 38 39 |
# File 'lib/0mq/socket.rb', line 37
# Build a proc that closes the given socket pointer, but only when it runs
# in the process whose pid was captured.
#
# @param ptr [Object] pointer handed to LibZMQ.zmq_close
# @param pid [Integer] pid compared against Process.pid when the proc runs
# @return [Proc] the closing proc
def self.finalizer(ptr, pid)
  proc do
    # Skip the close when running under a different pid than the captured one.
    next unless Process.pid == pid

    LibZMQ.zmq_close ptr
  end
end
Instance Method Details
#bind(endpoint) ⇒ Object
Bind to an endpoint
47 48 49 50 |
# File 'lib/0mq/socket.rb', line 47
# Bind the socket to an endpoint.
#
# Calls ZMQ.error_check(true) when zmq_bind returns -1.
#
# @param endpoint [Object] endpoint passed unchanged to LibZMQ.zmq_bind
def bind(endpoint)
  status = LibZMQ.zmq_bind(@ptr, endpoint)
  ZMQ.error_check(true) if status == -1
end
#close ⇒ Object
Close the socket
24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/0mq/socket.rb', line 24
# Close the socket. Does nothing when @ptr is already unset.
#
# Removes the GC finalizer, clears @temp_buffers when present, closes the
# socket pointer and then forgets it. Calls ZMQ.error_check(true) when
# zmq_close returns -1.
#
# @return [nil]
def close
  return unless @ptr

  ObjectSpace.undefine_finalizer(self)
  @temp_buffers.clear if @temp_buffers

  status = LibZMQ.zmq_close(@ptr)
  ZMQ.error_check(true) if status == -1

  @ptr = nil
end
#connect(endpoint) ⇒ Object
Connect to an endpoint
53 54 55 56 |
# File 'lib/0mq/socket.rb', line 53
# Connect the socket to an endpoint.
#
# Calls ZMQ.error_check(true) when zmq_connect returns -1.
#
# @param endpoint [Object] endpoint passed unchanged to LibZMQ.zmq_connect
def connect(endpoint)
  status = LibZMQ.zmq_connect(@ptr, endpoint)
  ZMQ.error_check(true) if status == -1
end
#disconnect(endpoint) ⇒ Object
Disconnect from an endpoint
65 66 67 68 |
# File 'lib/0mq/socket.rb', line 65
# Disconnect the socket from an endpoint.
#
# Calls ZMQ.error_check(true) when zmq_disconnect returns -1.
#
# @param endpoint [Object] endpoint passed unchanged to LibZMQ.zmq_disconnect
def disconnect(endpoint)
  status = LibZMQ.zmq_disconnect(@ptr, endpoint)
  ZMQ.error_check(true) if status == -1
end
#get_opt(option) ⇒ Object
Get a socket option
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/0mq/socket.rb', line 173
# Read a socket option and convert it according to its registered type.
#
# @param option [Object] option code; must be a key of @@option_types
# @return [String, Boolean, Object] a string for :string options, a boolean
#   for :bool options, otherwise whatever the pointer's read_<type> returns
# @raise [ArgumentError] when the option is not in @@option_types
def get_opt(option)
  opt_type = @@option_types.fetch(option) do
    raise ArgumentError, "Unknown option: #{option}"
  end

  value, size = get_opt_pointers opt_type

  status = LibZMQ.zmq_getsockopt(@ptr, option, value, size)
  ZMQ.error_check(true) if status == -1

  case opt_type
  when :string
    # Drops the final byte of the reported length.
    # NOTE(review): presumably a trailing NUL; confirm this also holds for
    # binary options such as IDENTITY.
    value.read_string(size.read_int - 1)
  when :bool
    value.read_int == 1
  else
    value.send :"read_#{opt_type}"
  end
end
#recv_array ⇒ Object
Receive a multipart message as an array of strings
112 113 114 115 116 117 118 119 |
# File 'lib/0mq/socket.rb', line 112
# Receive a multipart message as an array of strings.
#
# Keeps calling recv_string until the RCVMORE option reports no more parts.
#
# @return [Array<String>] the received parts, in arrival order
def recv_array
  parts = []

  loop do
    parts << recv_string
    break unless get_opt(ZMQ::RCVMORE)
  end

  parts
end
#recv_string(flags = 0) ⇒ Object
Receive a string from the socket
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/0mq/socket.rb', line 87
# Receive one message part from the socket as a string.
#
# Initialises the reusable message at @msgptr, receives into it, copies the
# payload out and closes the message again. ZMQ.error_check(true) is called
# after any step that returns -1.
#
# @param flags [Integer] flags passed unchanged to LibZMQ.zmq_recvmsg
# @return [String] the bytes of the received part
def recv_string(flags = 0)
  init_status = LibZMQ.zmq_msg_init(@msgptr)
  ZMQ.error_check(true) if init_status == -1

  recv_status = LibZMQ.zmq_recvmsg(@ptr, @msgptr, flags)
  ZMQ.error_check(true) if recv_status == -1

  # Copy the payload out before the message is closed.
  data_ptr = LibZMQ.zmq_msg_data(@msgptr)
  byte_count = LibZMQ.zmq_msg_size(@msgptr)
  payload = data_ptr.read_string(byte_count)

  close_status = LibZMQ.zmq_msg_close(@msgptr)
  ZMQ.error_check(true) if close_status == -1

  payload
end
#recv_with_routing ⇒ Object
Receive a multipart message as a routing array and a body array. All parts before an empty part are considered routing parts, and all parts after the empty part are considered body parts. The empty delimiter part is not included in the resulting arrays.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/0mq/socket.rb', line 133
# Receive a multipart message split into routing parts and body parts.
#
# Parts before the first empty part are routing parts; parts after it are
# body parts. The empty delimiter itself is not returned.
#
# @return [Array(Array<String>, Array<String>)] [routing, body]
# @raise [ArgumentError] when the message ends before an empty delimiter
#   part has been seen
def recv_with_routing
  routing = []
  body = []

  loop do
    part = recv_string
    break if part.empty?

    routing << part
    next if get_opt(ZMQ::RCVMORE)

    raise ArgumentError, "Expected empty routing delimiter in "\
      "multipart message: #{routing}"
  end

  # Only read body parts that belong to this message. When the delimiter was
  # the final part, an unconditional recv_string here would block or consume
  # the first part of the next message.
  body << recv_string while get_opt(ZMQ::RCVMORE)

  [routing, body]
end
#send_array(ary) ⇒ Object
Send a multipart message as an array of strings
104 105 106 107 108 109 |
# File 'lib/0mq/socket.rb', line 104
# Send a multipart message given as an array of strings.
#
# Every part except the last is sent with ZMQ::SNDMORE; the last part is
# sent with send_string's default flags.
#
# @param ary [Array<String>] message parts; must contain at least one part
# @raise [ArgumentError] when ary is empty (there is no part to send)
def send_array(ary)
  # An empty array used to reach send_string(nil) and fail with an obscure
  # NoMethodError; reject it up front, before anything is sent.
  raise ArgumentError, "Cannot send an empty multipart message" if ary.empty?

  final = ary.last
  ary[0...-1].each { |part| send_string part, ZMQ::SNDMORE }
  send_string final
end
#send_string(string, flags = 0) ⇒ Object
Send a string to the socket
71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/0mq/socket.rb', line 71
# Send one string to the socket as a single message part.
#
# The string is copied into a buffer from LibC.malloc, which is attached to
# the reusable message at @msgptr with LibC::Free as its free callback.
# ZMQ.error_check(true) is called after any step that returns -1.
#
# @param string [String] payload; its byte length is used when available
# @param flags [Integer] flags passed unchanged to LibZMQ.zmq_sendmsg
def send_string(string, flags = 0)
  size = string.respond_to?(:bytesize) ? string.bytesize : string.size
  @msgbuf = LibC.malloc size
  @msgbuf.write_string string, size

  rc = LibZMQ.zmq_msg_init_data @msgptr, @msgbuf, size, LibC::Free, nil
  ZMQ.error_check true if rc == -1

  sent = false
  begin
    rc = LibZMQ.zmq_sendmsg @ptr, @msgptr, flags
    ZMQ.error_check true if rc == -1
    sent = true
  ensure
    # If the error check raised for a failed send, the message would
    # otherwise never be closed and the malloc'd buffer would stay attached.
    # Assumes libzmq leaves ownership with the caller on failure -- see the
    # zmq_msg_send documentation.
    LibZMQ.zmq_msg_close @msgptr unless sent
  end

  rc = LibZMQ.zmq_msg_close @msgptr
  ZMQ.error_check true if rc == -1
end
#send_with_routing(routing, body) ⇒ Object
Send a multipart message as a routing array and a body array. All parts before an empty part are considered routing parts, and all parts after the empty part are considered body parts. The empty delimiter part should not be included in the input arrays.
125 126 127 |
# File 'lib/0mq/socket.rb', line 125
# Send a multipart message built from routing parts and body parts.
#
# The parts are sent through send_array as: routing parts, one empty
# delimiter part, then body parts. Callers do not supply the delimiter.
#
# @param routing [Array<String>] parts placed before the empty delimiter
# @param body [Array<String>] parts placed after the empty delimiter
def send_with_routing(routing, body)
  frames = []
  frames.push(*routing)
  frames << ''
  frames.push(*body)

  send_array frames
end
#set_opt(option, value) ⇒ Object
Set a socket option
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/0mq/socket.rb', line 151
# Set a socket option.
#
# Non-string values are written into a freshly allocated FFI::MemoryPointer
# (as an int 1/0 for :bool options, otherwise with write_<type>); string
# values are passed to zmq_setsockopt as they are.
#
# @param option [Object] option code; must be a key of @@option_types
# @param value [Object] the value to set
# @return [Object] the object handed to zmq_setsockopt: the original value
#   for :string options, otherwise the pointer holding the converted value
# @raise [ArgumentError] when the option is not in @@option_types
def set_opt(option, value)
  type = @@option_types.fetch(option) \
    { raise ArgumentError, "Unknown option: #{option}" }

  unless type == :string
    if type == :bool
      valptr = FFI::MemoryPointer.new(:int)
      valptr.write_int(value ? 1 : 0)
    else
      valptr = FFI::MemoryPointer.new(type)
      valptr.send :"write_#{type}", value
    end
    value = valptr
  end

  # The option length is a byte count. String#size counts characters, so use
  # bytesize when the value offers it (same rule as send_string); other
  # values keep using #size.
  length = value.respond_to?(:bytesize) ? value.bytesize : value.size

  rc = LibZMQ.zmq_setsockopt @ptr, option, value, length
  ZMQ.error_check true if rc == -1

  value
end
#type_sym ⇒ Object
Get the socket type name as a symbol
42 43 44 |
# File 'lib/0mq/socket.rb', line 42
# Get the socket type name as a symbol.
#
# Looks the socket's type up in ZMQ::SocketTypeNameMap and converts the
# result with #to_sym.
#
# @return [Symbol] the type name
def type_sym
  type_name = ZMQ::SocketTypeNameMap[type]
  type_name.to_sym
end
#unbind(endpoint) ⇒ Object
Unbind from an endpoint
59 60 61 62 |
# File 'lib/0mq/socket.rb', line 59
# Unbind the socket from an endpoint.
#
# Calls ZMQ.error_check(true) when zmq_unbind returns -1.
#
# @param endpoint [Object] endpoint passed unchanged to LibZMQ.zmq_unbind
def unbind(endpoint)
  status = LibZMQ.zmq_unbind(@ptr, endpoint)
  ZMQ.error_check(true) if status == -1
end