Class: Fairy::FileMarshaledQueue
- Inherits:
- Object
- Inheritance chain: Object → Fairy::FileMarshaledQueue
- Defined in:
- lib/fairy/share/port-marshaled-queue.rb
Instance Method Summary
-
#initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond) ⇒ FileMarshaledQueue
constructor
A new instance of FileMarshaledQueue.
- #open_2ndmemory(&block) ⇒ Object
- #pop ⇒ Object
- #pop_all ⇒ Object
- #pop_raw ⇒ Object
- #push(e) ⇒ Object
- #push_all(buf) ⇒ Object
- #push_raw(raw) ⇒ Object
- #restore_2ndmemory(buf) ⇒ Object
- #restore_raw_2ndmemory(buf) ⇒ Object
- #store_2ndmemory(ary) ⇒ Object
- #store_raw_2ndmemory(raw) ⇒ Object
Constructor Details
#initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond) ⇒ FileMarshaledQueue
Returns a new instance of FileMarshaledQueue.
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 183
#
# Builds an empty queue.
#
# policy     - object indexed with :min_chunk_no and :buffer_dir; a falsy
#              value for either key falls back to the CONF setting.
# queues_mon - monitor guarding the list of stored buffers.
# queues_cv  - condition variable used to signal that a buffer was added.
def initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond)
  @policy = policy

  @chunk_size = CONF.MARSHAL_QUEUE_CHUNK_SIZE
  # Number of pending elements that triggers a flush in the push methods.
  @min_chunk_no = @policy[:min_chunk_no] || CONF.MARSHAL_QUEUE_MIN_CHUNK_NO

  # Elements pushed but not yet written out.
  @push_queue = []
  @push_queue_mutex = Mutex.new

  # Stored buffers (plus the :END_OF_STREAM marker) awaiting a pop.
  @buffers_queue = []
  @buffers_queue_mon = queues_mon
  @buffers_queue_cv = queues_cv

  # Elements of the buffer currently being consumed; nil when there is none.
  @pop_queue = nil

  @buffer_dir = @policy[:buffer_dir] || CONF.TMP_DIR
end
Instance Method Details
#open_2ndmemory(&block) ⇒ Object
309 310 311 312 313 314 315 316 317 318 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 309
#
# Opens a new "port-buffer-" temp file under @buffer_dir, yields its io to
# the block, then closes the buffer and appends it to @buffers_queue.
# The buffer is closed even when the block raises; in that case it is not
# enqueued and the exception propagates.
#
# Returns the buffer object.
def open_2ndmemory(&block)
  tempfile = FastTempfile.open("port-buffer-", @buffer_dir)
  begin
    yield tempfile.io
  ensure
    tempfile.close
  end
  @buffers_queue << tempfile
  tempfile
end
#pop ⇒ Object
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 255
#
# Returns the next element of the queue.
#
# When no elements are pending, waits on the condition variable until a
# stored buffer (or the :END_OF_STREAM marker) can be shifted off
# @buffers_queue, then loads it as the new pending list. The marker is
# returned to the caller like any other element.
def pop
  until @pop_queue && !@pop_queue.empty?
    @buffers_queue_mon.synchronize do
      chunk = nil
      @buffers_queue_cv.wait_until { chunk = @buffers_queue.shift }
      @pop_queue = chunk == :END_OF_STREAM ? [chunk] : restore_2ndmemory(chunk)
    end
  end

  head = @pop_queue.shift
  # Drop the drained list so the next call fetches a fresh buffer.
  @pop_queue = nil if @pop_queue.empty?
  head
end
#pop_all ⇒ Object
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 273
#
# Returns every pending element at once as an array and clears the pending
# list.
#
# When nothing is pending, waits until a stored buffer (or the
# :END_OF_STREAM marker) can be shifted off @buffers_queue and loads it
# first. The marker is delivered as the one-element array [:END_OF_STREAM].
def pop_all
  until @pop_queue
    @buffers_queue_mon.synchronize do
      chunk = nil
      @buffers_queue_cv.wait_until { chunk = @buffers_queue.shift }
      @pop_queue = chunk == :END_OF_STREAM ? [chunk] : restore_2ndmemory(chunk)
    end
  end

  drained = @pop_queue
  @pop_queue = nil
  drained
end
#pop_raw ⇒ Object
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 289
#
# Returns the next stored buffer as the raw content read from its file, or
# :END_OF_STREAM when the marker is next in line.
#
# Signals ERR::INTERNAL::MarshalQueueNotEmpty through ERR::Raise when
# already-decoded elements are still pending.
# Waits until a buffer can be shifted off @buffers_queue.
def pop_raw
  ERR::Raise(ERR::INTERNAL::MarshalQueueNotEmpty) if @pop_queue && !@pop_queue.empty?

  raw = nil
  until raw
    @buffers_queue_mon.synchronize do
      chunk = nil
      @buffers_queue_cv.wait_until { chunk = @buffers_queue.shift }
      raw = chunk == :END_OF_STREAM ? chunk : restore_raw_2ndmemory(chunk)
    end
  end
  raw
