Class: Fairy::MarshaledQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/share/port-marshaled-queue.rb

Direct Known Subclasses

SizedMarshaledQueue

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(policy, queues_mon = XThread::Monitor.new, queues_cv = queues_mon.new_cond) ⇒ MarshaledQueue

Returns a new instance of MarshaledQueue.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/fairy/share/port-marshaled-queue.rb', line 11

# Sets up an empty queue: a producer-side buffer, a list of queued chunks,
# and no chunk currently being consumed.
#
# @param policy [#[]] looked up with +:min_chunk_no+; a truthy value there
#   takes precedence over +CONF.MARSHAL_QUEUE_MIN_CHUNK_NO+
# @param queues_mon monitor used to guard the list of queued chunks
#   (a new +XThread::Monitor+ when omitted)
# @param queues_cv condition variable signalled whenever a chunk is queued
#   (+queues_mon.new_cond+ when omitted)
def initialize(policy, queues_mon = XThread::Monitor.new, queues_cv = queues_mon.new_cond)
  @policy = policy

  @chunk_size = CONF.MARSHAL_QUEUE_CHUNK_SIZE
  # Per-queue policy wins; otherwise fall back to the configured default.
  @min_chunk_no = @policy[:min_chunk_no] || CONF.MARSHAL_QUEUE_MIN_CHUNK_NO

  # Elements buffered by the producer until they are marshaled as one chunk.
  @push_queue = []
  @push_queue_mutex = Mutex.new

  # Chunks (marshaled strings or raw markers) waiting for a consumer.
  @queues = []
  @queues_mon, @queues_cv = queues_mon, queues_cv

  # Unmarshaled chunk currently being consumed; nil when there is none.
  @pop_queue = nil
end

Instance Attribute Details

#fib_cv ⇒ Object

Returns the value of attribute fib_cv.



28
29
30
# File 'lib/fairy/share/port-marshaled-queue.rb', line 28

# Reader for the +@fib_cv+ instance variable.
#
# @return [Object, nil] the current value of +@fib_cv+
# NOTE(review): the constructor shown on this page does not assign @fib_cv,
# so this presumably returns nil unless it is set elsewhere -- confirm.
def fib_cv; @fib_cv; end

Instance Method Details

#pop ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fairy/share/port-marshaled-queue.rb', line 78

# Removes and returns the next single element from the queue.
#
# When no unmarshaled chunk is at hand (or it is empty), waits on the
# condition variable until a chunk can be shifted off +@queues+, then
# unmarshals it. The +:END_OF_STREAM+ marker is not marshaled data, so it
# is wrapped in a one-element array instead of being loaded.
#
# @return [Object] the next element (possibly +:END_OF_STREAM+)
# NOTE(review): Marshal.load is applied to whatever was queued, including
# data handed to push_raw -- confirm that source is always trusted.
def pop
  # Empty chunks are possible, so keep fetching until there is an element.
  until @pop_queue && !@pop_queue.empty?
    @queues_mon.synchronize do
      chunk = nil
      @queues_cv.wait_until { chunk = @queues.shift }
      @pop_queue = chunk == :END_OF_STREAM ? [chunk] : Marshal.load(chunk)
    end
  end

  head = @pop_queue.shift
  # Drop the exhausted chunk so the next call fetches a fresh one.
  @pop_queue = nil if @pop_queue.empty?
  head
end

#pop_all ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/fairy/share/port-marshaled-queue.rb', line 95

# Removes and returns a whole chunk of elements at once.
#
# If a partially consumed chunk is pending, its remaining elements are
# returned. Otherwise waits until a chunk can be shifted off +@queues+ and
# unmarshals it; the +:END_OF_STREAM+ marker is returned wrapped in a
# one-element array.
#
# @return [Array] the elements of one chunk
# NOTE(review): Marshal.load is applied to whatever was queued, including
# data handed to push_raw -- confirm that source is always trusted.
def pop_all
  until @pop_queue
    @queues_mon.synchronize do
      chunk = nil
      @queues_cv.wait_until { chunk = @queues.shift }
      @pop_queue =
        if chunk == :END_OF_STREAM
          [chunk]
        else
          Marshal.load(chunk)
        end
    end
  end

  # Hand the chunk over and forget it locally.
  drained = @pop_queue
  @pop_queue = nil
  drained
