Class: Baykit::BayServer::Agent::Multiplexer::SpiderMultiplexer

Inherits:
MultiplexerBase
  • Object
show all
Includes:
Baykit::BayServer::Agent::Multiplexer, TimerHandler, Common::Recipient, Rudders, Util
Defined in:
lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb

Defined Under Namespace

Classes: ChannelOperation

Instance Attribute Summary collapse

Attributes inherited from MultiplexerBase

#agent, #channel_count, #lock, #rudders, #rudders_lock

Instance Method Summary collapse

Methods inherited from MultiplexerBase

#add_rudder_state, #close_all, #close_timeout_sockets, #consume_oldest_unit, #find_rudder_state_by_key, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state

Methods included from Common::Multiplexer

#add_rudder_state, #consume_oldest_unit, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state

Constructor Details

#initialize(agt, anchorable) ⇒ SpiderMultiplexer

Returns a new instance of SpiderMultiplexer.



42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 42

# Sets up the selector, the wake-up pipe and the pending-operation queue.
#
# @param agt the owning agent; forwarded unchanged to the superclass constructor
# @param anchorable stored as-is and exposed through #anchorable
def initialize(agt, anchorable)
  super(agt)

  @anchorable = anchorable

  # Pending operations and the mutex created alongside them
  @operations = []
  @operations_lock = Mutex.new

  # The read end of the pipe is registered for OP_READ on the selector;
  # #wakeup writes to the other end.
  @selector = Selector.new
  @select_wakeup_pipe = IO.pipe
  wakeup_reader = @select_wakeup_pipe[0]
  @selector.register(wakeup_reader, Selector::OP_READ)

  @agent.add_timer_handler(self)
  @handshaked = false
end

Instance Attribute Details

#anchorable ⇒ Object (readonly)

Returns the value of attribute anchorable.



35
36
37
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 35

# Reader for the value given as the second argument to #initialize.
#
# @return [Object] the stored +anchorable+ value
def anchorable
  @anchorable
end

#handshaked ⇒ Object (readonly)

Returns the value of attribute handshaked.



40
41
42
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 40

# Reader for the +@handshaked+ flag, which #initialize sets to false.
#
# @return [Object] the current value of +@handshaked+
def handshaked
  @handshaked
end

#operations ⇒ Object (readonly)

Returns the value of attribute operations.



37
38
39
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 37

# Reader for the operations list, which #initialize creates as an empty Array.
#
# @return [Array] the +@operations+ array itself (not a copy)
def operations
  @operations
end

#operations_lock ⇒ Object (readonly)

Returns the value of attribute operations_lock.



38
39
40
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 38

# Reader for the Mutex that #initialize creates next to the operations list.
#
# @return [Mutex] the +@operations_lock+ mutex
def operations_lock
  @operations_lock
end

#select_wakeup_pipe ⇒ Object (readonly)

Returns the value of attribute select_wakeup_pipe.



39
40
41
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 39

# Reader for the pipe pair created with IO.pipe in #initialize.
# Element 0 is registered on the selector for OP_READ; #wakeup writes to element 1.
#
# @return [Array<IO>] the two-element pipe array
def select_wakeup_pipe
  @select_wakeup_pipe
end

#selector ⇒ Object (readonly)

Returns the value of attribute selector.



36
37
38
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 36

# Reader for the Selector instance created in #initialize.
#
# @return [Selector] the +@selector+ object
def selector
  @selector
end

Instance Method Details

#cancel_read(st) ⇒ Object



155
156
157
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 155

# Stops watching the state's rudder by removing its IO from the selector.
#
# @param st rudder state; only +st.rudder.io+ is used
# @return whatever Selector#unregister returns
def cancel_read(st)
  io = st.rudder.io
  @selector.unregister(io)
end

#cancel_write(st) ⇒ Object



159
160
161
162
163
164
165
166
167
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 159

# Turns off write interest for the state's rudder.
#
# The OP_WRITE bit is cleared from the currently registered operation. If what
# remains is exactly OP_READ, the registration is modified to that value;
# otherwise the IO is unregistered from the selector.
#
# @param st rudder state; only +st.rudder.io+ is used
def cancel_write(st)
  io = st.rudder.io
  remaining = @selector.get_op(io) & ~Selector::OP_WRITE

  if remaining == Selector::OP_READ
    @selector.modify(io, remaining)
  else
    @selector.unregister(io)
  end
