Class: ZMQ::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/0mq/socket.rb,
lib/0mq/socket/options.rb

Overview

See api.zeromq.org/4-0:zmq-socket Not thread safe.

Constant Summary collapse

@@get_options =
{
  :RCVMORE             => :bool,
  :RCVHWM              => :int,
  :AFFINITY            => :uint64,
  :IDENTITY            => :string,
  :RATE                => :int,
  :RECOVERY_IVL        => :int,
  :SNDBUF              => :int,
  :RCVBUF              => :int,
  :LINGER              => :int,
  :RECONNECT_IVL       => :int,
  :RECONNECT_IVL_MAX   => :int,
  :BACKLOG             => :int,
  :MAXMSGSIZE          => :int64,
  :MULTICAST_HOPS      => :int,
  :RCVTIMEO            => :int,
  :SNDTIMEO            => :int,
  :IPV6                => :bool,
  :IPV4ONLY            => :bool,
  :IMMEDIATE           => :bool,
  :FD                  => :int,
  :EVENTS              => :int,
  :LAST_ENDPOINT       => :string,
  :TCP_KEEPALIVE       => :int,
  :TCP_KEEPALIVE_IDLE  => :int,
  :TCP_KEEPALIVE_CNT   => :int,
  :TCP_KEEPALIVE_INTVL => :int,
  :MECHANISM           => :int,
  :PLAIN_SERVER        => :int,
  :PLAIN_USERNAME      => :string,
  :PLAIN_PASSWORD      => :string,
  :CURVE_PUBLICKEY     => :string,
  :CURVE_SECRETKEY     => :string,
  :CURVE_SERVERKEY     => :string,
  :ZAP_DOMAIN          => :string,
}
@@set_options =
{
  :SNDHWM              => :int,
  :RCVHWM              => :int,
  :AFFINITY            => :uint64,
  :SUBSCRIBE           => :string,
  :UNSUBSCRIBE         => :string,
  :IDENTITY            => :string,
  :RATE                => :int,
  :RECOVERY_IVL        => :int,
  :SNDBUF              => :int,
  :RCVBUF              => :int,
  :LINGER              => :int,
  :RECONNECT_IVL       => :int,
  :RECONNECT_IVL_MAX   => :int,
  :RECONNECT_IVL       => :int,
  :BACKLOG             => :int,
  :MAXMSGSIZE          => :int64,
  :MULTICAST_HOPS      => :int,
  :RCVTIMEO            => :int,
  :SNDTIMEO            => :int,
  :IPV6                => :bool,
  :IPV4ONLY            => :bool,
  :IMMEDIATE           => :bool,
  :ROUTER_HANDOVER     => :int,
  :ROUTER_MANDATORY    => :int,
  :ROUTER_RAW          => :int,
  :PROBE_ROUTER        => :int,
  :XPUB_VERBOSE        => :int,
  :REQ_CORRELATE       => :int,
  :REQ_RELAXED         => :int,
  :TCP_KEEPALIVE       => :int,
  :TCP_KEEPALIVE_IDLE  => :int,
  :TCP_KEEPALIVE_CNT   => :int,
  :TCP_KEEPALIVE_INTVL => :int,
  :TCP_ACCEPT_FILTER   => :string,
  :PLAIN_SERVER        => :int,
  :PLAIN_USERNAME      => :string,
  :PLAIN_PASSWORD      => :string,
  :CURVE_SERVER        => :int,
  :CURVE_PUBLICKEY     => :string,
  :CURVE_SECRETKEY     => :string,
  :CURVE_SERVERKEY     => :string,
  :ZAP_DOMAIN          => :string,
  :CONFLATE            => :bool,
}
@@option_types =

Set up map of option codes to option types

{}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(type, opts = {}) ⇒ Socket

Returns a new instance of Socket.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/0mq/socket.rb', line 17

