Class: Baykit::BayServer::Agent::Multiplexer::SpiderMultiplexer
Defined Under Namespace
Classes: ChannelOperation
Instance Attribute Summary collapse
#agent, #channel_count, #lock, #rudders, #rudders_lock
Instance Method Summary
collapse
#add_rudder_state, #close_all, #close_timeout_sockets, #consume_oldest_unit, #find_rudder_state_by_key, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state
#add_rudder_state, #consume_oldest_unit, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state
Constructor Details
Returns a new instance of SpiderMultiplexer.
42
43
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 42
# Builds a multiplexer bound to the given agent.
#
# Forwards +agt+ to the superclass constructor, keeps +anchorable+, prepares
# an empty operation list guarded by a mutex, creates a Selector together
# with a wake-up pipe whose read end is registered for OP_READ, and finally
# registers this object as a timer handler on @agent.
#
# @param agt [Object] forwarded to the superclass constructor
# @param anchorable [Object] stored unchanged in @anchorable
def initialize(agt, anchorable)
  super(agt)
  @anchorable = anchorable
  @handshaked = false

  # Pending operations and the lock that guards them.
  @operations = []
  @operations_lock = Mutex.new

  # The read end of the pipe is watched by the selector (see #wakeup for the
  # write end).
  @selector = Selector.new
  @select_wakeup_pipe = IO.pipe
  @selector.register(@select_wakeup_pipe[0], Selector::OP_READ)

  # NOTE(review): @agent is not assigned in this method - presumably set by
  # the superclass constructor; confirm.
  @agent.add_timer_handler(self)
end
|
Instance Attribute Details
#anchorable ⇒ Object
Returns the value of attribute anchorable.
35
36
37
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 35
# Reader for the +anchorable+ attribute.
#
# @return [Object] the current value of @anchorable
def anchorable
  @anchorable
end
|
#handshaked ⇒ Object
Returns the value of attribute handshaked.
40
41
42
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 40
# Reader for the +handshaked+ attribute.
#
# @return [Object] the current value of @handshaked
def handshaked
  @handshaked
end
|
#operations ⇒ Object
Returns the value of attribute operations.
37
38
39
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 37
# Reader for the +operations+ attribute.
#
# @return [Object] the current value of @operations
def operations
  @operations
end
|
#operations_lock ⇒ Object
Returns the value of attribute operations_lock.
38
39
40
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 38
# Reader for the +operations_lock+ attribute.
#
# @return [Object] the current value of @operations_lock
def operations_lock
  @operations_lock
end
|
#select_wakeup_pipe ⇒ Object
Returns the value of attribute select_wakeup_pipe.
39
40
41
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 39
# Reader for the +select_wakeup_pipe+ attribute.
#
# @return [Object] the current value of @select_wakeup_pipe
def select_wakeup_pipe
  @select_wakeup_pipe
end
|
#selector ⇒ Object
Returns the value of attribute selector.
36
37
38
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 36
# Reader for the +selector+ attribute.
#
# @return [Object] the current value of @selector
def selector
  @selector
end
|
Instance Method Details
#cancel_read(st) ⇒ Object
155
156
157
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 155
# Removes the IO behind +st+ from the selector.
#
# @param st [Object] state object responding to +rudder+, whose +io+ is unregistered
# @return [Object] whatever the selector's +unregister+ returns
def cancel_read(st)
  io = st.rudder.io
  @selector.unregister(io)
end
|
#cancel_write(st) ⇒ Object
159
160
161
162
163
164
165
166
167
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 159
# Drops write interest for the IO behind +st+.
#
# The OP_WRITE bit is cleared from the IO's current selector operation. If
# what remains is exactly OP_READ the registration is modified to that value;
# for any other remainder the IO is unregistered altogether.
#
# @param st [Object] state object responding to +rudder+, whose +io+ is affected
def cancel_write(st)
  io = st.rudder.io
  remaining = @selector.get_op(io) & ~Selector::OP_WRITE

  if remaining == Selector::OP_READ
    @selector.modify(io, remaining)
  else
    @selector.unregister(io)
  end
end
|
#close_rudder(st) ⇒ Object
178
179
180
181
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 178
# Unregisters the IO behind +st+ from the selector, then delegates to the
# superclass implementation with the same argument.
#
# @param st [Object] state object responding to +rudder+, whose +io+ is unregistered
# @return [Object] whatever the superclass +close_rudder+ returns
def close_rudder(st)
  io = st.rudder.io
  @selector.unregister(io)
  super
