Class: Baykit::BayServer::Agent::Multiplexer::SpinMultiplexer

Inherits:
MultiplexerBase
  • Object
show all
Includes:
TimerHandler, Common, Rudders, Util
Defined in:
lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb

Defined Under Namespace

Classes: Lapper, ReadIOLapper

Instance Attribute Summary collapse

Attributes inherited from MultiplexerBase

#agent, #channel_count, #lock, #rudders, #rudders_lock

Instance Method Summary collapse

Methods inherited from MultiplexerBase

#add_rudder_state, #close_all, #close_timeout_sockets, #consume_oldest_unit, #find_rudder_state_by_key, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state

Methods included from Common::Multiplexer

#add_rudder_state, #consume_oldest_unit, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state

Constructor Details

#initialize(agt) ⇒ SpinMultiplexer

Returns a new instance of SpinMultiplexer.



104
105
106
107
108
109
110
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 104

def initialize(agt)
  super(agt)
  @spin_count = 0
  @running_list = []
  @running_list_lock = Mutex.new
  @agent.add_timer_handler(self)
end

Instance Attribute Details

#running_listObject (readonly)

Returns the value of attribute running_list.



101
102
103
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 101

def running_list
  @running_list
end

#running_list_lockObject (readonly)

Returns the value of attribute running_list_lock.



102
103
104
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 102

def running_list_lock
  @running_list_lock
end

#spin_countObject (readonly)

Returns the value of attribute spin_count.



100
101
102
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 100

def spin_count
  @spin_count
end

Instance Method Details

#cancel_read(st) ⇒ Object



199
200
201
202
203
204
205
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 199

def cancel_read(st)
  st.reading_lock.synchronize do
    BayLog.debug("%s Reading off %s", agent, st.rudder)
    st.reading = false
  end
  remove_from_running_list(st)
end

#cancel_write(st) ⇒ Object



207
208
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 207

def cancel_write(st)
end

#close_rudder(st) ⇒ Object



241
242
243
244
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 241

def close_rudder(st)
  remove_from_running_list(st)
  super
end

#is_emptyObject

Custom methods



257
258
259
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 257

def is_empty
  return @running_list.empty?
end

#is_non_blockingObject



190
191
192
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 190

def is_non_blocking
  return false
end

#next_accept(st) ⇒ Object



210
211
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 210

def next_accept(st)
end

#next_read(st) ⇒ Object



213
214
215
216
217
218
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 213

def next_read(st)
  lpr = ReadIOLapper.new(self, @agent, st)
  lpr.next

  add_to_running_list(lpr)
end

#next_write(st) ⇒ Object



220
221
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 220

def next_write(st)
end

#on_busyObject



223
224
225
226
227
228
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 223

def on_busy
  BayLog.debug("%s onBusy", agent)
  BayServer::anchorable_port_map.keys.each do |rd|

  end
end

#on_freeObject



230
231
232
233
234
235
236
237
238
239
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 230

def on_free
  BayLog.debug("%s onFree aborted=%s", agent, agent.aborted);
  if agent.aborted
    return
  end

  BayServer.anchorable_port_map.keys.each do |rd|

  end
end

#on_timerObject

Implements TimerHandler



250
251
252
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 250

def on_timer
  #stop_timeout_spins
end

#process_dataObject



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 261

def process_data
  if is_empty
    return false
  end

  all_spun = true
  remove_list = []
  @running_list.length.downto(1) do |i|
    lpr = @running_list[i-1]
    st = lpr.state
    spun = lpr.lap
    st.access

    all_spun = all_spun && spun
  end

  if all_spun
    @spin_count += 1
    if @spin_count > 10
      sleep(0.01)
    else
      @spin_count = 0
    end
  end

  return true

end

#req_accept(rd) ⇒ Object

Implements Multiplexer

Raises:

  • (NotImplementedError)


119
120
121
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 119

def req_accept(rd)
  raise NotImplementedError.new
end

#req_close(rd) ⇒ Object



178
179
180
181
182
183
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 178

def req_close(rd)
  st = get_rudder_state(rd)
  st.closing = true
  close_rudder(rd)
  @agent.send_closed_letter(st.id, st.rudder, self, false)
end

#req_connect(rd, adr) ⇒ Object

Raises:

  • (NotImplementedError)


123
124
125
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 123

def req_connect(rd, adr)
  raise NotImplementedError.new
end

#req_end(rd) ⇒ Object



173
174
175
176
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 173

def req_end(rd)
  st = get_rudder_state(rd)
  st.finale = true
end

#req_read(rd) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 127

def req_read(rd)
  st = get_rudder_state(rd)
  if st == nil
    BayLog.error("%s Invalid rudder", self)
    return
  end

  need_read = false
  st.reading_lock.synchronize do
    if not st.reading
      need_read = true
      st.reading = true
    end
  end

  if need_read
    next_read(st)
  end
end

#req_write(rd, buf, adr, tag, &lis) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 147

def req_write(rd, buf, adr, tag, &lis)
  st = get_rudder_state(rd)
  if st == nil
    BayLog.warn("Invalid rudder")
    lis.call()
  end

  unt = WriteUnit.new(buf, adr, tag, &lis)
  st.write_queue_lock.synchronize do
    st.write_queue << unt
  end
  st.access

  need_write = false
  st.writing_lock.synchronize do
    if not st.writing
      need_write = true
      st.writing = true
    end
  end

  if need_write
    next_write(st)
  end
end

#shutdownObject



186
187
188
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 186

def shutdown
  wakeup
end

#to_sObject



111
112
113
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 111

def to_s
  return "SpnMpx[#{@agent}]"
end

#use_async_apiObject



194
195
196
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 194

def use_async_api
  return false
end