end

#close_rudder(st) ⇒ Object



178
179
180
181
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 178

# Unregisters the state's IO from the selector, then hands the state to the
# superclass implementation of close_rudder.
#
# @param st rudder state; +st.rudder.io+ is unregistered first
# @return whatever the superclass close_rudder returns
def close_rudder(st)
  io = st.rudder.io
  @selector.unregister(io)
  super(st)
end

#is_non_blocking ⇒ Object



146
147
148
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 146

# Reports that this multiplexer is non-blocking.
#
# @return [Boolean] always true
def is_non_blocking
  true
end

#next_accept(st) ⇒ Object



169
170
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 169

# No-op in this multiplexer: the argument is ignored.
#
# @param st unused
# @return [nil]
def next_accept(st)
  # Intentionally empty
  nil
end

#next_read(st) ⇒ Object



172
173
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 172

# No-op in this multiplexer: the argument is ignored.
#
# @param st unused
# @return [nil]
def next_read(st)
  # Intentionally empty
  nil
end

#next_write(st) ⇒ Object



175
176
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 175

# No-op in this multiplexer: the argument is ignored.
#
# @param st unused
# @return [nil]
def next_write(st)
  # Intentionally empty
  nil
end

#on_busy ⇒ Object



184
185
186
187
188
189
190
191
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 184

# Stops accepting on every anchorable port.
#
# For each rudder keyed in BayServer.anchorable_port_map, the rudder's IO is
# unregistered from the selector and its rudder state is flagged as not accepting.
def on_busy
  BayLog.debug("%s onBusy", agent)

  port_rudders = BayServer.anchorable_port_map.keys
  port_rudders.each do |port_rd|
    @selector.unregister(port_rd.io)
    get_rudder_state(port_rd).accepting = false
  end
end

#on_free ⇒ Object



193
194
195
196
197
198
199
200
201
202
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 193

# Resumes accepting on every anchorable port, unless the agent has been aborted.
#
# Calls req_accept for each rudder keyed in BayServer.anchorable_port_map.
# Returns early (nil) without touching any port when +agent.aborted+ is truthy.
def on_free
  BayLog.debug("%s onFree aborted=%s", agent, agent.aborted)
  return if agent.aborted

  BayServer.anchorable_port_map.keys.each { |port_rd| req_accept(port_rd) }
end

#on_timer ⇒ Object

Implements TimerHandler



208
209
210
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 208

# Timer callback: delegates to close_timeout_sockets.
# This object is registered as a timer handler on the agent in #initialize.
def on_timer
  close_timeout_sockets
end

#receive(wait) ⇒ Object

Receive letters



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 218

# Runs one select cycle and dispatches every selected IO.
#
# After the select call returns, register_channel_ops is invoked, then each
# selected IO is either treated as a wake-up (when it is the read end of the
# wake-up pipe) or passed to handle_channel with its selected value.
#
# @param wait when truthy, select is called with GrandAgent::SELECT_TIMEOUT_SEC;
#   otherwise select is called with no argument
# @return [Boolean] true when the selector returned at least one entry
def receive(wait)
  selected_map =
    if wait
      @selector.select(GrandAgent::SELECT_TIMEOUT_SEC)
    else
      @selector.select
    end

  register_channel_ops

  selected_map.keys.each do |io|
    if io != @select_wakeup_pipe[0]
      handle_channel(io, selected_map[io])
    else
      # Woken up by a write to the wake-up pipe
      on_waked_up
    end
  end

  !selected_map.empty?
end

#req_accept(rd) ⇒ Object

Implements Multiplexer



63
64
65
66
67
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 63

# Starts accepting on a rudder: registers its IO on the selector for OP_READ
# and flags the rudder state as accepting.
#
# @param rd rudder whose +io+ is to be watched
# @return [Boolean] true (the value assigned to +accepting+)
def req_accept(rd)
  state = get_rudder_state(rd)
  @selector.register(rd.io, Selector::OP_READ)
  state.accepting = true
end

#req_close(rd) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 126

