Class: EventMachine::ZMQ::Connection

Inherits:
Connection
  • Object
show all
Defined in:
lib/eventmachine/zmq/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



4
5
6
# File 'lib/eventmachine/zmq/connection.rb', line 4

# Reader for the +@socket+ instance variable.
#
# @return [Object] whatever is stored in +@socket+ (nil when unset)
def socket
  instance_variable_get(:@socket)
end

Instance Method Details

#bind(addr) ⇒ Object



6
7
8
# File 'lib/eventmachine/zmq/connection.rb', line 6

# Forwards a bind request to the underlying socket.
#
# @param addr [Object] endpoint passed through unchanged to +socket.bind+
# @return [Object] whatever +socket.bind+ returns
def bind(addr)
  socket.bind(addr)
end

#connect(addr) ⇒ Object



10
11
12
# File 'lib/eventmachine/zmq/connection.rb', line 10

# Connects the underlying socket to a remote endpoint.
#
# The previous body called +socket.bind+, which made this method behave
# exactly like #bind instead of establishing an outgoing connection.
#
# @param addr [Object] endpoint passed through unchanged to +socket.connect+
# @return [Object] whatever +socket.connect+ returns
def connect(addr)
  socket.connect(addr)
end

#notify_readable ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/eventmachine/zmq/connection.rb', line 34

# Drains the socket and hands every received part to +receive_data+.
#
# Does nothing unless #readable? is true. Otherwise it keeps calling
# +get_message+ (defined outside this snippet) until that returns a falsy
# value. After each leading part it keeps fetching while
# +socket.more_parts?+ is true, passing each part's +copy_out_string+ to
# +receive_data+.
#
# @return [nil]
def notify_readable
  return unless readable?

  while (head = get_message)
    receive_data(head.copy_out_string)

    while socket.more_parts?
      part = get_message
      # A failed fetch only abandons the current multipart run; the outer
      # loop then tries get_message once more before giving up.
      break unless part

      receive_data(part.copy_out_string)
    end
  end
end

#notify_writable ⇒ Object



53
54
55
# File 'lib/eventmachine/zmq/connection.rb', line 53

# Writable callback: switches the +notify_writable+ flag back off.
# (#send_data turns the flag on after an unsuccessful send.)
#
# @return [false]
def notify_writable
  keep_watching = false
  self.notify_writable = keep_watching
end

#readable? ⇒ Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/eventmachine/zmq/connection.rb', line 30

# Tests whether the POLLIN bit is set in the socket's ZMQ::EVENTS option.
#
# @return [Boolean] true when the events value includes ::ZMQ::POLLIN
def readable?
  pending = socket.getsockopt(::ZMQ::EVENTS)
  pollin = ::ZMQ::POLLIN
  (pending & pollin) == pollin
end

#send_data(data, more = false) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/eventmachine/zmq/connection.rb', line 22

# Sends +data+ (converted with +to_s+) through the socket without blocking.
#
# When the send reports failure, the +notify_writable+ flag is switched on.
#
# @param data [#to_s] payload to send
# @param more [Boolean] when truthy, ::ZMQ::SNDMORE is added to the flags
# @return [Object] the result of +socket.send_string+
def send_data(data, more=false)
  flags = ::ZMQ::NOBLOCK
  flags |= ::ZMQ::SNDMORE if more

  sent = socket.send_string(data.to_s, flags)
  self.notify_writable = true if !sent
  sent
end

#subscribe(channel) ⇒ Object



14
15
16
# File 'lib/eventmachine/zmq/connection.rb', line 14

# Sets the SUBSCRIBE option on the underlying socket for +channel+.
#
# The constant is anchored at the top level (+::ZMQ+), matching #readable?
# and #send_data. A bare +ZMQ+ could resolve to EventMachine::ZMQ when this
# class is defined inside nested +module+ bodies, and the option constant
# would then not be found.
#
# @param channel [Object] value passed through to +setsockopt+
# @return [Object] whatever +socket.setsockopt+ returns
def subscribe(channel)
  socket.setsockopt(::ZMQ::SUBSCRIBE, channel)
end

#unsubscribe(channel) ⇒ Object



18
19
20
# File 'lib/eventmachine/zmq/connection.rb', line 18

# Sets the UNSUBSCRIBE option on the underlying socket for +channel+.
#
# The constant is anchored at the top level (+::ZMQ+), matching #readable?
# and #send_data. A bare +ZMQ+ could resolve to EventMachine::ZMQ when this
# class is defined inside nested +module+ bodies, and the option constant
# would then not be found.
#
# @param channel [Object] value passed through to +setsockopt+
# @return [Object] whatever +socket.setsockopt+ returns
def unsubscribe(channel)
  socket.setsockopt(::ZMQ::UNSUBSCRIBE, channel)
end