def initialize(type, opts={})
  @closed = false
  @context = opts.fetch :context, ZMQ::DefaultContext
  @type = type
  @pointer = LibZMQ.zmq_socket @context.pointer, @type
  ZMQ.error_check true if @pointer.null?
  
  @msgptr = FFI::MemoryPointer.new LibZMQ::Message.size, 1, false
  
  @context.send :register_socket_pointer, @pointer
  
  ObjectSpace.define_finalizer self,
    self.class.finalizer(@pointer, @context, Process.pid)
end

Instance Attribute Details

#contextObject (readonly)

The socket’s ZMQ::Context.



13
14
15
# File 'lib/0mq/socket.rb', line 13

def context
  @context
end

#pointerObject (readonly)

The FFI pointer to the socket.



11
12
13
# File 'lib/0mq/socket.rb', line 11

def pointer
  @pointer
end

#typeObject (readonly)

The socket’s ZeroMQ socket type (e.g. ZMQ::ROUTER).



15
16
17
# File 'lib/0mq/socket.rb', line 15

def type
  @type
end

Class Method Details

.finalizer(pointer, context, pid) ⇒ Object

Create a safe finalizer for the socket pointer to close on GC



61
62
63
64
65
66
67
68
# File 'lib/0mq/socket.rb', line 61

def self.finalizer(pointer, context, pid)
  Proc.new do
    if Process.pid == pid
      context.send :unregister_socket_pointer, pointer
      LibZMQ.zmq_close pointer
    end
  end
end

Instance Method Details

#bind(endpoint) ⇒ Object

Bind to an endpoint



76
77
78
79
# File 'lib/0mq/socket.rb', line 76

def bind(endpoint)
  rc = LibZMQ.zmq_bind @pointer, endpoint
  ZMQ.error_check true if rc==-1
end

#closeObject

Close the socket



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/0mq/socket.rb', line 38

def close
  @closed = true
  
  if @pointer
    @temp_buffers.clear if @temp_buffers
    
    ObjectSpace.undefine_finalizer self
    @context.send :unregister_socket_pointer, @pointer
    
    rc = LibZMQ.zmq_close @pointer
    ZMQ.error_check true if rc==-1
    
    @pointer = nil
    @context = nil
  end
end

#closed?Boolean

Returns true if the socket is closed.

Returns:

  • (Boolean)


56
57
58
# File 'lib/0mq/socket.rb', line 56

def closed?
  @closed
end

#connect(endpoint) ⇒ Object

Connect to an endpoint



82
83
84
85
# File 'lib/0mq/socket.rb', line 82

def connect(endpoint)
  rc = LibZMQ.zmq_connect @pointer, endpoint
  ZMQ.error_check true if rc==-1
end

#disconnect(endpoint) ⇒ Object

Disconnect from an endpoint



94
95
96
97
# File 'lib/0mq/socket.rb', line 94

def disconnect(endpoint)
  rc = LibZMQ.zmq_disconnect @pointer, endpoint
  ZMQ.error_check true if rc==-1
end

#get_opt(option) ⇒ Object

Get a socket option



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/0mq/socket.rb', line 202

def get_opt(option)
  type = @@option_types.fetch(option) \
    { raise ArgumentError, "Unknown option: #{option}" }
  
  value, size = get_opt_pointers type
  
  rc = LibZMQ.zmq_getsockopt @pointer, option, value, size
  ZMQ.error_check true if rc==-1
  
  if type == :string
    value.read_string(size.read_int-1)
  elsif type == :bool
    value.read_int == 1
  else
    value.send :"read_#{type}"
  end
end

#inspectObject

Show a useful inspect output



33
34
35
# File 'lib/0mq/socket.rb', line 33

def inspect
  "#<#{self.class}:#{type_sym}:#{object_id.to_s(16)}>"
end

#recv_array(flags = 0) ⇒ Object

Receive a multipart message as an array of strings



141
142
143
144
145
146
147
148
# File 'lib/0mq/socket.rb', line 141

def recv_array(flags = 0)
  [].tap do |ary|
    loop do
      ary << recv_string(flags)
      break unless get_opt(ZMQ::RCVMORE)
    end
  end
end

#recv_string(flags = 0) ⇒ Object

Receive a string from the socket



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/0mq/socket.rb', line 117

