Class: Kymera::SSocket

Inherits:
Object
  • Object
show all
Defined in:
lib/kymera/szmq/szmq.rb

Instance Method Summary collapse

Constructor Details

#initialize(address, type) ⇒ SSocket

Returns a new instance of SSocket.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/kymera/szmq/szmq.rb', line 82

def initialize(address, type)
  @socket_types = %w(request reply dealer router pub sub push pull xpub xsub)
  @context = SZMQ.context
  @address = address
  @socket_type_string = type
  @socket_type = get_socket_type(type)
  if @socket_types.include?(type.downcase)
    @socket = @context.socket(@socket_type)
    #for some reason if the socket is a push socket the linger option is causing the message not to get sent
    @socket.setsockopt(ZMQ::LINGER, 0) unless @socket_type_string == 'push'
  else
    raise "#{type} is not a valid socket type"
  end
end

Instance Method Details

#bind(address = @address) ⇒ Object



97
98
99
100
101
102
# File 'lib/kymera/szmq/szmq.rb', line 97

def bind(address = @address)
  if address.nil?
    raise "An address must be set or passed"
  end
  error_check(@socket.bind(address))
end

#closeObject



144
145
146
# File 'lib/kymera/szmq/szmq.rb', line 144

def close
  error_check(@socket.close)
end

#connect(address = @address) ⇒ Object



104
105
106
107
108
109
# File 'lib/kymera/szmq/szmq.rb', line 104

def connect(address = @address)
  if address.nil?
    raise "An address must be set or passed"
  end
  error_check(@socket.connect(address))
end

#publish_message(channel, message) ⇒ Object



138
139
140
141
142
# File 'lib/kymera/szmq/szmq.rb', line 138

def publish_message(channel, message)
  raise 'this socket is not of type PUB and cannot publish a message' unless @socket_type_string == 'pub'
  @socket.send_string(channel, ZMQ::SNDMORE)
  @socket.send_string(message)
end

#receive(&block) ⇒ Object

This method listens for messages coming in and then processes them will the block passed into the method. If no block is passed, messages will be received but will then be dropped on the floor. If the socket is of type REP and no block is given, the receive method will reply with “0” indicating that the message was received currently, the result of the block is sent back as a reply for REP sockets. This may change later TODO - Currently, the send_string method is causing the interupt to be delayed until the next message is received. need to find a way to fix this TODO - add support for SUB sockets



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/kymera/szmq/szmq.rb', line 170

def receive(&block)

  trap ("INT") do
    puts "Received interrupt..."
    @close = true
  end
  received_message = ''

  if @socket_type == ZMQ::PULL
    loop do
      break if @close
      @socket.recv_string(received_message)
      if block_given?
        yield(received_message)
      end
    end

  elsif @socket_type == ZMQ::REP
    reply_message = ''
    loop do
      break if @close
      unless @socket.recv_string(received_message) == -1
        @socket.recv_string(received_message)
        if block_given?
          reply_message = yield(received_message)
        else
          reply_message = "0"
        end
      end
      @socket.send_string(reply_message)
    end
  else
    raise "Socket type of #{@socket_type_string} does not receive messages"
  end
end

#send_message(message) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/kymera/szmq/szmq.rb', line 148

def send_message(message)
  trap ("INT") do
    puts "Received interrupt..."
    @socket.close
  end
  if @socket_type == ZMQ::REQ
    @socket.send_string(message)
    reply = ''
    @socket.recv_string(reply)
    reply
    #end
  else
    @socket.send_string(message)
    nil
  end
end

#subscribe(channels, &block) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/kymera/szmq/szmq.rb', line 111

def subscribe(channels, &block)
  raise "This socket is not of type SUB and cannot subscribe to a channel" unless @socket_type_string == 'sub'
  if channels.is_a? String
    #Debug code
    #puts "Subscribing to #{channels}"
    error_check(@socket.setsockopt(ZMQ::SUBSCRIBE, channels))
  elsif channels.is_a? Array
    channels.each do |channel|
      #debug code
      #puts "Subscribing to #{channel}"
      error_check(@socket.setsockopt(ZMQ::SUBSCRIBE, channel))
    end
  end
  connect
  channel = ''
  message = ''
  loop do
    @socket.recv_string(channel)
    @socket.recv_string(message)
    if block_given?
      yield(channel, message)
    else
      [channel, message]
    end
  end
end