Class: Fairy::FileBufferdQueue
- Inherits: Object
- Inheritance chain: Object → Fairy::FileBufferdQueue
- Defined in:
- lib/fairy/share/port.rb
Instance Method Summary
- #init_2ndmemory ⇒ Object
- #initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond) ⇒ FileBufferdQueue (constructor): A new instance of FileBufferdQueue.
- #open_2ndmemory(&block) ⇒ Object
- #pop ⇒ Object
- #pop_all ⇒ Object
- #push(e) ⇒ Object
- #restore_2ndmemory ⇒ Object
- #store_2ndmemory(ary) ⇒ Object
Constructor Details
#initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond) ⇒ FileBufferdQueue
Returns a new instance of FileBufferdQueue.
1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 |
# File 'lib/fairy/share/port.rb', line 1131

# Builds an empty queue.
#
# The push side and the pop side start out as the very same array, and no
# secondary-storage buffer list exists yet (@buffers_queue is nil).
#
# @param policy    object indexed with [:threshold] here (and with
#                  [:buffer_dir] by init_2ndmemory)
# @param queue_mon monitor used by push/pop/pop_all via #synchronize
# @param queue_cv  condition variable that push broadcasts on and that
#                  pop/pop_all wait on
def initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond)
  @policy = policy
  # Fall back to the configured default when the policy gives no threshold.
  @threshold = policy[:threshold] || CONF.FILEBUFFEREDQUEUE_THRESHOLD

  @queue_mon = queue_mon
  @queue_cv = queue_cv

  @buffers_queue = nil
  # One shared array: elements pushed are immediately visible to pop.
  @pop_queue = @push_queue = []
end
Instance Method Details
#init_2ndmemory ⇒ Object
1201 1202 1203 1204 1205 1206 |
# File 'lib/fairy/share/port.rb', line 1201

# Prepares secondary storage: picks the buffer directory (policy value, or
# CONF.TMP_DIR when the policy has none) and starts an empty list of
# stored buffers.
def init_2ndmemory
  @buffer_dir = @policy[:buffer_dir] || CONF.TMP_DIR
  @buffers_queue = []
end
#open_2ndmemory(&block) ⇒ Object
1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 |
# File 'lib/fairy/share/port.rb', line 1208

# Creates a new temp buffer, lets the caller fill it, and registers it.
#
# Secondary storage is initialised on first use (when @buffers_queue is
# nil). The buffer's IO is yielded to the block; when the block returns
# normally the buffer is closed, appended to @buffers_queue and returned.
#
# When the block does not complete (exception or other non-local exit) the
# buffer is never registered, so nothing would release it later. In that
# case it is released here with close! instead of close.
# NOTE(review): assumes FastTempfile#close! discards the buffer and is safe
# on a still-open buffer, as restore_2ndmemory uses it -- confirm.
#
# @yield [io] the buffer's IO object, to be written by the caller
# @return the buffer that was created and registered
# @raise whatever the block raises (re-raised after the buffer is released)
def open_2ndmemory(&block)
  init_2ndmemory unless @buffers_queue

  buffer = FastTempfile.open("port-buffer-", @buffer_dir)
  written = false
  begin
    yield buffer.io
    written = true
  ensure
    written ? buffer.close : buffer.close!
  end

  @buffers_queue.push buffer
  buffer
end
#pop ⇒ Object
1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 |
# File 'lib/fairy/share/port.rb', line 1161

