Class: EventMachine::Protocols::Zmq2::InProc::Connection

Inherits:
Object
  • Object
show all
Includes:
ConnectionMixin
Defined in:
lib/em/protocols/zmq2/inproc.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ConnectionMixin

#not_too_busy?, #post_init, #sent_data

Constructor Details

#initialize(socket) ⇒ Connection

Returns a new instance of Connection.



50
51
52
53
54
55
56
57
# File 'lib/em/protocols/zmq2/inproc.rb', line 50

# Builds one endpoint of an in-process connection. The endpoint starts in the
# +:connecting+ state with no peer attached and an empty outgoing queue.
#
# @param socket [Object] the owning socket; it is only stored here
def initialize(socket)
  @socket = socket
  @peer = nil
  @peer_waiting = false
  # Read by #notify_when_free and #shift_outgoing_queue; give it a defined
  # value up front so those reads never touch an unset instance variable.
  @notify_when_free = nil
  @outgoing_queue = []
  # Counts synchronous hand-offs performed by #peer_waiting!.
  @recursion = 0
  @state = :connecting
end

Instance Attribute Details

#notify_when_free ⇒ Object

Returns the value of attribute notify_when_free.



49
50
51
# File 'lib/em/protocols/zmq2/inproc.rb', line 49

# Reader for the +@notify_when_free+ instance variable.
# #shift_outgoing_queue consults this value to decide whether to schedule
# +sent_data+ after a message has been taken from the queue.
#
# @return [Object] the current value of +@notify_when_free+
def notify_when_free
  @notify_when_free
end

#peer ⇒ Object

Returns the value of attribute peer.



48
49
50
# File 'lib/em/protocols/zmq2/inproc.rb', line 48

# Reader for the +@peer+ instance variable, which is +nil+ right after
# construction. The other methods here call +shift_outgoing_queue+,
# +peer_waiting!+, +call+ and +close_connection+ on it, so it is presumably
# the connection at the other end (it is assigned outside the code shown).
#
# @return [Object, nil] the current value of +@peer+
def peer
  @peer
end

#peer_waiting ⇒ Object

Returns the value of attribute peer_waiting.



49
50
51
# File 'lib/em/protocols/zmq2/inproc.rb', line 49

# Reader for the +@peer_waiting+ flag. It starts out +false+, is set to +true+
# by #peer_waiting! when the outgoing queue is empty, and is cleared again by
# #peer_waiting! and #send_strings.
#
# @return [Boolean] the current value of +@peer_waiting+
def peer_waiting
  @peer_waiting
end

Instance Method Details

#_not_too_busy? ⇒ Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/em/protocols/zmq2/inproc.rb', line 67

# Reports whether this endpoint is able to take more outgoing messages.
#
# @return [Boolean] +true+ only while the state is +:connected+ and the
#   outgoing queue holds at most 32 messages
def _not_too_busy?
  return false unless @state == :connected

  @outgoing_queue.size <= 32
end

#call ⇒ Object



77
78
79
80
81
82
83
# File 'lib/em/protocols/zmq2/inproc.rb', line 77

# Takes the next message queued on the peer and delivers it locally through
# #receive_strings. When the peer has nothing queued, the peer is told that
# this side is waiting instead.
#
# Other methods here hand a connection straight to +EM.next_tick+, which
# presumably invokes this method on the following reactor tick.
#
# @return [Object] whatever #receive_strings or the peer's +peer_waiting!+ returns
def call
  message = @peer.shift_outgoing_queue
  return @peer.peer_waiting! unless message

  receive_strings(message)
end

#close_connection(after_writting = false) ⇒ Object



121
122
123
124
125
126
127
128
129
130
# File 'lib/em/protocols/zmq2/inproc.rb', line 121

# Starts closing this endpoint and, unless the request came from the peer,
# asks the peer to close as well.
#
# @param after_writting [Boolean, Symbol] +true+ keeps the messages that are
#   still queued; any other value discards them. The symbol +:peer+ marks a
#   close that was initiated by the peer, so it is not forwarded back.
# @return [Symbol] +:closing+, the new state
def close_connection(after_writting = false)
  initiated_by_peer = after_writting == :peer
  # Deliberately a strict comparison: +:peer+ is truthy but must not keep the queue.
  keep_queue = after_writting == true

  @peer.close_connection(:peer) unless initiated_by_peer
  @outgoing_queue.clear unless keep_queue
  # A waiting peer is scheduled so that it notices the close.
  EM.next_tick(@peer) if @peer_waiting
  @state = :closing
