Class: ZMQ::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/0mq/socket.rb,
lib/0mq/socket/options.rb

Overview

See api.zeromq.org/4-0:zmq-socket. Not thread safe.

Constant Summary collapse

# Table of socket options listed as readable: option name => value type
# (:bool, :int, :int64, :uint64 or :string).
# NOTE(review): presumably folded into @@option_types, keyed by the matching
# ZMQ constant, so get_opt knows how to decode each value -- confirm against
# lib/0mq/socket/options.rb.
@@get_options =
{
  :RCVMORE             => :bool,
  :RCVHWM              => :int,
  :AFFINITY            => :uint64,
  :IDENTITY            => :string,
  :RATE                => :int,
  :RECOVERY_IVL        => :int,
  :SNDBUF              => :int,
  :RCVBUF              => :int,
  :LINGER              => :int,
  :RECONNECT_IVL       => :int,
  :RECONNECT_IVL_MAX   => :int,
  :BACKLOG             => :int,
  :MAXMSGSIZE          => :int64,
  :MULTICAST_HOPS      => :int,
  :RCVTIMEO            => :int,
  :SNDTIMEO            => :int,
  :IPV6                => :bool,
  :IPV4ONLY            => :bool,
  :IMMEDIATE           => :bool,
  :FD                  => :int,
  :EVENTS              => :int,
  :LAST_ENDPOINT       => :string,
  :TCP_KEEPALIVE       => :int,
  :TCP_KEEPALIVE_IDLE  => :int,
  :TCP_KEEPALIVE_CNT   => :int,
  :TCP_KEEPALIVE_INTVL => :int,
  :MECHANISM           => :int,
  :PLAIN_SERVER        => :int,
  :PLAIN_USERNAME      => :string,
  :PLAIN_PASSWORD      => :string,
  :CURVE_PUBLICKEY     => :string,
  :CURVE_SECRETKEY     => :string,
  :CURVE_SERVERKEY     => :string,
  :ZAP_DOMAIN          => :string,
}
# Table of socket options listed as writable: option name => value type
# (:bool, :int, :int64, :uint64 or :string). Each option appears exactly once;
# a repeated key makes Ruby warn about a duplicated, overwritten hash key.
# NOTE(review): presumably folded into @@option_types, keyed by the matching
# ZMQ constant, so set_opt knows how to encode each value -- confirm against
# lib/0mq/socket/options.rb.
@@set_options =
{
  :SNDHWM              => :int,
  :RCVHWM              => :int,
  :AFFINITY            => :uint64,
  :SUBSCRIBE           => :string,
  :UNSUBSCRIBE         => :string,
  :IDENTITY            => :string,
  :RATE                => :int,
  :RECOVERY_IVL        => :int,
  :SNDBUF              => :int,
  :RCVBUF              => :int,
  :LINGER              => :int,
  :RECONNECT_IVL       => :int,
  :RECONNECT_IVL_MAX   => :int,
  :BACKLOG             => :int,
  :MAXMSGSIZE          => :int64,
  :MULTICAST_HOPS      => :int,
  :RCVTIMEO            => :int,
  :SNDTIMEO            => :int,
  :IPV6                => :bool,
  :IPV4ONLY            => :bool,
  :IMMEDIATE           => :bool,
  :ROUTER_HANDOVER     => :int,
  :ROUTER_MANDATORY    => :int,
  :ROUTER_RAW          => :int,
  :PROBE_ROUTER        => :int,
  :XPUB_VERBOSE        => :int,
  :REQ_CORRELATE       => :int,
  :REQ_RELAXED         => :int,
  :TCP_KEEPALIVE       => :int,
  :TCP_KEEPALIVE_IDLE  => :int,
  :TCP_KEEPALIVE_CNT   => :int,
  :TCP_KEEPALIVE_INTVL => :int,
  :TCP_ACCEPT_FILTER   => :string,
  :PLAIN_SERVER        => :int,
  :PLAIN_USERNAME      => :string,
  :PLAIN_PASSWORD      => :string,
  :CURVE_SERVER        => :int,
  :CURVE_PUBLICKEY     => :string,
  :CURVE_SECRETKEY     => :string,
  :CURVE_SERVERKEY     => :string,
  :ZAP_DOMAIN          => :string,
  :CONFLATE            => :bool,
}
@@option_types =

Set up map of option codes to option types

{}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(type, opts = {}) ⇒ Socket



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/0mq/socket.rb', line 17