end

#pop_raw ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/fairy/share/port-marshaled-queue.rb', line 111

# Removes and returns the next queued chunk without unmarshaling it.
#
# Signals +ERR::INTERNAL::MarshalQueueNotEmpty+ through +ERR::Raise+ when a
# partially consumed chunk is still pending, since its elements would
# otherwise be skipped.
#
# @return [Object] the next entry of +@queues+, exactly as it was queued
def pop_raw
  ERR::Raise(ERR::INTERNAL::MarshalQueueNotEmpty) if @pop_queue && !@pop_queue.empty?

  chunk = nil
  until chunk
    @queues_mon.synchronize do
      # Blocks until something can be shifted off the chunk list.
      @queues_cv.wait_until { chunk = @queues.shift }
    end
  end
  chunk
end

#push(e) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/fairy/share/port-marshaled-queue.rb', line 30

# Appends one element to the producer-side buffer and, when a flush is due,
# marshals the buffer into a chunk and queues it for consumers.
#
# A flush happens when the buffer holds at least +@min_chunk_no+ elements,
# or when +e+ equals +:END_OF_STREAM+ or +Import::SET_NO_IMPORT+. On
# +:END_OF_STREAM+ the marker is taken out of the buffer before marshaling
# and queued on its own, unmarshaled, after the chunk.
#
# @param e [Object] the element to enqueue
def push(e)
  @push_queue_mutex.synchronize do
    @push_queue << e

    flush_now = @push_queue.size >= @min_chunk_no ||
                e == :END_OF_STREAM ||
                e == Import::SET_NO_IMPORT

    if flush_now
      @queues_mon.synchronize do
        end_of_stream = (e == :END_OF_STREAM)

        # The marker travels as a separate raw entry, not inside the chunk.
        @push_queue.pop if end_of_stream
        @queues.push(Marshal.dump(@push_queue))
        @queues.push(e) if end_of_stream

        @push_queue = []
        @queues_cv.broadcast
      end
    end
  end
end

#push_all(buf) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fairy/share/port-marshaled-queue.rb', line 48

# Appends every element of +buf+ to the producer-side buffer and, when a
# flush is due, marshals the buffer into a chunk and queues it for consumers.
#
# A flush happens when the buffer holds more than +@min_chunk_no+ elements
# (strictly greater, unlike #push which uses >=) or when the last buffered
# element is +:END_OF_STREAM+. In the latter case the marker is taken out
# of the buffer before marshaling and queued on its own, unmarshaled, after
# the chunk.
#
# @param buf [Array] the elements to enqueue, in order
def push_all(buf)
  @push_queue_mutex.synchronize do
    @push_queue.concat buf

    # There is no single pushed element here, so the end-of-stream flag is
    # derived from the tail of the buffer.
    end_of_stream = @push_queue.last == :END_OF_STREAM

    if @push_queue.size > @min_chunk_no || end_of_stream
      @queues_mon.synchronize do
        # The marker travels as a separate raw entry, not inside the chunk.
        @push_queue.pop if end_of_stream
        @queues.push Marshal.dump(@push_queue)
        @queues.push :END_OF_STREAM if end_of_stream

        @push_queue = []
        @queues_cv.broadcast
      end
    end
  end
end

#push_raw(raw) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fairy/share/port-marshaled-queue.rb', line 65

# Queues an already-prepared chunk as-is, without marshaling it.
#
# Any elements still sitting in the producer-side buffer are marshaled and
# queued first, so they stay ahead of +raw+ in the chunk list.
#
# @param raw [Object] the entry to append to +@queues+ unchanged
def push_raw(raw)
  @push_queue_mutex.synchronize do
    @queues_mon.synchronize do
      has_buffered = !@push_queue.empty?
      if has_buffered
        @queues.push(Marshal.dump(@push_queue))
        @push_queue = []
      end

      @queues.push(raw)
      @queues_cv.broadcast
    end
  end
end