end
|
#is_non_blocking ⇒ Object
146
147
148
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 146
# Reports that this multiplexer is non-blocking.
#
# @return [Boolean] always +true+
def is_non_blocking
  true
end
|
#next_accept(st) ⇒ Object
169
170
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 169
# No-op in this class: the argument is ignored and nothing is done.
#
# @param st [Object] ignored
# @return [nil]
def next_accept(st)
  # intentionally empty
end
|
#next_read(st) ⇒ Object
172
173
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 172
# No-op in this class: the argument is ignored and nothing is done.
#
# @param st [Object] ignored
# @return [nil]
def next_read(st)
  # intentionally empty
end
|
#next_write(st) ⇒ Object
175
176
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 175
# No-op in this class: the argument is ignored and nothing is done.
#
# @param st [Object] ignored
# @return [nil]
def next_write(st)
  # intentionally empty
end
|
#on_busy ⇒ Object
184
185
186
187
188
189
190
191
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 184
# Stops accepting on every rudder listed in BayServer.anchorable_port_map.
#
# For each key of the port map the rudder's IO is unregistered from the
# selector and the +accepting+ flag of its rudder state is set to false.
#
# @return [Array] the port-map keys that were visited
def on_busy
  BayLog.debug("%s onBusy", agent)

  BayServer.anchorable_port_map.keys.each do |port_rd|
    @selector.unregister(port_rd.io)
    get_rudder_state(port_rd).accepting = false
  end
end
|
#on_free ⇒ Object
193
194
195
196
197
198
199
200
201
202
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 193
# Resumes accepting on every rudder listed in BayServer.anchorable_port_map,
# unless the agent reports that it has been aborted.
#
# @return [Array, nil] the port-map keys that were passed to #req_accept, or
#   nil when the agent is aborted
def on_free
  BayLog.debug("%s onFree aborted=%s", agent, agent.aborted)
  return if agent.aborted

  BayServer.anchorable_port_map.keys.each { |port_rd| req_accept(port_rd) }
end
|
#on_timer ⇒ Object
208
209
210
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 208
# Timer callback: delegates to #close_timeout_sockets.
#
# @return [Object] whatever #close_timeout_sockets returns
def on_timer
  close_timeout_sockets
end
|
#receive(wait) ⇒ Object
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 218
# Runs one selection round and dispatches every selected IO.
#
# After the selector returns, #register_channel_ops is invoked, then each
# selected IO is handled: the read end of the wake-up pipe goes to
# #on_waked_up, everything else goes to #handle_channel together with the
# value the selector mapped to it.
#
# @param wait [Boolean] when truthy, select is called with
#   GrandAgent::SELECT_TIMEOUT_SEC; otherwise it is called without arguments
# @return [Boolean] true when the selector reported at least one IO
def receive(wait)
  selected_map =
    if wait
      @selector.select(GrandAgent::SELECT_TIMEOUT_SEC)
    else
      @selector.select()
    end

  register_channel_ops

  selected_map.keys.each do |io|
    if io == @select_wakeup_pipe[0]
      on_waked_up
      next
    end
    handle_channel(io, selected_map[io])
  end

  !selected_map.empty?
end
|
#req_accept(rd) ⇒ Object
63
64
65
66
67
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 63
# Starts accepting on +rd+: registers its IO with the selector for OP_READ
# and marks the rudder state as accepting.
#
# @param rd [Object] rudder responding to +io+
# @return [Boolean] true (the value assigned to the +accepting+ flag)
def req_accept(rd)
  state = get_rudder_state(rd)
  selector.register(rd.io, Selector::OP_READ)
  state.accepting = true
end
|
#req_close(rd) ⇒ Object
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 126
# Closes the rudder +rd+ and notifies the agent.
#
# When no state is known for +rd+ a warning is logged and nothing else
# happens. Otherwise the rudder is closed through #close_rudder, the agent is
# sent a closed letter for the state (second argument false), and the state's
# +access+ is called.
#
# @param rd [Object] rudder to close
# @return [Object, nil] the result of +access+ on the state, or nil when the
#   state was not found
def req_close(rd)
  state = get_rudder_state(rd)
  BayLog.debug("%s reqClose rd=%s", @agent, rd)

  if state.nil?
    BayLog.warn("%s channel state not found: %s", @agent, rd)
    return
  end

  close_rudder(state)
  @agent.send_closed_letter(state, false)
  state.access