# Removes and returns the oldest element, waiting while nothing is available.
#
# Runs inside @queue_mon.synchronize. While the pop queue is empty it is
# refilled from the next source, in this order:
# * pop and push queue are the same array: nothing is pending, so wait on
#   @queue_cv (push broadcasts on it);
# * no buffer list exists: switch to reading the push queue directly;
# * a buffer list exists and still has entries: load the next stored batch
#   via restore_2ndmemory;
# * the buffer list exists but is empty: take over the current push queue,
#   give push a fresh array, and drop the buffer list.
#
# @return the first element of the pop queue
def pop
  @queue_mon.synchronize do
    while @pop_queue.empty?
      if @pop_queue.equal?(@push_queue)
        @queue_cv.wait
        next
      end

      if @buffers_queue.nil?
        @pop_queue = @push_queue
        next
      end

      unless @buffers_queue.empty?
        @pop_queue = restore_2ndmemory
        next
      end

      # All stored batches are consumed: drain the push queue next.
      @pop_queue, @push_queue = @push_queue, []
      @buffers_queue = nil
    end

    @pop_queue.shift
  end
end
#pop_all ⇒ Object
1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 |
# File 'lib/fairy/share/port.rb', line 1180

# Removes and returns every element currently in the pop queue, waiting
# while nothing is available.
#
# Runs inside @queue_mon.synchronize and refills an empty pop queue exactly
# as #pop does: wait on @queue_cv when pop and push queue are the same
# array, otherwise move on to the push queue or to the next stored batch
# (restore_2ndmemory).
#
# The pop queue is emptied in place rather than replaced, so it remains the
# same array object (it may also be the push queue).
#
# @return [Array] a new array holding the drained elements, oldest first
def pop_all
  @queue_mon.synchronize do
    while @pop_queue.empty?
      if @pop_queue.equal?(@push_queue)
        @queue_cv.wait
        next
      end

      if @buffers_queue.nil?
        @pop_queue = @push_queue
        next
      end

      unless @buffers_queue.empty?
        @pop_queue = restore_2ndmemory
        next
      end

      # All stored batches are consumed: drain the push queue next.
      @pop_queue, @push_queue = @push_queue, []
      @buffers_queue = nil
    end

    # shift(n) hands back the first n elements and removes them in place.
    @pop_queue.shift(@pop_queue.size)
  end
end
#push(e) ⇒ Object
1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 |
# File 'lib/fairy/share/port.rb', line 1145

# Appends an element and wakes up waiting consumers.
#
# Runs inside @queue_mon.synchronize. After appending and broadcasting on
# @queue_cv, a push queue that has reached @threshold elements is rotated
# out and replaced by a fresh empty array:
# * if it is still the same array as the pop queue, it simply stays behind
#   in memory as the pop queue;
# * otherwise it is written out through store_2ndmemory first.
#
# @param e the element to enqueue
def push(e)
  @queue_mon.synchronize do
    @push_queue.push e
    @queue_cv.broadcast

    next unless @push_queue.size >= @threshold

    # A full batch that consumers are not reading directly goes to storage.
    store_2ndmemory(@push_queue) unless @push_queue.equal?(@pop_queue)
    @push_queue = []
  end
end
#restore_2ndmemory ⇒ Object
1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 |
# File 'lib/fairy/share/port.rb', line 1234

# Loads the oldest stored batch back into memory.
#
# Takes the first buffer off @buffers_queue, opens it, and reads one
# marshalled object from it (store_2ndmemory dumps a whole array per
# buffer). The buffer is released with close! afterwards -- also when
# Marshal.load raises, so a failed load does not leave the buffer open.
#
# NOTE(review): Marshal.load trusts the buffer contents completely; this
# presumes nobody else can write to the buffer directory -- confirm.
#
# @return the object read from the buffer
# @raise whatever buf.open or Marshal.load raise
def restore_2ndmemory
  buf = @buffers_queue.shift
  io = buf.open
  begin
    Marshal.load(io)
  ensure
    buf.close!
  end
end
#store_2ndmemory(ary) ⇒ Object
1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 |
# File 'lib/fairy/share/port.rb', line 1222

# Writes a batch of elements to a new secondary-storage buffer.
#
# The array is marshalled as a single object (not element by element), so
# one Marshal.load per buffer reads it back.
#
# @param ary [Array] the batch to store
# @return whatever open_2ndmemory returns
def store_2ndmemory(ary)
  open_2ndmemory { |io| Marshal.dump(ary, io) }
end