Class: EventMachine::ZeroMQ::Socket
- Inherits:
-
Connection
- Object
- Connection
- EventMachine::ZeroMQ::Socket
- Defined in:
- lib/em-zeromq/socket.rb
Instance Attribute Summary collapse
-
#handler ⇒ Object
Returns the value of attribute handler.
-
#on_readable ⇒ Object
Returns the value of attribute on_readable.
-
#on_writable ⇒ Object
Returns the value of attribute on_writable.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
-
#socket_type ⇒ Object
readonly
Returns the value of attribute socket_type.
Class Method Summary collapse
Instance Method Summary collapse
-
#bind(address) ⇒ Object
User method.
- #connect(address) ⇒ Object
- #getsockopt(opt) ⇒ Object
-
#initialize(socket, socket_type, handler) ⇒ Socket
constructor
A new instance of Socket.
- #notify_readable ⇒ Object
- #notify_writable ⇒ Object
- #readable? ⇒ Boolean
-
#register_readable ⇒ Object
Make this socket available for reads.
-
#register_writable ⇒ Object
Trigger on_readable when socket is readable.
-
#send_msg(*parts) ⇒ Object
send a non blocking message parts: if only one argument is given a signle part message is sent if more than one arguments is given a multipart message is sent.
- #setsockopt(opt, value) ⇒ Object
- #subscribe(what = '') ⇒ Object
-
#unbind ⇒ Object
cleanup when ending loop.
- #unsubscribe(what) ⇒ Object
- #writable? ⇒ Boolean
Constructor Details
#initialize(socket, socket_type, handler) ⇒ Socket
Returns a new instance of Socket.
7 8 9 10 11 |
# File 'lib/em-zeromq/socket.rb', line 7 def initialize(socket, socket_type, handler) @socket = socket @socket_type = socket_type @handler = handler end |
Instance Attribute Details
#handler ⇒ Object
Returns the value of attribute handler.
4 5 6 |
# File 'lib/em-zeromq/socket.rb', line 4 def handler @handler end |
#on_readable ⇒ Object
Returns the value of attribute on_readable.
4 5 6 |
# File 'lib/em-zeromq/socket.rb', line 4 def on_readable @on_readable end |
#on_writable ⇒ Object
Returns the value of attribute on_writable.
4 5 6 |
# File 'lib/em-zeromq/socket.rb', line 4 def on_writable @on_writable end |
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
5 6 7 |
# File 'lib/em-zeromq/socket.rb', line 5 def socket @socket end |
#socket_type ⇒ Object (readonly)
Returns the value of attribute socket_type.
5 6 7 |
# File 'lib/em-zeromq/socket.rb', line 5 def socket_type @socket_type end |
Class Method Details
.map_sockopt(opt, name) ⇒ Object
13 14 15 16 |
# File 'lib/em-zeromq/socket.rb', line 13 def self.map_sockopt(opt, name) define_method(name){ getsockopt(opt) } define_method("#{name}="){|val| @socket.setsockopt(opt, val) } end |
Instance Method Details
#bind(address) ⇒ Object
User method
31 32 33 |
# File 'lib/em-zeromq/socket.rb', line 31 def bind(address) @socket.bind(address) end |
#connect(address) ⇒ Object
35 36 37 |
# File 'lib/em-zeromq/socket.rb', line 35 def connect(address) @socket.connect(address) end |
#getsockopt(opt) ⇒ Object
86 87 88 89 90 91 92 93 94 |
# File 'lib/em-zeromq/socket.rb', line 86 def getsockopt(opt) ret = [] rc = @socket.getsockopt(opt, ret) unless ZMQ::Util.resultcode_ok?(rc) raise ZMQOperationFailed, "getsockopt: #{ZMQ::Util.error_string}" end (ret.size == 1) ? ret[0] : ret end |
#notify_readable ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/em-zeromq/socket.rb', line 121 def notify_readable # Not sure if this is actually necessary. I suppose it prevents us # from having to to instantiate a ZMQ::Message unnecessarily. # I'm leaving this is because its in the docs, but it could probably # be taken out. return unless readable? loop do msg_parts = [] msg = if msg msg_parts << msg while @socket.more_parts? msg = if msg msg_parts << msg else raise "Multi-part message missing a message!" end end @handler.on_readable(self, msg_parts) else break end end end |
#notify_writable ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/em-zeromq/socket.rb', line 149 def notify_writable return unless writable? # one a writable event is successfully received the socket # should be accepting messages again so stop triggering # write events self.notify_writable = false if @handler.respond_to?(:on_writable) @handler.on_writable(self) end end |
#readable? ⇒ Boolean
161 162 163 |
# File 'lib/em-zeromq/socket.rb', line 161 def readable? (getsockopt(ZMQ::EVENTS) & ZMQ::POLLIN) == ZMQ::POLLIN end |
#register_readable ⇒ Object
Make this socket available for reads
106 107 108 109 110 111 112 113 |
# File 'lib/em-zeromq/socket.rb', line 106 def register_readable # Since ZMQ is event triggered I think this is necessary if readable? notify_readable end # Subscribe to EM read notifications self.notify_readable = true end |
#register_writable ⇒ Object
Trigger on_readable when socket is readable
116 117 118 119 |
# File 'lib/em-zeromq/socket.rb', line 116 def register_writable # Subscribe to EM write notifications self.notify_writable = true end |
#send_msg(*parts) ⇒ Object
send a non blocking message parts: if only one argument is given a signle part message is sent
if more than one arguments is given a multipart message is sent
return: true is message was queued, false otherwise
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/em-zeromq/socket.rb', line 55 def send_msg(*parts) parts = Array(parts[0]) if parts.size == 0 sent = true # multipart parts[0...-1].each do |msg| sent = @socket.send_string(msg, ZMQ::NOBLOCK | ZMQ::SNDMORE) if sent == false break end end if sent # all the previous parts were queued, send # the last one ret = @socket.send_string(parts[-1], ZMQ::NOBLOCK) if ret < 0 raise "Unable to send message: #{ZMQ::Util.error_string}" end else # error while sending the previous parts # register the socket for writability self.notify_writable = true sent = false end EM::next_tick{ notify_readable() } sent end |
#setsockopt(opt, value) ⇒ Object
96 97 98 |
# File 'lib/em-zeromq/socket.rb', line 96 def setsockopt(opt, value) @socket.setsockopt(opt, value) end |
#subscribe(what = '') ⇒ Object
39 40 41 42 |
# File 'lib/em-zeromq/socket.rb', line 39 def subscribe(what = '') raise "only valid on sub socket type (was #{@socket.name})" unless @socket.name == 'SUB' @socket.setsockopt(ZMQ::SUBSCRIBE, what) end |
#unbind ⇒ Object
cleanup when ending loop
101 102 103 |
# File 'lib/em-zeromq/socket.rb', line 101 def unbind detach_and_close end |
#unsubscribe(what) ⇒ Object
44 45 46 47 |
# File 'lib/em-zeromq/socket.rb', line 44 def unsubscribe(what) raise "only valid on sub socket type (was #{@socket.name})" unless @socket.name == 'SUB' @socket.setsockopt(ZMQ::UNSUBSCRIBE, what) end |
#writable? ⇒ Boolean
165 166 167 168 169 |
# File 'lib/em-zeromq/socket.rb', line 165 def writable? return true # ZMQ::EVENTS has issues in ZMQ HEAD, we'll ignore this till they're fixed # (getsockopt(ZMQ::EVENTS) & ZMQ::POLLOUT) == ZMQ::POLLOUT end |