# Create a ZeroMQ socket of the given type.
#
# @param type the ZeroMQ socket type handed to zmq_socket (e.g. ZMQ::ROUTER)
# @param opts [Hash] optional settings; :context selects the context whose
#   pointer the socket is created on (defaults to ZMQ::DefaultContext)
def initialize(type, opts={})
  @closed = false
  @context = opts.fetch :context, ZMQ::DefaultContext
  @type = type
  @pointer = LibZMQ.zmq_socket @context.pointer, @type
  # A null pointer means zmq_socket failed; let ZMQ.error_check report it.
  ZMQ.error_check true if @pointer.null?

  # Message structure reused by send_string and recv_string.
  @msgptr = FFI::MemoryPointer.new LibZMQ::Message.size, 1, false

  # The finalizer must capture the actual socket pointer. It used to be given
  # @socket, which is never assigned, so a GC-triggered close received nil.
  ObjectSpace.define_finalizer self,
                               self.class.finalizer(@pointer, Process.pid)
end

Instance Attribute Details

#contextObject (readonly)

The socket’s ZMQ::Context.



13
14
15
# File 'lib/0mq/socket.rb', line 13

# The socket's ZMQ::Context, as stored by the constructor.
def context
  @context
end

#pointerObject (readonly)

The FFI pointer to the socket.



11
12
13
# File 'lib/0mq/socket.rb', line 11

# The FFI pointer to the underlying socket; nil once #close has released it.
def pointer
  @pointer
end

#typeObject (readonly)

The socket’s ZeroMQ socket type (e.g. ZMQ::ROUTER).



15
16
17
# File 'lib/0mq/socket.rb', line 15

# The ZeroMQ socket type given to the constructor (e.g. ZMQ::ROUTER).
def type
  @type
end

Class Method Details

.finalizer(pointer, pid) ⇒ Object

Create a safe finalizer for the socket pointer to close on GC of the object



56
57
58
# File 'lib/0mq/socket.rb', line 56

# Build the GC finalizer for a socket pointer.
#
# The returned proc closes +pointer+ only while running in the process whose
# id is +pid+; in any other process it does nothing. It is a plain proc (not
# a lambda) so it tolerates the argument finalizers are invoked with.
#
# @param pointer the socket pointer to hand to zmq_close
# @param pid [Integer] id of the process that owns the socket
# @return [Proc]
def self.finalizer(pointer, pid)
  proc do
    next unless Process.pid == pid

    LibZMQ.zmq_close pointer
  end
end

Instance Method Details

#bind(endpoint) ⇒ Object

Bind to an endpoint



66
67
68
69
# File 'lib/0mq/socket.rb', line 66

# Bind the socket to an endpoint.
#
# @param endpoint the endpoint passed straight through to zmq_bind
# @return [nil] when zmq_bind succeeds; when it returns -1 the outcome is
#   whatever ZMQ.error_check(true) does (presumably raising)
def bind(endpoint)
  failed = LibZMQ.zmq_bind(@pointer, endpoint) == -1
  ZMQ.error_check(true) if failed
end

#closeObject

Close the socket



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/0mq/socket.rb', line 36

# Close the socket.
#
# Always marks the socket closed. If a pointer is still held, the GC
# finalizer is removed, any temp buffers are cleared, zmq_close is called
# (a -1 result goes to ZMQ.error_check) and the pointer is dropped, so a
# second call does not close again.
#
# @return [nil]
def close
  @closed = true
  return unless @pointer

  ObjectSpace.undefine_finalizer self
  @temp_buffers.clear if @temp_buffers

  ZMQ.error_check(true) if LibZMQ.zmq_close(@pointer) == -1

  @pointer = nil
end

#closed?Boolean

Returns true if the socket is closed.



51
52
53
# File 'lib/0mq/socket.rb', line 51

# Whether #close has been called on this socket (false right after construction).
def closed?
  @closed
end

#connect(endpoint) ⇒ Object

Connect to an endpoint



72
73
74
75
# File 'lib/0mq/socket.rb', line 72

# Connect the socket to an endpoint.
#
# @param endpoint the endpoint passed straight through to zmq_connect
# @return [nil] when zmq_connect succeeds; when it returns -1 the outcome is
#   whatever ZMQ.error_check(true) does (presumably raising)
def connect(endpoint)
  failed = LibZMQ.zmq_connect(@pointer, endpoint) == -1
  ZMQ.error_check(true) if failed
end

#disconnect(endpoint) ⇒ Object

Disconnect from an endpoint



84
85
86
87
# File 'lib/0mq/socket.rb', line 84

# Disconnect the socket from an endpoint.
#
# @param endpoint the endpoint passed straight through to zmq_disconnect
# @return [nil] when zmq_disconnect succeeds; when it returns -1 the outcome
#   is whatever ZMQ.error_check(true) does (presumably raising)
def disconnect(endpoint)
  failed = LibZMQ.zmq_disconnect(@pointer, endpoint) == -1
  ZMQ.error_check(true) if failed
end

#get_opt(option) ⇒ Object

Get a socket option



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/0mq/socket.rb', line 192