end
|
#req_connect(rd, adr) ⇒ Object
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 69
# Starts a connection of +rd+ to +adr+.
#
# The rudder is switched to non-blocking mode and +connect+ is invoked on its
# IO. The rudder state is then flagged as connecting and an OP_WRITE
# operation is queued through #add_operation (third argument true).
#
# @param rd [Object] rudder responding to +set_non_blocking+ and +io+
# @param adr [Object] address passed to +connect+; must respond to +canonname+
# @return [Object] whatever #add_operation returns
def req_connect(rd, adr)
  state = get_rudder_state(rd)
  BayLog.debug("%s reqConnect adr=%s rd=%s chState=%s", @agent, adr.canonname, rd, state)

  rd.set_non_blocking
  begin
    rd.io.connect(adr)
  rescue IO::WaitWritable
    # Deliberately ignored: the OP_WRITE operation queued below is added
    # whether or not connect raised this.
  end

  state.connecting = true
  add_operation(rd, Selector::OP_WRITE, true)
end
|
#req_end(rd) ⇒ Object
116
117
118
119
120
121
122
123
124
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 116
# Signals end on the state of +rd+: calls +end+ and then +access+ on the
# rudder state. Does nothing when no state is known for +rd+.
#
# @param rd [Object] rudder whose state is looked up
# @return [Object, nil] the result of +access+, or nil when the state was not found
def req_end(rd)
  state = get_rudder_state(rd)
  return if state.nil?

  state.end
  state.access
end
|
#req_read(rd) ⇒ Object
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 85
# Requests a read on +rd+ by queueing an OP_READ operation.
#
# The operation is queued even when no state is known for +rd+; +access+ is
# only called on the state when one exists.
#
# @param rd [Object] rudder to read from
# @return [Object, nil] the result of +access+ on the state, or nil when the
#   state was not found
def req_read(rd)
  state = get_rudder_state(rd)
  BayLog.debug("%s reqRead st=%s", @agent, state)
  add_operation(rd, Selector::OP_READ)
  state.access unless state.nil?
end
|
#req_write(rd, buf, adr, tag, &lis) ⇒ Object
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 96
# Queues +buf+ for writing on +rd+.
#
# When no state is known for +rd+, or the state reports +closed+, a warning
# is logged, the listener block (if one was given) is invoked immediately
# with no arguments, and nothing is queued. Otherwise a WriteUnit is appended
# to the state's write queue under +write_queue_lock+, an OP_WRITE operation
# is queued through #add_operation, and the state's +access+ is called.
#
# @param rd [Object] rudder to write to
# @param buf [Object] data handed to WriteUnit
# @param adr [Object] address handed to WriteUnit
# @param tag [Object] tag handed to WriteUnit (also logged)
# @yield listener block; stored in the WriteUnit, or called right away when
#   the channel is missing or closed
# @return [Object, nil] the result of +access+ on the state, or nil when the
#   channel is missing or closed
def req_write(rd, buf, adr, tag, &lis)
  st = get_rudder_state(rd)
  BayLog.debug("%s req write st=%s tag=%s", @agent, st, tag)

  if st.nil? || st.closed
    BayLog.warn("%s Channel is closed: %s", @agent, rd)
    # The block is optional (the open-channel path passes it on even when it
    # is nil), so do not blow up here when the caller did not supply one.
    lis&.call
    return
  end

  unit = WriteUnit.new(buf, adr, tag, &lis)
  st.write_queue_lock.synchronize { st.write_queue << unit }

  add_operation(rd, Selector::OP_WRITE)
  st.access
end
|
#shutdown ⇒ Object
142
143
144
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 142
# Shuts the multiplexer down by delegating to #wakeup.
#
# @return [Object] whatever #wakeup returns
def shutdown
  wakeup
end
|
#to_s ⇒ Object
54
55
56
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 54
# Short textual label for this multiplexer: "SpdMpx[" followed by the
# agent's string form and a closing bracket.
#
# @return [String]
def to_s
  "SpdMpx[#{@agent}]"
end
|
#use_async_api ⇒ Object
150
151
152
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 150
# Reports that this multiplexer does not use the async API.
#
# @return [Boolean] always +false+
def use_async_api
  false
end
|
#wakeup ⇒ Object
243
244
245
|
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 243
# Writes the 32-bit integer 0 to the second element (index 1) of the wake-up
# pipe via IOUtil.write_int32.
#
# @return [Object] whatever IOUtil.write_int32 returns
def wakeup
  write_end = @select_wakeup_pipe[1]
  IOUtil.write_int32(write_end, 0)
end
|