end

#error? ⇒ Boolean

Returns:

  • (Boolean)


140
141
142
# File 'lib/em/protocols/zmq2/inproc.rb', line 140

# Error indicator for this connection; this implementation never reports one.
#
# @return [Boolean] always +false+
def error?
  false
end

#peer_waiting! ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/em/protocols/zmq2/inproc.rb', line 90

# Called when the peer is ready for the next message from this endpoint.
#
# With messages queued, the peer is asked to pick one up, normally by a
# direct +@peer.call+. Every tenth hand-off is pushed through +EM.next_tick+
# instead, so the mutual recursion between the two endpoints cannot keep
# growing the stack. The counter is reset on each deferral so the guard keeps
# working for the whole lifetime of the connection, not only the first time.
#
# With an empty queue, a pending close is completed via #unbind and the peer
# is recorded as waiting, so the next #send_strings wakes it up.
#
# @return [Object] +true+ when the queue was empty, otherwise the result of
#   +EM.next_tick+ or of the peer's +call+
def peer_waiting!
  if @outgoing_queue.empty?
    unbind if @state == :closing
    @peer_waiting = true
  else
    @peer_waiting = false
    @recursion += 1
    if @recursion >= 10
      # Start a fresh batch of synchronous hand-offs after this deferral.
      @recursion = 0
      EM.next_tick @peer
    else
      @peer.call
    end
  end
end

#receive_strings(message) ⇒ Object



85
86
87
88
# File 'lib/em/protocols/zmq2/inproc.rb', line 85

# Hands the message to the inherited +receive_strings+ and then reports back
# to the peer through its +peer_waiting!+, signalling that this side is ready
# for whatever the peer has next.
#
# @param message [Object] the message taken from the peer's outgoing queue
# @return [Object] the result of the peer's +peer_waiting!+
def receive_strings(message)
  super(message)
  @peer.peer_waiting!
end

#send_strings(strings) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/em/protocols/zmq2/inproc.rb', line 104

# Queues a message for the peer and wakes the peer up if it is waiting.
#
# The message is only queued while the state is +:connected+; in any other
# state it is dropped. A waiting peer is scheduled in either case.
#
# @param strings [Array, Object] the message; anything that is not already an
#   Array is converted with +Array()+
# @return [Object, nil] the result of +EM.next_tick+ when the peer was
#   waiting, otherwise +nil+
def send_strings(strings)
  # Array() returns an Array argument unchanged, so no type check is needed.
  @outgoing_queue << Array(strings) if @state == :connected

  return unless @peer_waiting

  @peer_waiting = false
  EM.next_tick @peer
end

#send_strings_or_prepared(strings, prepared) ⇒ Object



117
118
119
# File 'lib/em/protocols/zmq2/inproc.rb', line 117

# Sends +strings+ through #send_strings. The +prepared+ argument is accepted
# but not used by this implementation.
#
# @param strings [Array, Object] the message, passed on to #send_strings
# @param prepared [Object] ignored here
# @return [Object, nil] whatever #send_strings returns
def send_strings_or_prepared(strings, prepared)
  send_strings(strings)
end

#shift_outgoing_queue ⇒ Object



71
72
73
74
75
# File 'lib/em/protocols/zmq2/inproc.rb', line 71

# Removes and returns the oldest queued message. When +@notify_when_free+ is
# set and #_not_too_busy? holds after the removal, +sent_data+ is scheduled
# through +EM.next_tick+.
#
# @return [Array, nil] the removed message, or +nil+ when the queue was empty
def shift_outgoing_queue
  head = @outgoing_queue.shift
  if @notify_when_free && _not_too_busy?
    EM.next_tick { sent_data }
  end
  head
end

#unbind ⇒ Object



132
133
134
135
136
137
138
# File 'lib/em/protocols/zmq2/inproc.rb', line 132

# Marks the connection as +:closed+. If it was in the +:closing+ state, the
# peer's +peer_waiting!+ is invoked first; the inherited +unbind+ runs last.
#
# @return [Object] the result of the inherited +unbind+
def unbind
  was_closing = @state == :closing
  @state = :closed
  @peer.peer_waiting! if was_closing
  super
end