# Read a socket option.
#
# The option's type is looked up in @@option_types and decides how the raw
# value returned by zmq_getsockopt is decoded.
#
# @param option the option code; must be a key of @@option_types
# @return [String, Boolean, Object] a string for :string options, true/false
#   for :bool options, otherwise the result of the pointer's read_<type>
# @raise [ArgumentError] if the option is not a known option code
def get_opt(option)
  type = @@option_types.fetch(option) do
    raise ArgumentError, "Unknown option: #{option}"
  end

  value, size = get_opt_pointers type

  failed = LibZMQ.zmq_getsockopt(@pointer, option, value, size) == -1
  ZMQ.error_check(true) if failed

  case type
  when :string
    # One byte fewer than the reported length is read.
    # NOTE(review): presumably drops a trailing NUL -- confirm this is right
    # for binary-valued options.
    value.read_string(size.read_int - 1)
  when :bool
    value.read_int == 1
  else
    value.send :"read_#{type}"
  end
end

#inspectObject

Show a useful inspect output



31
32
33
# File 'lib/0mq/socket.rb', line 31

# Compact inspect output: class name, socket type symbol and the object id
# in hexadecimal.
#
# @return [String]
def inspect
  hex_id = object_id.to_s(16)
  "#<#{self.class}:#{type_sym}:#{hex_id}>"
end

#recv_array(flags = 0) ⇒ Object

Receive a multipart message as an array of strings



131
132
133
134
135
136
137
138
# File 'lib/0mq/socket.rb', line 131

# Receive a multipart message as an array of strings.
#
# Parts are read with recv_string until the RCVMORE option reports that no
# further part follows.
#
# @param flags receive flags forwarded to every recv_string call
# @return [Array<String>] the message parts in arrival order
def recv_array(flags = 0)
  parts = []
  loop do
    parts << recv_string(flags)
    break unless get_opt(ZMQ::RCVMORE)
  end
  parts
end

#recv_string(flags = 0) ⇒ Object

Receive a string from the socket



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/0mq/socket.rb', line 107

# Receive one message part from the socket as a string.
#
# The shared message structure (@msgptr) is initialised, filled by
# zmq_recvmsg, copied into a Ruby string and closed again. The close runs in
# an ensure clause so the message is also released when the receive fails
# and ZMQ.error_check raises (for example on a non-blocking receive with
# nothing to read).
#
# @param flags receive flags forwarded to zmq_recvmsg
# @return [String] the bytes of the received part
def recv_string(flags = 0)
  rc = LibZMQ.zmq_msg_init @msgptr
  ZMQ.error_check true if rc == -1

  begin
    rc = LibZMQ.zmq_recvmsg @pointer, @msgptr, flags
    ZMQ.error_check true if rc == -1

    str = LibZMQ.zmq_msg_data(@msgptr)
                .read_string(LibZMQ.zmq_msg_size(@msgptr))
  ensure
    # Always pair zmq_msg_init with zmq_msg_close, even on the error path.
    close_rc = LibZMQ.zmq_msg_close @msgptr
  end
  # Only reached when the receive succeeded; report a failed close as before.
  ZMQ.error_check true if close_rc == -1

  str
end

#recv_with_routingObject

Receive a multipart message as a routing array and a body array. All parts before an empty part are considered routing parts, and all parts after the empty part are considered body parts. The empty delimiter part is not included in the resulting arrays.



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/0mq/socket.rb', line 152

# Receive a multipart message split into routing parts and body parts.
#
# Parts up to the first empty part are routing parts; parts after it are body
# parts. The empty delimiter itself is not returned.
#
# @return [Array(Array<String>, Array<String>)] [routing, body]
# @raise [ArgumentError] if the message ends before an empty delimiter part
#   has been seen
def recv_with_routing
  routing = []
  body = []

  loop do
    part = recv_string
    break if part.empty?

    routing << part
    unless get_opt ZMQ::RCVMORE
      raise ArgumentError, "Expected empty routing delimiter in "\
                           "multipart message: #{routing}"
    end
  end

  # Only keep reading while the current message has more parts. If the
  # delimiter was the final part, the body is empty; reading unconditionally
  # here would block or consume the first part of the next message.
  body << recv_string while get_opt(ZMQ::RCVMORE)

  [routing, body]
end

#send_array(array, flags = 0) ⇒ Object

Send a multipart message as an array of strings



124
125
126
127
128
# File 'lib/0mq/socket.rb', line 124

# Send a multipart message, one part per array element.
#
# Every part except the last is sent with ZMQ::SNDMORE added to the flags;
# the last part is sent with the caller's flags only.
#
# @param array anything responding to to_a; each element becomes one part
# @param flags send flags applied to every part
# @return whatever send_string returns for the final part
def send_array(array, flags = 0)
  *leading, final = array.to_a
  leading.each { |part| send_string(part, ZMQ::SNDMORE | flags) }
  send_string(final, flags)
