Class: ZMQ::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/0mq/socket.rb,
lib/0mq/socket/options.rb

Constant Summary collapse

@@get_options =
{
  :RCVMORE             => :bool,
  :RCVHWM              => :int,
  :AFFINITY            => :uint64,
  :IDENTITY            => :string,
  :RATE                => :int,
  :RECOVERY_IVL        => :int,
  :SNDBUF              => :int,
  :RCVBUF              => :int,
  :LINGER              => :int,
  :RECONNECT_IVL       => :int,
  :RECONNECT_IVL_MAX   => :int,
  :BACKLOG             => :int,
  :MAXMSGSIZE          => :int64,
  :MULTICAST_HOPS      => :int,
  :RCVTIMEO            => :int,
  :SNDTIMEO            => :int,
  :IPV6                => :bool,
  :IPV4ONLY            => :bool,
  :IMMEDIATE           => :bool,
  :FD                  => :int,
  :EVENTS              => :int,
  :LAST_ENDPOINT       => :string,
  :TCP_KEEPALIVE       => :int,
  :TCP_KEEPALIVE_IDLE  => :int,
  :TCP_KEEPALIVE_CNT   => :int,
  :TCP_KEEPALIVE_INTVL => :int,
  :MECHANISM           => :int,
  :PLAIN_SERVER        => :int,
  :PLAIN_USERNAME      => :string,
  :PLAIN_PASSWORD      => :string,
  :CURVE_PUBLICKEY     => :string,
  :CURVE_SECRETKEY     => :string,
  :CURVE_SERVERKEY     => :string,
  :ZAP_DOMAIN          => :string,
}
@@set_options =
{
  :SNDHWM              => :int,
  :RCVHWM              => :int,
  :AFFINITY            => :uint64,
  :SUBSCRIBE           => :string,
  :UNSUBSCRIBE         => :string,
  :IDENTITY            => :string,
  :RATE                => :int,
  :RECOVERY_IVL        => :int,
  :SNDBUF              => :int,
  :RCVBUF              => :int,
  :LINGER              => :int,
  :RECONNECT_IVL       => :int,
  :RECONNECT_IVL_MAX   => :int,
  :RECONNECT_IVL       => :int,
  :BACKLOG             => :int,
  :MAXMSGSIZE          => :int64,
  :MULTICAST_HOPS      => :int,
  :RCVTIMEO            => :int,
  :SNDTIMEO            => :int,
  :IPV6                => :bool,
  :IPV4ONLY            => :bool,
  :IMMEDIATE           => :bool,
  :ROUTER_HANDOVER     => :int,
  :ROUTER_MANDATORY    => :int,
  :ROUTER_RAW          => :int,
  :PROBE_ROUTER        => :int,
  :XPUB_VERBOSE        => :int,
  :REQ_CORRELATE       => :int,
  :REQ_RELAXED         => :int,
  :TCP_KEEPALIVE       => :int,
  :TCP_KEEPALIVE_IDLE  => :int,
  :TCP_KEEPALIVE_CNT   => :int,
  :TCP_KEEPALIVE_INTVL => :int,
  :TCP_ACCEPT_FILTER   => :string,
  :PLAIN_SERVER        => :int,
  :PLAIN_USERNAME      => :string,
  :PLAIN_PASSWORD      => :string,
  :CURVE_SERVER        => :int,
  :CURVE_PUBLICKEY     => :string,
  :CURVE_SECRETKEY     => :string,
  :CURVE_SERVERKEY     => :string,
  :ZAP_DOMAIN          => :string,
  :CONFLATE            => :bool,
}
@@option_types =

Set up map of option codes to option types

{}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(type, opts = {}) ⇒ Socket

Returns a new instance of Socket.



11
12
13
14
15
16
17
18
19
20
21
# File 'lib/0mq/socket.rb', line 11

def initialize(type, opts={})
  @context = opts.fetch :context, ZMQ::DefaultContext
  @type = type
  @ptr = LibZMQ.zmq_socket @context.ptr, @type
  ZMQ.error_check true if @ptr.null?
  
  @msgptr = FFI::MemoryPointer.new LibZMQ::Message.size, 1, false
  
  ObjectSpace.define_finalizer self,
                               self.class.finalizer(@socket, Process.pid)
end

Instance Attribute Details

#contextObject (readonly)

Returns the value of attribute context.



8
9
10
# File 'lib/0mq/socket.rb', line 8

def context
  @context
end

#ptrObject (readonly)

Returns the value of attribute ptr.



7
8
9
# File 'lib/0mq/socket.rb', line 7

def ptr
  @ptr
end

#typeObject (readonly)

Returns the value of attribute type.



9
10
11
# File 'lib/0mq/socket.rb', line 9

def type
  @type
end

Class Method Details

.finalizer(ptr, pid) ⇒ Object

Create a safe finalizer for the socket ptr to close on GC of the object



37
38
39
# File 'lib/0mq/socket.rb', line 37

def self.finalizer(ptr, pid)
  Proc.new { LibZMQ.zmq_close ptr if Process.pid == pid }
end

Instance Method Details

#bind(endpoint) ⇒ Object

Bind to an endpoint



47
48
49
50
# File 'lib/0mq/socket.rb', line 47

def bind(endpoint)
  rc = LibZMQ.zmq_bind @ptr, endpoint
  ZMQ.error_check true if rc==-1
end

#closeObject

Close the socket



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/0mq/socket.rb', line 24