# Closes a rudder and notifies the agent.
#
# When a rudder state exists it is closed via close_rudder, a closed letter is
# sent to the agent (second argument false), and the state's +access+ is called.
# When no state is found, a warning is logged and nil is returned.
#
# @param rd rudder to close
def req_close(rd)
  state = get_rudder_state(rd)
  BayLog.debug("%s reqClose rd=%s", @agent, rd)

  if state.nil?
    BayLog.warn("%s channel state not found: %s", @agent, rd)
    nil
  else
    close_rudder(state)
    @agent.send_closed_letter(state, false)
    state.access
  end
end

#req_connect(rd, adr) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 69

# Starts a connection on a rudder and queues an OP_WRITE operation for it.
#
# @param rd rudder to connect; switched to non-blocking first
# @param adr destination address; must respond to +canonname+ (used for logging)
# @return whatever add_operation returns
def req_connect(rd, adr)
  state = get_rudder_state(rd)
  BayLog.debug("%s reqConnect adr=%s rd=%s chState=%s", @agent, adr.canonname, rd, state)

  rd.set_non_blocking
  begin
    rd.io.connect(adr)
  rescue IO::WaitWritable
    # Deliberately ignored: the OP_WRITE operation below is queued either way.
    # NOTE(review): if rd.io is a ::Socket, #connect normally waits for completion and
    # IO::WaitWritable is what #connect_nonblock raises - confirm which call is intended.
  end

  state.connecting = true
  add_operation(rd, Selector::OP_WRITE, true)
end

#req_end(rd) ⇒ Object



116
117
118
119
120
121
122
123
124
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 116

# Marks a rudder's state as ended and records an access on it.
# Does nothing (returns nil) when the rudder has no state.
#
# @param rd rudder whose state should be ended
def req_end(rd)
  state = get_rudder_state(rd)
  return if state.nil?

  state.end
  state.access
end

#req_read(rd) ⇒ Object



85
86
87
88
89
90
91
92
93
94
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 85

# Queues an OP_READ operation for the rudder and, when the rudder has a state,
# records an access on it. The operation is queued even if no state is found.
#
# @param rd rudder to read from
def req_read(rd)
  state = get_rudder_state(rd)
  BayLog.debug("%s reqRead st=%s", @agent, state)

  add_operation(rd, Selector::OP_READ)

  state.access unless state.nil?
end

#req_write(rd, buf, adr, tag, &lis) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 96

# Queues a buffer for writing on a rudder.
#
# When the rudder has no state or its state is closed, a warning is logged,
# the listener block (if one was given) is called immediately, and nothing is
# queued. Otherwise a WriteUnit is appended to the state's write queue under
# its lock, an OP_WRITE operation is queued, and an access is recorded.
#
# @param rd rudder to write to
# @param buf data handed to WriteUnit
# @param adr address handed to WriteUnit
# @param tag tag handed to WriteUnit (also logged)
# @param lis optional listener block, handed to WriteUnit
def req_write(rd, buf, adr, tag, &lis)
  st = get_rudder_state(rd)
  BayLog.debug("%s req write st=%s tag=%s", @agent, st, tag)

  if st == nil || st.closed
    BayLog.warn("%s Channel is closed: %s", @agent, rd)
    # The block is optional (the normal path passes it through untouched),
    # so only notify when a listener was actually supplied.
    lis.call if lis
    return
  end

  unt = WriteUnit.new(buf, adr, tag, &lis)
  st.write_queue_lock.synchronize do
    st.write_queue << unt
  end

  add_operation(rd, Selector::OP_WRITE)

  st.access
end

#shutdown ⇒ Object



142
143
144
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 142

# Shutdown hook: only calls #wakeup, which writes to the wake-up pipe.
def shutdown
  wakeup
end

#to_s ⇒ Object



54
55
56
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 54

def to_s
  return "SpdMpx[" + @agent.to_s + "]"
end

#use_async_api ⇒ Object



150
151
152
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 150

# Reports that this multiplexer does not use an async API.
#
# @return [Boolean] always false
def use_async_api
  false
end

#wakeup ⇒ Object

Wake up the recipient



243
244
245
# File 'lib/baykit/bayserver/agent/multiplexer/spider_multiplexer.rb', line 243

# Writes a 32-bit zero to the second element of the wake-up pipe.
# The first element of that pipe is what #initialize registers on the selector.
#
# @return whatever IOUtil.write_int32 returns
def wakeup
  write_end = @select_wakeup_pipe[1]
  IOUtil.write_int32(write_end, 0)
end