Class: Fairy::FileBufferdQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/share/port.rb

Instance Method Summary collapse

Constructor Details

#initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond) ⇒ FileBufferdQueue

Returns a new instance of FileBufferdQueue.



1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
# File 'lib/fairy/share/port.rb', line 1131

# Builds an empty queue.
#
# policy    - Hash-like object; :threshold is read here, and the whole
#             policy is kept in @policy for later lookups.
# queue_mon - monitor guarding the queue state (a fresh Monitor by default).
# queue_cv  - condition variable used to wake waiting consumers
#             (created from queue_mon by default).
def initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond)
  @policy = policy
  # Fall back to the configured default when the policy gives no threshold.
  @threshold = policy[:threshold] || CONF.FILEBUFFEREDQUEUE_THRESHOLD

  # No secondary-storage buffers exist yet.
  @buffers_queue = nil
  # Producer and consumer sides start out sharing one and the same array.
  @pop_queue = @push_queue = []

  @queue_mon = queue_mon
  @queue_cv = queue_cv
end

Instance Method Details

#init_2ndmemory ⇒ Object



1201
1202
1203
1204
1205
1206
# File 'lib/fairy/share/port.rb', line 1201

# Prepares the secondary-storage state: picks the directory used for buffer
# files (policy[:buffer_dir], else CONF.TMP_DIR) and starts an empty list of
# buffers. Returns the new, empty buffer list.
def init_2ndmemory
  @buffer_dir = @policy[:buffer_dir] || CONF.TMP_DIR

  @buffers_queue = []
end

#open_2ndmemory(&block) ⇒ Object



1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
# File 'lib/fairy/share/port.rb', line 1208

# Creates a new buffer file, yields its IO to the caller for writing, then
# closes it and appends it to @buffers_queue.
#
# The secondary-storage state is initialised lazily on first use.
#
# Yields the buffer's IO object.
# Returns the buffer that was queued.
# Any exception raised by the block propagates to the caller.
def open_2ndmemory(&block)
  init_2ndmemory unless @buffers_queue

  buffer = FastTempfile.open("port-buffer-", @buffer_dir)
  written = false
  begin
    yield buffer.io
    written = true
  ensure
    if written
      buffer.close
    else
      # The block did not complete, so this buffer will never be queued and
      # nothing else holds a reference to it. Discard it instead of leaving
      # a half-written file behind.
      # NOTE(review): assumes FastTempfile#close! closes and removes the
      # file, Tempfile-style (restore_2ndmemory uses it that way) — confirm.
      buffer.close!
    end
  end

  @buffers_queue.push buffer
  buffer
end

#pop ⇒ Object



1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
# File 'lib/fairy/share/port.rb', line 1161

# Removes and returns the oldest element.
#
# Runs under @queue_mon. While the consumer-side array is empty it is
# refilled from, in order: stored buffers (via restore_2ndmemory), then the
# producer-side array. When both sides already share one empty array, the
# caller waits on @queue_cv until a push signals.
def pop
  @queue_mon.synchronize do
    while @pop_queue.empty?
      if @pop_queue.equal?(@push_queue)
        # Nothing anywhere: sleep until a producer broadcasts.
        @queue_cv.wait
      elsif @buffers_queue.nil? || @buffers_queue.empty?
        had_buffers = !@buffers_queue.nil?
        # No stored buffers remain; consume the producer-side array next.
        @pop_queue = @push_queue
        if had_buffers
          # Secondary storage was in use and is now drained: give producers
          # a fresh array and drop the buffer list.
          @push_queue = []
          @buffers_queue = nil
        end
      else
        @pop_queue = restore_2ndmemory
      end
    end
    @pop_queue.shift
  end
end

#pop_all ⇒ Object



1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
# File 'lib/fairy/share/port.rb', line 1180

# Removes and returns, as a new array, every element currently held in the
# consumer-side array.
#
# Runs under @queue_mon. While the consumer-side array is empty it is
# refilled from, in order: stored buffers (via restore_2ndmemory), then the
# producer-side array. When both sides already share one empty array, the
# caller waits on @queue_cv until a push signals.
def pop_all
  @queue_mon.synchronize do
    while @pop_queue.empty?
      if @pop_queue.equal?(@push_queue)
        # Nothing anywhere: sleep until a producer broadcasts.
        @queue_cv.wait
      elsif @buffers_queue.nil? || @buffers_queue.empty?
        had_buffers = !@buffers_queue.nil?
        # No stored buffers remain; consume the producer-side array next.
        @pop_queue = @push_queue
        if had_buffers
          # Secondary storage was in use and is now drained: give producers
          # a fresh array and drop the buffer list.
          @push_queue = []
          @buffers_queue = nil
        end
      else
        @pop_queue = restore_2ndmemory
      end
    end
    # Empty the array in place (it may be shared with the producer side)
    # and hand the elements back in a separate array.
    @pop_queue.shift(@pop_queue.size)
  end
end

#push(e) ⇒ Object



1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
# File 'lib/fairy/share/port.rb', line 1145

# Appends +e+ to the producer-side array and wakes any waiting consumers.
#
# Once the producer-side array reaches @threshold elements it is retired:
# if consumers are reading from that same array it is simply left to them,
# otherwise it is written out through store_2ndmemory. Either way producers
# continue with a fresh, empty array.
def push(e)
  @queue_mon.synchronize do
    @push_queue.push e
    @queue_cv.broadcast

    next if @push_queue.size < @threshold

    # A full array that consumers are not reading from is spilled to a buffer.
    store_2ndmemory(@push_queue) unless @push_queue.equal?(@pop_queue)
    @push_queue = []
  end
end

#restore_2ndmemory ⇒ Object



1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
# File 'lib/fairy/share/port.rb', line 1234

# Takes the oldest buffer off @buffers_queue and returns the object that
# Marshal.load reads from it (store_2ndmemory writes a single array per
# buffer with Marshal.dump).
#
# The buffer is always released with close!, even when unmarshalling fails;
# the buffer has already been removed from @buffers_queue at that point, so
# nothing else would ever release it. Errors from Marshal.load propagate.
# NOTE(review): close! presumably also deletes the underlying file
# (Tempfile-style) — confirm against FastTempfile.
def restore_2ndmemory
  buf = @buffers_queue.shift
  io = buf.open
  begin
    Marshal.load(io)
  ensure
    buf.close!
  end
end

#store_2ndmemory(ary) ⇒ Object



1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
# File 'lib/fairy/share/port.rb', line 1222

# Writes +ary+ as a single Marshal dump into a new buffer obtained from
# open_2ndmemory, and returns whatever open_2ndmemory returns.
def store_2ndmemory(ary)
  open_2ndmemory { |io| Marshal.dump(ary, io) }
end