Class: Baykit::BayServer::Agent::Multiplexer::SpinMultiplexer

Inherits:
MultiplexerBase
  • Object
show all
Includes:
TimerHandler, Rudders, Util
Defined in:
lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb

Defined Under Namespace

Classes: Lapper, ReadIOLapper

Instance Attribute Summary collapse

Attributes inherited from MultiplexerBase

#agent, #channel_count, #lock, #rudders, #rudders_lock

Instance Method Summary collapse

Methods inherited from MultiplexerBase

#add_rudder_state, #close_all, #close_timeout_sockets, #consume_oldest_unit, #find_rudder_state_by_key, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state

Methods included from Common::Multiplexer

#add_rudder_state, #consume_oldest_unit, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state

Constructor Details

#initialize(agt) ⇒ SpinMultiplexer

Returns a new instance of SpinMultiplexer.



100
101
102
103
104
105
106
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 100

def initialize(agt)
  super(agt)
  @spin_count = 0
  @running_list = []
  @running_list_lock = Mutex.new
  @agent.add_timer_handler(self)
end

Instance Attribute Details

#running_listObject (readonly)

Returns the value of attribute running_list.



97
98
99
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 97

def running_list
  @running_list
end

#running_list_lockObject (readonly)

Returns the value of attribute running_list_lock.



98
99
100
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 98

def running_list_lock
  @running_list_lock
end

#spin_countObject (readonly)

Returns the value of attribute spin_count.



96
97
98
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 96

def spin_count
  @spin_count
end

Instance Method Details

#cancel_read(st) ⇒ Object



194
195
196
197
198
199
200
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 194

def cancel_read(st)
  st.reading_lock.synchronize do
    BayLog.debug("%s Reading off %s", agent, st.rudder)
    st.reading = false
  end
  remove_from_running_list(st)
end

#cancel_write(st) ⇒ Object



202
203
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 202

def cancel_write(st)
end

#close_rudder(st) ⇒ Object



236
237
238
239
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 236

def close_rudder(st)
  remove_from_running_list(st)
  super
end

#is_emptyObject

Custom methods



252
253
254
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 252

def is_empty
  return @running_list.empty?
end

#is_non_blockingObject



185
186
187
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 185

def is_non_blocking
  return false
end

#next_accept(st) ⇒ Object



205
206
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 205

def next_accept(st)
end

#next_read(st) ⇒ Object



208
209
210
211
212
213
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 208

def next_read(st)
  lpr = ReadIOLapper.new(@agent, st)
  lpr.next

  add_to_running_list(lpr)
end

#next_write(st) ⇒ Object



215
216
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 215

def next_write(st)
end

#on_busyObject



218
219
220
221
222
223
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 218

def on_busy
  BayLog.debug("%s onBusy", agent)
  BayServer::anchorable_port_map.keys.each do |rd|

  end
end

#on_freeObject



225
226
227
228
229
230
231
232
233
234
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 225

def on_free
  BayLog.debug("%s onFree aborted=%s", agent, agent.aborted);
  if agent.aborted
    return
  end

  BayServer.anchorable_port_map.keys.each do |rd|

  end
end

#on_timerObject

Implements TimerHandler



245
246
247
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 245

def on_timer
  #stop_timeout_spins
end

#process_dataObject



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 256

def process_data
  if is_empty
    return false
  end

  all_spun = true
  remove_list = []
  @running_list.length.downto(1) do |i|
    lpr = @running_list[i-1]
    st = lpr.state
    spun = lpr.lap
    st.access

    all_spun = all_spun & spun
  end

  if all_spun
    @spin_count += 1
    if @spin_count > 10
      sleep(0.01)
    else
      @spin_count = 0
    end
  end

  return true

end

#req_accept(rd) ⇒ Object

Implements Multiplexer

Raises:

  • (NotImplementedError)


115
116
117
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 115

def req_accept(rd)
  raise NotImplementedError.new
end

#req_close(rd) ⇒ Object



174
175
176
177
178
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 174

def req_close(rd)
  st = get_rudder_state(rd)
  close_rudder(st)
  @agent.send_closed_letter(st, false)
end

#req_connect(rd, adr) ⇒ Object

Raises:

  • (NotImplementedError)


119
120
121
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 119

def req_connect(rd, adr)
  raise NotImplementedError.new
end

#req_end(rd) ⇒ Object



169
170
171
172
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 169

def req_end(rd)
  st = get_rudder_state(rd)
  st.finale = true
end

#req_read(rd) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 123

def req_read(rd)
  st = get_rudder_state(rd)
  if st == nil
    BayLog.error("%s Invalid rudder", self)
    return
  end

  need_read = false
  st.reading_lock.synchronize do
    if not st.reading
      need_read = true
      st.reading = true
    end
  end

  if need_read
    next_read(st)
  end
end

#req_write(rd, buf, len, adr, tag, &lis) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 143

def req_write(rd, buf, len, adr, tag, &lis)
  st = get_rudder_state(rd)
  if st == nil
    BayLog.warn("Invalid rudder")
    lis.call()
  end

  unt = WriteUnit.new(buf, adr, tag, &lis)
  st.write_queue_lock.synchronize do
    st.write_queue << unt
  end
  st.access

  need_write = false
  st.writing_lock.synchronize do
    if not st.writing
      need_write = true
      st.writing = true
    end
  end

  if need_write
    next_write(st)
  end
end

#shutdownObject



181
182
183
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 181

def shutdown
  wakeup
end

#to_sObject



107
108
109
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 107

def to_s
  return "SpnMpx[#{@agent}]"
end

#use_async_apiObject



189
190
191
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 189

def use_async_api
  return false
end