Class: EventMachine::ZeroMQ::Socket
- Inherits:
-
Connection
- Object
- Connection
- EventMachine::ZeroMQ::Socket
show all
- Includes:
- EventEmitter
- Defined in:
- lib/em-zeromq/socket.rb
Constant Summary
collapse
- READABLES =
[ ZMQ::SUB, ZMQ::PULL, ZMQ::ROUTER, ZMQ::DEALER, ZMQ::REP, ZMQ::REQ, ZMQ::PAIR ]
- WRITABLES =
[ ZMQ::PUB, ZMQ::PUSH, ZMQ::ROUTER, ZMQ::DEALER, ZMQ::REP, ZMQ::REQ, ZMQ::PAIR ]
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
#emit, #listeners, #on, #remove_all_listeners, #remove_listener
Constructor Details
#initialize(socket, socket_type) ⇒ Socket
Returns a new instance of Socket.
11
12
13
14
15
16
17
|
# File 'lib/em-zeromq/socket.rb', line 11
# Stores the wrapped socket and its type, then switches on the
# notifications that apply to that type.
#
# @param socket the socket object this connection wraps
# @param socket_type the type value looked up in READABLES / WRITABLES
def initialize(socket, socket_type)
  @socket, @socket_type = socket, socket_type

  # Readable notification only for types listed in READABLES.
  if READABLES.include?(socket_type)
    self.notify_readable = true
  end

  # Writable notification only for types listed in WRITABLES.
  if WRITABLES.include?(socket_type)
    self.notify_writable = true
  end
end
|
Instance Attribute Details
#socket ⇒ Object
Returns the value of attribute socket.
9
10
11
|
# File 'lib/em-zeromq/socket.rb', line 9
# Reader for the @socket instance variable (the object stored by #initialize).
#
# @return [Object] the wrapped socket
def socket
@socket
end
|
#socket_type ⇒ Object
Returns the value of attribute socket_type.
9
10
11
|
# File 'lib/em-zeromq/socket.rb', line 9
# Reader for the @socket_type instance variable (the type stored by #initialize).
#
# @return [Object] the socket type given at construction
def socket_type
@socket_type
end
|
Class Method Details
.map_sockopt(opt, name) ⇒ Object
19
20
21
22
|
# File 'lib/em-zeromq/socket.rb', line 19
# Generates a reader and a writer for one socket option.
#
# The reader +name+ calls getsockopt(opt); the writer +name=+ calls
# setsockopt(opt, value).
#
# @param opt the option identifier forwarded to getsockopt / setsockopt
# @param name [Symbol, String] base name of the generated accessor pair
def self.map_sockopt(opt, name)
  define_method(name) do
    getsockopt(opt)
  end

  define_method("#{name}=") do |value|
    setsockopt(opt, value)
  end
end
|
Instance Method Details
#bind(address) ⇒ Object
47
48
49
|
# File 'lib/em-zeromq/socket.rb', line 47
# Forwards to @socket.bind and returns whatever that call returns.
#
# @param address the endpoint passed through unchanged to the wrapped socket
def bind(address)
@socket.bind(address)
end
|
#connect(address) ⇒ Object
51
52
53
|
# File 'lib/em-zeromq/socket.rb', line 51
# Forwards to @socket.connect and returns whatever that call returns.
#
# @param address the endpoint passed through unchanged to the wrapped socket
def connect(address)
@socket.connect(address)
end
|
#disconnect(address) ⇒ Object
55
56
57
|
# File 'lib/em-zeromq/socket.rb', line 55
# Forwards to @socket.disconnect and returns whatever that call returns.
#
# @param address the endpoint passed through unchanged to the wrapped socket
def disconnect(address)
@socket.disconnect(address)
end
|
#getsockopt(opt) ⇒ Object
106
107
108
109
110
111
112
113
114
|
# File 'lib/em-zeromq/socket.rb', line 106
# Reads a socket option from the wrapped socket.
#
# The wrapped socket fills an output array; when that array holds exactly
# one element the element itself is returned, otherwise the array is.
#
# @param opt the option identifier handed to @socket.getsockopt
# @return [Object, Array] the single value, or the whole result array
# @raise [ZMQOperationFailed] when ZMQ::Util.resultcode_ok? rejects the
#   return code of the underlying call
def getsockopt(opt)
  values = []
  status = @socket.getsockopt(opt, values)

  raise ZMQOperationFailed, "getsockopt: #{ZMQ::Util.error_string}" unless ZMQ::Util.resultcode_ok?(status)

  return values unless values.size == 1

  values[0]
end
|
#hwm=(val) ⇒ Object
29
30
31
32
|
# File 'lib/em-zeromq/socket.rb', line 29
# Applies one high-water-mark value to both directions by calling the
# sndhwm= writer first and the rcvhwm= writer second.
#
# @param val the value handed to both writers
def hwm=(val)
  [:sndhwm=, :rcvhwm=].each { |writer| __send__(writer, val) }
  val
