Class: Fairy::ChunkedFileBufferdQueue
- Inherits: Object
- Inheritance hierarchy: Object → Fairy::ChunkedFileBufferdQueue
- Defined in:
- lib/fairy/share/port.rb
Instance Method Summary collapse
- #init_2ndmemory ⇒ Object
- #initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ ChunkedFileBufferdQueue (constructor): returns a new instance of ChunkedFileBufferdQueue.
- #open_2ndmemory(&block) ⇒ Object
- #pop ⇒ Object
- #pop_all ⇒ Object
- #push(e) ⇒ Object
- #push_all(buf) ⇒ Object
- #restore_2ndmemory ⇒ Object
- #store_2ndmemory(ary) ⇒ Object
Constructor Details
#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ ChunkedFileBufferdQueue
Returns a new instance of ChunkedFileBufferdQueue.
1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 |
# File 'lib/fairy/share/port.rb', line 1262
#
# Sets up an empty queue.
#
# @param policy [#[]] options; +policy[:threshold]+ is the push-queue size
#   that triggers a flush, falling back to CONF.FILEBUFFEREDQUEUE_THRESHOLD
#   when it is nil/false. The object is kept in @policy for later lookups.
# @param queue_mon monitor guarding @buffers_queue / @pop_queue
# @param queue_cv condition variable used to wake consumers; by default
#   created from +queue_mon+
def initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond)
  @policy = policy
  @threshold = policy[:threshold] || CONF.FILEBUFFEREDQUEUE_THRESHOLD

  # Producer side: elements accumulate here until a flush is triggered.
  @push_queue_mutex = Mutex.new
  @push_queue = []

  # Spilled buffers; stays nil until init_2ndmemory is first called.
  @buffers_queue_mon = queue_mon
  @buffers_queue_cv = queue_cv
  @buffers_queue = nil

  # Consumer side: nil until the first batch is handed over.
  @pop_queue = nil
end
Instance Method Details
#init_2ndmemory ⇒ Object
1351 1352 1353 1354 1355 1356 |
# File 'lib/fairy/share/port.rb', line 1351
#
# Prepares the spill area: picks the directory for buffer files
# (+@policy[:buffer_dir]+, or CONF.TMP_DIR when that is nil/false) and
# resets @buffers_queue to an empty list.
#
# @return [Array] the new, empty @buffers_queue
def init_2ndmemory
  @buffer_dir = @policy[:buffer_dir] || CONF.TMP_DIR
  @buffers_queue = []
end
#open_2ndmemory(&block) ⇒ Object
1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 |
# File 'lib/fairy/share/port.rb', line 1358
#
# Creates a new spill buffer, lets the caller fill it, and appends it to
# @buffers_queue.
#
# The spill area is initialised on first use. The buffer is always closed
# after the block runs, even when the block raises; it is only queued when
# the block completes normally.
#
# @yield [io] the buffer's IO object (+buffer.io+) to write into
# @return the buffer object returned by FastTempfile.open
def open_2ndmemory(&block)
  init_2ndmemory unless @buffers_queue

  tmp = FastTempfile.open("port-buffer-", @buffer_dir)
  begin
    yield tmp.io
  ensure
    tmp.close
  end

  @buffers_queue << tmp
  tmp
end
#pop ⇒ Object
1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 |
# File 'lib/fairy/share/port.rb', line 1314
#
# Removes and returns the element at the front of the pop queue, blocking
# until one is available.
#
# While @pop_queue is nil or empty, the next spilled buffer is restored
# when one is queued; otherwise the caller waits on the condition variable
# that #push / #push_all broadcast on.
#
# The whole wait loop runs while holding the monitor, so the emptiness
# check, the restore and the wait are atomic with respect to producers.
# Checking outside the monitor lets a producer hand over a batch between
# the check and the lock, which then either misses the broadcast or gets
# the handed-over batch overwritten by a restored one.
#
# A spill list that exists but is empty (all buffers already consumed) is
# treated like "nothing spilled yet" and waits, instead of calling
# restore_2ndmemory with nothing to restore.
#
# NOTE(review): relies on the condition variable releasing the monitor
# while waiting, as Ruby's MonitorMixin::ConditionVariable does -- confirm
# for XThread::Monitor.
#
# @return [Object] the element shifted off @pop_queue
def pop
  e = nil
  @buffers_queue_mon.synchronize do
    while !@pop_queue || @pop_queue.empty?
      if @buffers_queue && !@buffers_queue.empty?
        @pop_queue = restore_2ndmemory
      else
        @buffers_queue_cv.wait
      end
    end
    e = @pop_queue.shift
  end
  e