end

#send_string(string, flags = 0) ⇒ Object

Send a string to the socket



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/0mq/socket.rb', line 90

# Send a string to the socket as a single message part.
#
# The string's bytes are copied into a buffer from LibC.malloc, and the shared
# message structure (@msgptr) is initialised around that buffer with
# LibC::Free registered as its release callback. The message is closed in an
# ensure clause so that a failed zmq_sendmsg (where ZMQ.error_check raises)
# still releases the message and, through it, the buffer.
#
# @param string the payload; converted with to_s
# @param flags send flags forwarded to zmq_sendmsg
# @return [nil] when every call succeeds
def send_string(string, flags = 0)
  string = string.to_s
  # Byte length, not character length, so multibyte strings are sent whole.
  size = string.respond_to?(:bytesize) ? string.bytesize : string.size
  @msgbuf = LibC.malloc size
  @msgbuf.write_string string, size

  rc = LibZMQ.zmq_msg_init_data @msgptr, @msgbuf, size, LibC::Free, nil
  ZMQ.error_check true if rc == -1

  begin
    rc = LibZMQ.zmq_sendmsg @pointer, @msgptr, flags
    ZMQ.error_check true if rc == -1
  ensure
    # Always close the initialised message, even when the send failed.
    close_rc = LibZMQ.zmq_msg_close @msgptr
  end
  # Only reached when the send succeeded; report a failed close as before.
  ZMQ.error_check true if close_rc == -1
end

#send_with_routing(routing, body) ⇒ Object

Send a multipart message as a routing array and a body array. All parts before an empty part are considered routing parts, and all parts after the empty part are considered body parts. The empty delimiter part should not be included in the input arrays.



144
145
146
# File 'lib/0mq/socket.rb', line 144

# Send a multipart message made of routing parts, an empty delimiter part and
# body parts, in that order. The delimiter is inserted here and must not be
# included by the caller.
#
# @param routing the routing parts (splatted into the outgoing array)
# @param body the body parts (splatted into the outgoing array)
# @return whatever send_array returns
def send_with_routing(routing, body)
  frames = [*routing, '', *body]
  send_array(frames)
end

#set_opt(option, value) ⇒ Object

Set a socket option



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/0mq/socket.rb', line 170

# Set a socket option.
#
# The option's type is looked up in @@option_types. Non-string values are
# written into a freshly allocated FFI::MemoryPointer of that type (:bool is
# stored as an int 1/0); string values are passed through as given.
#
# @param option the option code; must be a key of @@option_types
# @param value the new value
# @return the object handed to zmq_setsockopt: the original string for
#   :string options, otherwise the FFI::MemoryPointer holding the value
# @raise [ArgumentError] if the option is not a known option code
def set_opt(option, value)
  type = @@option_types.fetch(option) \
    { raise ArgumentError, "Unknown option: #{option}" }

  unless type == :string
    if type == :bool
      valptr = FFI::MemoryPointer.new(:int)
      valptr.write_int(value ? 1 : 0)
    else
      valptr = FFI::MemoryPointer.new(type)
      valptr.send :"write_#{type}", value
    end
    value = valptr
  end

  # zmq_setsockopt takes the value length in bytes. String#size counts
  # characters, which would truncate multibyte strings, so use bytesize for
  # string values (as send_string does); memory pointers report size directly.
  length = if type == :string && value.respond_to?(:bytesize)
             value.bytesize
           else
             value.size
           end

  rc = LibZMQ.zmq_setsockopt @pointer, option, value, length
  ZMQ.error_check true if rc == -1

  value
end

#to_ptrObject

Returns the socket’s FFI pointer.



211
212
213
# File 'lib/0mq/socket.rb', line 211

# Returns the socket's FFI pointer (the same object as #pointer).
def to_ptr
  @pointer
end

#type_symObject

Get the socket type name as a symbol



61
62
63
# File 'lib/0mq/socket.rb', line 61

# The socket type's name as a symbol, looked up from the socket type via
# ZMQ::SocketTypeNameMap.
#
# @return [Symbol]
def type_sym
  type_name = ZMQ::SocketTypeNameMap[type]
  type_name.to_sym
end

#unbind(endpoint) ⇒ Object

Unbind from an endpoint



78
79
80
81
# File 'lib/0mq/socket.rb', line 78

# Unbind the socket from an endpoint.
#
# @param endpoint the endpoint passed straight through to zmq_unbind
# @return [nil] when zmq_unbind succeeds; when it returns -1 the outcome is
#   whatever ZMQ.error_check(true) does (presumably raising)
def unbind(endpoint)
  failed = LibZMQ.zmq_unbind(@pointer, endpoint) == -1
  ZMQ.error_check(true) if failed
end