end
|
#notify_readable ⇒ Object
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/em-zeromq/socket.rb', line 125
# Drains pending messages: while get_message keeps returning a truthy
# value, each one is splatted into a :message event. Does nothing when
# readable? is false.
#
# @return [nil]
def notify_readable
  return unless readable?

  frames = get_message
  while frames
    emit(:message, *frames)
    frames = get_message
  end
end
|
#notify_writable ⇒ Object
137
138
139
140
141
142
143
144
145
146
|
# File 'lib/em-zeromq/socket.rb', line 137
# When writable? holds, switches writable notification off and emits a
# :writable event; otherwise does nothing and returns nil.
# NOTE(review): presumably invoked by the event loop when the descriptor
# becomes writable - confirm against the Connection base class.
def notify_writable
  if writable?
    # Turn the notification off before telling listeners they can send.
    self.notify_writable = false
    emit(:writable)
  end
end
|
#readable? ⇒ Boolean
147
148
149
|
# File 'lib/em-zeromq/socket.rb', line 147
# Checks whether the ZMQ::POLLIN bits are set in the value of the
# ZMQ::EVENTS socket option.
#
# @return [Boolean]
def readable?
  pending = getsockopt(ZMQ::EVENTS)
  (pending & ZMQ::POLLIN) == ZMQ::POLLIN
end
|
#send_msg(*parts) ⇒ Object
Sends a message without blocking. parts: if only one argument is given, a single-part message is sent;
if more than one argument is given, a multipart message is sent.
Returns true if the message was queued, false otherwise.
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/em-zeromq/socket.rb', line 75
# Sends a message without blocking.
#
# Every frame but the last is sent with ZMQ::NOBLOCK | ZMQ::SNDMORE; the
# last frame is sent with ZMQ::NOBLOCK alone. Sending stops at the first
# frame whose send_string result is negative.
#
# @param parts [Array] the frames of the message. One argument sends a
#   single-part message, several arguments send a multipart message. A
#   lone Array argument is unpacked and treated as the list of frames.
#   With no frames at all, a single send_string(nil, ...) call is still
#   made.
# @return [Boolean] true if every frame was queued, false otherwise
def send_msg(*parts)
  # Accept send_msg(["a", "b"]) as well as send_msg("a", "b").
  parts = parts[0] if parts.size == 1 && parts[0].is_a?(Array)

  *leading, final = parts

  # all? stops at the first frame that could not be queued.
  sent = leading.all? do |frame|
    @socket.send_string(frame, ZMQ::NOBLOCK | ZMQ::SNDMORE) >= 0
  end
  sent &&= @socket.send_string(final, ZMQ::NOBLOCK) >= 0

  # Whichever frame failed, ask for a writable notification so listeners
  # get a :writable event and can retry.
  self.notify_writable = true unless sent

  # Check for incoming messages on the next reactor tick.
  EM::next_tick { notify_readable() }

  sent
end
|
#setsockopt(opt, value) ⇒ Object
116
117
118
|
# File 'lib/em-zeromq/socket.rb', line 116
# Forwards to @socket.setsockopt and returns whatever that call returns.
# No result-code check is performed here.
#
# @param opt the option identifier passed through to the wrapped socket
# @param value the option value passed through to the wrapped socket
def setsockopt(opt, value)
@socket.setsockopt(opt, value)
end
|
#subscribe(what = '') ⇒ Object
59
60
61
62
|
# File 'lib/em-zeromq/socket.rb', line 59
# Sets the ZMQ::SUBSCRIBE option on the wrapped socket.
#
# @param what the subscription value handed to setsockopt (defaults to '')
# @return the result of @socket.setsockopt
# @raise [RuntimeError] when the wrapped socket's name is not 'SUB'
def subscribe(what = '')
  if @socket.name != 'SUB'
    raise "only valid on sub socket type (was #{@socket.name})"
  end

  @socket.setsockopt(ZMQ::SUBSCRIBE, what)
end
|
#unbind ⇒ Object
120
121
122
123
|
# File 'lib/em-zeromq/socket.rb', line 120
# Calls detach, then closes the wrapped socket; returns the result of
# @socket.close.
# NOTE(review): presumably the connection-teardown callback of the
# Connection base class - confirm against EventMachine.
def unbind
detach
@socket.close
end
|
#unsubscribe(what) ⇒ Object
64
65
66
67
|
# File 'lib/em-zeromq/socket.rb', line 64
# Sets the ZMQ::UNSUBSCRIBE option on the wrapped socket.
#
# @param what the subscription value handed to setsockopt
# @return the result of @socket.setsockopt
# @raise [RuntimeError] when the wrapped socket's name is not 'SUB'
def unsubscribe(what)
  if @socket.name != 'SUB'
    raise "only valid on sub socket type (was #{@socket.name})"
  end

  @socket.setsockopt(ZMQ::UNSUBSCRIBE, what)
end
|
#writable? ⇒ Boolean
151
152
153
154
155
|
# File 'lib/em-zeromq/socket.rb', line 151
# Always reports the socket as writable; no socket state is consulted.
#
# @return [true]
def writable?
  true
end
|