end
#push(e) ⇒ Object
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 203
#
# Appends one element to the pending list.
#
# The pending list is written out as a stored buffer once it holds at least
# @min_chunk_no elements, or immediately when e is :END_OF_STREAM or
# Import::SET_NO_IMPORT. An :END_OF_STREAM marker is not written into the
# buffer; it is placed on @buffers_queue after it. Waiters on the condition
# variable are woken after every flush.
def push(e)
  @push_queue_mutex.synchronize do
    @push_queue << e

    flush_now = @push_queue.size >= @min_chunk_no ||
                e == :END_OF_STREAM ||
                e == Import::SET_NO_IMPORT
    next unless flush_now

    @buffers_queue_mon.synchronize do
      # Keep the marker out of the marshaled chunk.
      @push_queue.pop if e == :END_OF_STREAM
      store_2ndmemory(@push_queue)
      @buffers_queue << e if e == :END_OF_STREAM
      @push_queue = []
      @buffers_queue_cv.broadcast
    end
  end
end
#push_all(buf) ⇒ Object
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 221
#
# Appends every element of buf to the pending list.
#
# The pending list is written out as a stored buffer once it holds more
# than @min_chunk_no elements, or immediately when its last element is
# :END_OF_STREAM. The marker is not written into the buffer; it is placed
# on @buffers_queue after it. Waiters on the condition variable are woken
# after every flush.
#
# NOTE(review): push flushes at `>= @min_chunk_no` while this method uses
# `>`; the threshold is left as it was - confirm whether that is intended.
def push_all(buf)
  @push_queue_mutex.synchronize do
    @push_queue.concat buf

    # The flush branch previously referred to an undefined local `e`, which
    # raised NameError; the end-of-stream check is derived from the tail of
    # the pending list instead.
    eos = @push_queue.last == :END_OF_STREAM

    if @push_queue.size > @min_chunk_no || eos
      @buffers_queue_mon.synchronize do
        # Keep the marker out of the marshaled chunk.
        @push_queue.pop if eos
        store_2ndmemory(@push_queue)
        @buffers_queue.push :END_OF_STREAM if eos
        @push_queue = []
        @buffers_queue_cv.broadcast
      end
    end
  end
end
#push_raw(raw) ⇒ Object
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 238
#
# Enqueues already-serialized content as its own stored buffer.
#
# Any elements still pending from push/push_all are written out first so
# they stay ahead of raw. :END_OF_STREAM is placed on @buffers_queue
# directly instead of being written to a file. Waiters on the condition
# variable are woken afterwards.
def push_raw(raw)
  @push_queue_mutex.synchronize do
    @buffers_queue_mon.synchronize do
      if !@push_queue.empty?
        store_2ndmemory(@push_queue)
        @push_queue = []
      end

      if raw == :END_OF_STREAM
        @buffers_queue << raw
      else
        store_raw_2ndmemory(raw)
      end

      @buffers_queue_cv.broadcast
    end
  end
end
#restore_2ndmemory(buf) ⇒ Object
332 333 334 335 336 337 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 332
#
# Loads the object marshaled into a stored buffer and discards the buffer.
#
# buf - buffer object responding to #open (returning a readable io) and
#       #close!.
#
# Returns whatever Marshal.load reads from the buffer.
# Any error raised by Marshal.load propagates to the caller.
#
# NOTE(review): Marshal.load is only appropriate for data this process wrote
# itself - presumably always the case for these buffers; confirm.
def restore_2ndmemory(buf)
  io = buf.open
  begin
    Marshal.load(io)
  ensure
    # Discard the buffer even when loading fails; it has already been taken
    # off the queue, so nothing else would release it.
    buf.close!
  end
end
#restore_raw_2ndmemory(buf) ⇒ Object
339 340 341 342 343 344 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 339
#
# Reads the whole content of a stored buffer without decoding it and
# discards the buffer.
#
# buf - buffer object responding to #open (returning a readable io) and
#       #close!.
#
# Returns the result of io.read.
# Any error raised while reading propagates to the caller.
def restore_raw_2ndmemory(buf)
  io = buf.open
  begin
    io.read
  ensure
    # Discard the buffer even when reading fails; it has already been taken
    # off the queue, so nothing else would release it.
    buf.close!
  end
end
#store_2ndmemory(ary) ⇒ Object
320 321 322 323 324 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 320
#
# Marshals ary into a fresh buffer obtained from open_2ndmemory.
#
# Returns the value of open_2ndmemory.
def store_2ndmemory(ary)
  open_2ndmemory { |out| Marshal.dump(ary, out) }
end
#store_raw_2ndmemory(raw) ⇒ Object
326 327 328 329 330 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 326
#
# Writes raw unchanged into a fresh buffer obtained from open_2ndmemory.
#
# Returns the value of open_2ndmemory.
def store_raw_2ndmemory(raw)
  open_2ndmemory { |out| out.write raw }
end