end
#pop_all ⇒ Object
1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 |
# File 'lib/fairy/share/port.rb', line 1330
#
# Returns every element currently in the pop queue as one array, blocking
# until at least one element is available.
#
# While @pop_queue is nil or empty, the next spilled buffer is restored
# when one is queued; otherwise the caller waits on the condition variable
# that #push / #push_all broadcast on.
#
# The wait loop and the final swap run while holding the monitor. Checking
# emptiness outside the monitor lets a producer hand over one batch and
# spill the next before the consumer gets the lock; the consumer would then
# overwrite the handed-over batch with the restored one, or wait after the
# broadcast was already sent.
#
# After the call @pop_queue is an empty array (not nil), so later flushes
# by #push / #push_all go through store_2ndmemory.
#
# NOTE(review): relies on the condition variable releasing the monitor
# while waiting, as Ruby's MonitorMixin::ConditionVariable does -- confirm
# for XThread::Monitor.
#
# @return [Array] the previous, non-empty contents of @pop_queue
def pop_all
  buf = nil
  @buffers_queue_mon.synchronize do
    while !@pop_queue || @pop_queue.empty?
      if @buffers_queue && !@buffers_queue.empty?
        @pop_queue = restore_2ndmemory
      else
        @buffers_queue_cv.wait
      end
    end
    buf, @pop_queue = @pop_queue, []
  end
  buf
end
#push(e) ⇒ Object
1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 |
# File 'lib/fairy/share/port.rb', line 1277
#
# Appends one element to the push queue and flushes the queue when it has
# reached @threshold elements, or when the element is :END_OF_STREAM or
# equals Import::SET_NO_IMPORT.
#
# A flush hands the accumulated batch to the consumer side: directly as
# @pop_queue if no pop queue exists yet, otherwise via store_2ndmemory.
# Afterwards the push queue is replaced by a fresh array and waiting
# consumers are woken with a broadcast.
#
# @param e [Object] element to enqueue
def push(e)
  @push_queue_mutex.synchronize do
    @push_queue << e

    flush_now = @push_queue.size >= @threshold ||
                e == :END_OF_STREAM ||
                e == Import::SET_NO_IMPORT
    next unless flush_now

    @buffers_queue_mon.synchronize do
      if @pop_queue
        store_2ndmemory(@push_queue)
      else
        @pop_queue = @push_queue
      end
      @push_queue = []
      @buffers_queue_cv.broadcast
    end
  end
end
#push_all(buf) ⇒ Object
1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 |
# File 'lib/fairy/share/port.rb', line 1296
#
# Appends all elements of +buf+ to the push queue and flushes the queue
# when it holds more than @threshold elements or its last element is
# :END_OF_STREAM.
#
# A flush hands the accumulated batch to the consumer side: directly as
# @pop_queue if no pop queue exists yet, otherwise via store_2ndmemory.
# Afterwards the push queue is replaced by a fresh array and waiting
# consumers are woken with a broadcast.
#
# NOTE(review): #push flushes at `>= @threshold` and also on
# Import::SET_NO_IMPORT; this method uses `>` and checks only
# :END_OF_STREAM. Confirm whether the difference is intentional.
#
# @param buf [Array] elements to enqueue, in order
def push_all(buf)
  @push_queue_mutex.synchronize do
    @push_queue.concat(buf)

    over_limit = @push_queue.size > @threshold
    next unless over_limit || @push_queue.last == :END_OF_STREAM

    @buffers_queue_mon.synchronize do
      if @pop_queue
        store_2ndmemory(@push_queue)
      else
        @pop_queue = @push_queue
      end
      @push_queue = []
      @buffers_queue_cv.broadcast
    end
  end
end
#restore_2ndmemory ⇒ Object
1380 1381 1382 1383 1384 1385 1386 1387 1388 |
# File 'lib/fairy/share/port.rb', line 1380
#
# Takes the oldest spilled buffer off @buffers_queue, reads back the
# marshalled batch it contains and releases the buffer.
#
# The buffer is released with +close!+ in an +ensure+ clause: it has
# already been removed from @buffers_queue, so if Marshal.load raised
# without this nothing would ever release it.
#
# Callers must make sure @buffers_queue is non-empty.
#
# @return [Object] the object deserialized from the buffer (the array
#   written by store_2ndmemory)
# @raise whatever Marshal.load raises for unreadable data
def restore_2ndmemory
  # Log::debug(self, "start restore")
  buf = @buffers_queue.shift
  begin
    io = buf.open
    Marshal.load(io)
  ensure
    buf.close!
    # Log::debug(self, "end restore")
  end
end
#store_2ndmemory(ary) ⇒ Object
1372 1373 1374 1375 1376 1377 1378 |
# File 'lib/fairy/share/port.rb', line 1372
#
# Spills a batch: serializes +ary+ with Marshal into a new buffer obtained
# from open_2ndmemory.
#
# @param ary [Array] batch of queue elements to write out
# @return whatever open_2ndmemory returns
def store_2ndmemory(ary)
  # Log::debug(self, "start store")
  open_2ndmemory { |io| Marshal.dump(ary, io) }
  # Log::debug(self, "end store")
end