def recv_string(flags = 0)
  rc = LibZMQ.zmq_msg_init @msgptr
  ZMQ.error_check true if rc==-1
  
  rc = LibZMQ.zmq_recvmsg @pointer, @msgptr, flags
  ZMQ.error_check true if rc==-1
  
  str = LibZMQ.zmq_msg_data(@msgptr)
              .read_string(LibZMQ.zmq_msg_size(@msgptr))
  
  rc = LibZMQ.zmq_msg_close @msgptr
  ZMQ.error_check true if rc==-1
  
  str
end

#recv_with_routingObject

Receive a multipart message as routing array and a body array All parts before an empty part are considered routing parts, and all parta after the empty part are considered body parts. The empty delimiter part is not included in the resulting arrays.



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/0mq/socket.rb', line 162

def recv_with_routing
  [[],[]].tap do |routing, body|
    loop do
      nxt = recv_string
      break if nxt.empty?
      routing << nxt
      raise ArgumentError, "Expected empty routing delimiter in "\
                           "multipart message: #{routing}" \
                            unless get_opt ZMQ::RCVMORE
    end
    loop do
      body << recv_string
      break unless get_opt(ZMQ::RCVMORE)
    end
  end
end

#send_array(array, flags = 0) ⇒ Object

Send a multipart message as an array of strings



134
135
136
137
138
# File 'lib/0mq/socket.rb', line 134

def send_array(array, flags = 0)
  array = array.to_a
  array[0...-1].each { |str| send_string str, ZMQ::SNDMORE|flags }
  send_string array.last, flags
end

#send_string(string, flags = 0) ⇒ Object

Send a string to the socket



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/0mq/socket.rb', line 100

def send_string(string, flags = 0)
  string = string.to_s
  size = string.respond_to?(:bytesize) ? string.bytesize : string.size
  @msgbuf = LibC.malloc size
  @msgbuf.write_string string, size
  
  rc = LibZMQ.zmq_msg_init_data @msgptr, @msgbuf, size, LibC::Free, nil
  ZMQ.error_check true if rc==-1
  
  rc = LibZMQ.zmq_sendmsg @pointer, @msgptr, flags
  ZMQ.error_check true if rc==-1
  
  rc = LibZMQ.zmq_msg_close @msgptr
  ZMQ.error_check true if rc==-1
end

#send_with_routing(routing, body) ⇒ Object

Send a multipart message as routing array and a body array All parts before an empty part are considered routing parts, and all parta after the empty part are considered body parts. The empty delimiter part should not be included in the input arrays.



154
155
156
# File 'lib/0mq/socket.rb', line 154

def send_with_routing(routing, body)
  send_array [*routing, '', *body]
end

#set_opt(option, value) ⇒ Object

Set a socket option



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/0mq/socket.rb', line 180

def set_opt(option, value)
  type = @@option_types.fetch(option) \
    { raise ArgumentError, "Unknown option: #{option}" }
  
  unless type == :string
    if type == :bool
      valptr = FFI::MemoryPointer.new(:int)
      valptr.write_int(value ? 1 : 0)
    else
      valptr = FFI::MemoryPointer.new(type)
      valptr.send :"write_#{type}", value
    end
    value = valptr
  end
  
  rc = LibZMQ.zmq_setsockopt @pointer, option, value, value.size
  ZMQ.error_check true if rc==-1
  
  value
end

#to_ptrObject

Returns the socket’s FFI pointer.



221
222
223
# File 'lib/0mq/socket.rb', line 221

def to_ptr
  @pointer
end

#type_symObject

Get the socket type name as a symbol



71
72
73
# File 'lib/0mq/socket.rb', line 71

def type_sym
  ZMQ::SocketTypeNameMap[type].to_sym
end

#unbind(endpoint) ⇒ Object

Unbind from an endpoint



88
89
90
91
# File 'lib/0mq/socket.rb', line 88

def unbind(endpoint)
  rc = LibZMQ.zmq_unbind @pointer, endpoint
  ZMQ.error_check true if rc==-1
end