Class: EventMachine::Protocols::Zmq2::InProc::Connection
- Inherits:
-
Object
- Object
- EventMachine::Protocols::Zmq2::InProc::Connection
show all
- Includes:
- ConnectionMixin
- Defined in:
- lib/em/protocols/zmq2/inproc.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
#not_too_busy?, #post_init, #sent_data
Constructor Details
#initialize(socket) ⇒ Connection
Returns a new instance of Connection.
50
51
52
53
54
55
56
57
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 50
def initialize(socket)
@socket = socket
@peer = nil
@peer_waiting = false
@outgoing_queue = []
@recursion = 0
@state = :connecting
end
|
Instance Attribute Details
#notify_when_free ⇒ Object
Returns the value of attribute notify_when_free.
49
50
51
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 49
def notify_when_free
@notify_when_free
end
|
#peer ⇒ Object
Returns the value of attribute peer.
48
49
50
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 48
def peer
@peer
end
|
#peer_waiting ⇒ Object
Returns the value of attribute peer_waiting.
49
50
51
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 49
def peer_waiting
@peer_waiting
end
|
Instance Method Details
#_not_too_busy? ⇒ Boolean
67
68
69
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 67
def _not_too_busy?
@state == :connected && @outgoing_queue.size <= 32
end
|
#call ⇒ Object
77
78
79
80
81
82
83
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 77
def call
if message = @peer.shift_outgoing_queue
receive_strings(message)
else
@peer.peer_waiting!
end
end
|
#close_connection(after_writting = false) ⇒ Object
121
122
123
124
125
126
127
128
129
130
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 121
def close_connection(after_writting = false)
@peer.close_connection(:peer) unless after_writting == :peer
unless after_writting == true
@outgoing_queue.clear
end
if @peer_waiting
EM.next_tick @peer
end
@state = :closing
end
|
#error? ⇒ Boolean
140
141
142
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 140
def error?
false
end
|
#peer_waiting! ⇒ Object
90
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 90
def peer_waiting!
unless @outgoing_queue.empty?
@peer_waiting = false
if (@recursion += 1) == 10
EM.next_tick @peer
else
@peer.call
end
else
unbind if @state == :closing
@peer_waiting = true
end
end
|
#receive_strings(message) ⇒ Object
85
86
87
88
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 85
def receive_strings(message)
super
@peer.peer_waiting!
end
|
#send_strings(strings) ⇒ Object
104
105
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 104
def send_strings(strings)
if @state == :connected
unless Array === strings
strings = Array(strings)
end
@outgoing_queue << strings
end
if @peer_waiting
@peer_waiting = false
EM.next_tick @peer
end
end
|
#send_strings_or_prepared(strings, prepared) ⇒ Object
117
118
119
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 117
def send_strings_or_prepared(strings, prepared)
send_strings(strings)
end
|
#shift_outgoing_queue ⇒ Object
71
72
73
74
75
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 71
def shift_outgoing_queue
message = @outgoing_queue.shift
EM.next_tick { sent_data } if @notify_when_free && _not_too_busy?
message
end
|
#unbind ⇒ Object
132
133
134
135
136
137
138
|
# File 'lib/em/protocols/zmq2/inproc.rb', line 132
def unbind
old, @state = @state, :closed
if old == :closing
@peer.peer_waiting!
end
super
end
|