Class: Kymera::SSocket
- Inherits:
-
Object
- Object
- Kymera::SSocket
- Defined in:
- lib/kymera/szmq/szmq.rb
Instance Method Summary collapse
- #bind(address = @address) ⇒ Object
- #close ⇒ Object
- #connect(address = @address) ⇒ Object
-
#initialize(address, type) ⇒ SSocket
constructor
A new instance of SSocket.
- #publish_message(channel, message) ⇒ Object
-
#receive(&block) ⇒ Object
This method listens for messages coming in and then processes them will the block passed into the method.
- #send_message(message) ⇒ Object
- #subscribe(channels, &block) ⇒ Object
Constructor Details
#initialize(address, type) ⇒ SSocket
Returns a new instance of SSocket.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/kymera/szmq/szmq.rb', line 82 def initialize(address, type) @socket_types = %w(request reply dealer router pub sub push pull xpub xsub) @context = SZMQ.context @address = address @socket_type_string = type @socket_type = get_socket_type(type) if @socket_types.include?(type.downcase) @socket = @context.socket(@socket_type) #for some reason if the socket is a push socket the linger option is causing the message not to get sent @socket.setsockopt(ZMQ::LINGER, 0) unless @socket_type_string == 'push' else raise "#{type} is not a valid socket type" end end |
Instance Method Details
#bind(address = @address) ⇒ Object
97 98 99 100 101 102 |
# File 'lib/kymera/szmq/szmq.rb', line 97 def bind(address = @address) if address.nil? raise "An address must be set or passed" end error_check(@socket.bind(address)) end |
#close ⇒ Object
144 145 146 |
# File 'lib/kymera/szmq/szmq.rb', line 144 def close error_check(@socket.close) end |
#connect(address = @address) ⇒ Object
104 105 106 107 108 109 |
# File 'lib/kymera/szmq/szmq.rb', line 104 def connect(address = @address) if address.nil? raise "An address must be set or passed" end error_check(@socket.connect(address)) end |
#publish_message(channel, message) ⇒ Object
138 139 140 141 142 |
# File 'lib/kymera/szmq/szmq.rb', line 138 def (channel, ) raise 'this socket is not of type PUB and cannot publish a message' unless @socket_type_string == 'pub' @socket.send_string(channel, ZMQ::SNDMORE) @socket.send_string() end |
#receive(&block) ⇒ Object
This method listens for messages coming in and then processes them will the block passed into the method. If no block is passed, messages will be received but will then be dropped on the floor. If the socket is of type REP and no block is given, the receive method will reply with “0” indicating that the message was received currently, the result of the block is sent back as a reply for REP sockets. This may change later TODO - Currently, the send_string method is causing the interupt to be delayed until the next message is received. need to find a way to fix this TODO - add support for SUB sockets
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/kymera/szmq/szmq.rb', line 170 def receive(&block) trap ("INT") do puts "Received interrupt..." @close = true end = '' if @socket_type == ZMQ::PULL loop do break if @close @socket.recv_string() if block_given? yield() end end elsif @socket_type == ZMQ::REP = '' loop do break if @close unless @socket.recv_string() == -1 @socket.recv_string() if block_given? = yield() else = "0" end end @socket.send_string() end else raise "Socket type of #{@socket_type_string} does not receive messages" end end |
#send_message(message) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/kymera/szmq/szmq.rb', line 148 def () trap ("INT") do puts "Received interrupt..." @socket.close end if @socket_type == ZMQ::REQ @socket.send_string() reply = '' @socket.recv_string(reply) reply #end else @socket.send_string() nil end end |
#subscribe(channels, &block) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/kymera/szmq/szmq.rb', line 111 def subscribe(channels, &block) raise "This socket is not of type SUB and cannot subscribe to a channel" unless @socket_type_string == 'sub' if channels.is_a? String #Debug code #puts "Subscribing to #{channels}" error_check(@socket.setsockopt(ZMQ::SUBSCRIBE, channels)) elsif channels.is_a? Array channels.each do |channel| #debug code #puts "Subscribing to #{channel}" error_check(@socket.setsockopt(ZMQ::SUBSCRIBE, channel)) end end connect channel = '' = '' loop do @socket.recv_string(channel) @socket.recv_string() if block_given? yield(channel, ) else [channel, ] end end end |