def close
  if @ptr
    ObjectSpace.undefine_finalizer self
    @temp_buffers.clear if @temp_buffers
    
    rc = LibZMQ.zmq_close @ptr
    ZMQ.error_check true if rc==-1
    
    @ptr = nil
  end
end

#connect(endpoint) ⇒ Object

Connect to an endpoint



53
54
55
56
# File 'lib/0mq/socket.rb', line 53

def connect(endpoint)
  rc = LibZMQ.zmq_connect @ptr, endpoint
  ZMQ.error_check true if rc==-1
end

#disconnect(endpoint) ⇒ Object

Disconnect to an endpoint



65
66
67
68
# File 'lib/0mq/socket.rb', line 65

def disconnect(endpoint)
  rc = LibZMQ.zmq_disconnect @ptr, endpoint
  ZMQ.error_check true if rc==-1
end

#get_opt(option) ⇒ Object

Get a socket option



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/0mq/socket.rb', line 173

def get_opt(option)
  type = @@option_types.fetch(option) \
    { raise ArgumentError, "Unknown option: #{option}" }
  
  value, size = get_opt_pointers type
  
  rc = LibZMQ.zmq_getsockopt @ptr, option, value, size
  ZMQ.error_check true if rc==-1
  
  if type == :string
    value.read_string(size.read_int-1)
  elsif type == :bool
    value.read_int == 1
  else
    value.send :"read_#{type}"
  end
end

#recv_arrayObject

Receive a multipart message as an array of strings



112
113
114
115
116
117
118
119
# File 'lib/0mq/socket.rb', line 112

def recv_array
  [].tap do |ary|
    loop do
      ary << recv_string
      break unless get_opt(ZMQ::RCVMORE)
    end
  end
end

#recv_string(flags = 0) ⇒ Object

Receive a string from the socket



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/0mq/socket.rb', line 87

def recv_string(flags = 0)
  rc = LibZMQ.zmq_msg_init @msgptr
  ZMQ.error_check true if rc==-1
  
  rc = LibZMQ.zmq_recvmsg @ptr, @msgptr, flags
  ZMQ.error_check true if rc==-1
  
  str = LibZMQ.zmq_msg_data(@msgptr)
              .read_string(LibZMQ.zmq_msg_size(@msgptr))
  
  rc = LibZMQ.zmq_msg_close @msgptr
  ZMQ.error_check true if rc==-1
  
  str
end

#recv_with_routingObject

Receive a multipart message as routing array and a body array All parts before an empty part are considered routing parts, and all parta after the empty part are considered body parts. The empty delimiter part is not included in the resulting arrays.



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/0mq/socket.rb', line 133

def recv_with_routing
  [[],[]].tap do |routing, body|
    loop do
      nxt = recv_string
      break if nxt.empty?
      routing << nxt
      raise ArgumentError, "Expected empty routing delimiter in "\
                           "multipart message: #{routing}" \
                            unless get_opt ZMQ::RCVMORE
    end
    loop do
      body << recv_string
      break unless get_opt(ZMQ::RCVMORE)
    end
  end
end

#send_array(ary) ⇒ Object

Send a multipart message as an array of strings



104
105
106
107
108
109
# File 'lib/0mq/socket.rb', line 104

def send_array(ary)
  last = ary.last
  
  ary[0...-1].each { |str| send_string str, ZMQ::SNDMORE }
  send_string last
end

#send_string(string, flags = 0) ⇒ Object

Send a string to the socket



71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/0mq/socket.rb', line 71

def send_string(string, flags = 0)
  size = string.respond_to?(:bytesize) ? string.bytesize : string.size
  @msgbuf = LibC.malloc size
  @msgbuf.write_string string, size
  
  rc = LibZMQ.zmq_msg_init_data @msgptr, @msgbuf, size, LibC::Free, nil
  ZMQ.error_check true if rc==-1
  
  rc = LibZMQ.zmq_sendmsg @ptr, @msgptr, flags
  ZMQ.error_check true if rc==-1
  
  rc = LibZMQ.zmq_msg_close @msgptr
  ZMQ.error_check true if rc==-1
end

#send_with_routing(routing, body) ⇒ Object

Send a multipart message as routing array and a body array All parts before an empty part are considered routing parts, and all parta after the empty part are considered body parts. The empty delimiter part should not be included in the input arrays.



125
126
127
# File 'lib/0mq/socket.rb', line 125

def send_with_routing(routing, body)
  send_array [*routing, '', *body]
end

#set_opt(option, value) ⇒ Object

Set a socket option



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/0mq/socket.rb', line 151

def set_opt(option, value)
  type = @@option_types.fetch(option) \
    { raise ArgumentError, "Unknown option: #{option}" }
  
  unless type == :string
    if type == :bool
      valptr = FFI::MemoryPointer.new(:int)
      valptr.write_int(value ? 1 : 0)
    else
      valptr = FFI::MemoryPointer.new(type)
      valptr.send :"write_#{type}", value
    end
    value = valptr
  end
  
  rc = LibZMQ.zmq_setsockopt @ptr, option, value, value.size
  ZMQ.error_check true if rc==-1
  
  value
end

#type_symObject

Get the socket type name as a symbol



42
43
44
# File 'lib/0mq/socket.rb', line 42

def type_sym
  ZMQ::SocketTypeNameMap[type].to_sym
end

#unbind(endpoint) ⇒ Object

Unbind from an endpoint



59
60
61
62
# File 'lib/0mq/socket.rb', line 59

def unbind(endpoint)
  rc = LibZMQ.zmq_unbind @ptr, endpoint
  ZMQ.error_check true if rc==-1
end