Class: Taps::DataStreamKeyed
  
  
  
  Constant Summary
  
  Constants inherited
     from DataStream
  Taps::DataStream::DEFAULT_CHUNKSIZE
  Instance Attribute Summary collapse
  
  
  
  Attributes inherited from DataStream
  #db, #state
  
    
      Instance Method Summary
      collapse
    
    
  
  
  
  
  
  
  
  
  
  Methods inherited from DataStream
  #complete?, #encode_rows, #error, #error=, factory, #fetch, #fetch_chunksize, #fetch_from_resource, #fetch_remote, #fetch_remote_in_server, #log, #max_chunksize_training, #order_by, #parse_encoded_data, parse_json, #string_columns, #table, #table_name, #table_name_sql, #to_hash, #to_json, #update_chunksize_stats, #verify_remote_stream
  Constructor Details
  
    
  
  
    
Returns a new instance of DataStreamKeyed.
   
 
  
  
    | 
239
240
241
242
243
244 | # File 'lib/taps/data_stream.rb', line 239
# Builds a keyed data stream on top of the base DataStream.
#
# The caller's state is layered over two defaults: :primary_key (the first
# entry returned by order_by for state[:table_name]) and :filter (0).
# :chunksize falls back to DEFAULT_CHUNKSIZE when it is unset, and the row
# buffer starts out empty.
#
# @param db forwarded unchanged to the superclass initializer
# @param state [Hash] stream state; its entries override the defaults
def initialize(db, state)
  super(db, state)
  defaults = { :primary_key => order_by(state[:table_name]).first, :filter => 0 }
  @state = defaults.merge(state)
  @state[:chunksize] ||= DEFAULT_CHUNKSIZE
  @buffer = []
end
 
  
 
  
    Instance Attribute Details
    
      
      
      
  
  
    #buffer  ⇒ Object 
  
  
  
  
    
Returns the value of attribute buffer.
   
 
  
  
    | 
237
238
239 | # File 'lib/taps/data_stream.rb', line 237
# Returns @buffer, the array that load_buffer appends fetched rows to and
# that increment removes rows from.
def buffer
  @buffer
end | 
 
    
   
  
    Instance Method Details
    
      
  
  
    #buffer_limit  ⇒ Object 
  
  
  
  
    | 
250
251
252
253
254
255
256 | # File 'lib/taps/data_stream.rb', line 250
# Picks the key value that the next buffer-loading query must start after.
#
# Returns state[:last_fetched] when it is set, is lower than state[:filter]
# and the buffer is empty; in every other case returns state[:filter].
#
# NOTE(review): presumably the first case rewinds to the last delivered key
# when rows that had been buffered were never consumed — confirm with callers.
#
# @return the lower bound (exclusive) used by load_buffer's filter
def buffer_limit
  last_fetched = state[:last_fetched]
  return last_fetched if last_fetched && last_fetched < state[:filter] && buffer.empty?

  state[:filter]
end
 
    
      
  
  
    #calc_limit(chunksize)  ⇒ Object 
  
  
  
  
    | 
258
259
260
261
262
263
264
265
266
267 | # File 'lib/taps/data_stream.rb', line 258
def calc_limit(chunksize)
        if defined?(Sinatra)
    (chunksize * 1.1).ceil
  else
    (chunksize * 3).ceil
  end
end | 
 
    
      
  
  
    #fetch_buffered(chunksize)  ⇒ Object 
  
  
  
  
    | 
293
294
295
296
297
298
299
300
301
302 | # File 'lib/taps/data_stream.rb', line 293
# Returns up to chunksize rows from the front of the buffer, topping the
# buffer up first when it holds fewer rows than requested.
#
# The rows are copied out with Array#slice, so they remain in the buffer.
# state[:last_fetched] is set to the primary-key value of the last returned
# row, or to nil when no rows are returned.
#
# @param chunksize [Integer] maximum number of rows to return
# @return [Array] the leading rows of the buffer
def fetch_buffered(chunksize)
  load_buffer(chunksize) if buffer.size < chunksize
  batch = buffer.slice(0, chunksize)
  state[:last_fetched] = batch.empty? ? nil : batch.last[primary_key]
  batch
end
 
    
      
  
  
    #fetch_rows  ⇒ Object 
  
  
  
  
    | 
308
309
310
311
312 | # File 'lib/taps/data_stream.rb', line 308
# Fetches up to state[:chunksize] buffered rows and hands them to
# Taps::Utils.format_data together with this stream's string_columns.
# An empty array is formatted when fetch_buffered returns nothing.
#
# @return whatever Taps::Utils.format_data returns
def fetch_rows
  rows = fetch_buffered(state[:chunksize]) || []
  Taps::Utils.format_data(rows, :string_columns => string_columns)
end
 
    
      
  
  
    #import_rows(rows)  ⇒ Object 
  
  
  
  
    | 
304
305
306 | # File 'lib/taps/data_stream.rb', line 304
# Writes a batch of rows into the table via table.import.
#
# @param rows an object indexed by :header and :data; both values are passed
#   to table.import in that order
# @return whatever table.import returns
def import_rows(rows)
  columns, values = rows[:header], rows[:data]
  table.import(columns, values)
end
 
    
      
  
  
    #increment(row_count)  ⇒ Object 
  
  
  
  
    | 
314
315
316
317 | # File 'lib/taps/data_stream.rb', line 314
# Removes the first row_count rows from the front of the buffer.
#
# @param row_count [Integer] number of leading rows to drop
# @return the result of Array#slice!, i.e. the rows that were removed
def increment(row_count)
  @buffer.slice!(0, row_count)
end
 
    
      
  
  
    #load_buffer(chunksize)  ⇒ Object 
  
  
  
  
    | 
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291 | # File 'lib/taps/data_stream.rb', line 269
# Appends rows to the buffer, querying repeatedly until this call has added
# at least chunksize rows or a query comes back empty.
#
# Each query selects rows whose primary key is greater than buffer_limit,
# ordered by order_by and capped at calc_limit(chunksize) rows.
#
# NOTE(review): `self.buffer += data` needs a `buffer=` writer, which is not
# visible in this listing — confirm it exists on the class.
def load_buffer(chunksize)
    # NOTE(review): presumably clears methods that later requires added to
    # Sequel::BasicObject so the filter block below resolves names as
    # intended — confirm against the Sequel version in use.
    Sequel::BasicObject.remove_methods!
  # Number of rows added to the buffer by this call.
  num = 0
  loop do
    limit = calc_limit(chunksize)
        # key and buf_limit are captured in locals before the filter block is
        # built. NOTE(review): presumably required because Sequel may evaluate
        # the block with a different self — confirm.
        key = primary_key
    buf_limit = buffer_limit
    ds = table.order(*order_by).filter { key.sql_number > buf_limit }.limit(limit)
    log.debug "DataStreamKeyed#load_buffer SQL -> #{ds.sql}"
    data = ds.all
    self.buffer += data
    num += data.size
    if data.size > 0
            # Record the primary key of the last buffered row so that later
            # queries can continue after it.
            state[:filter] = self.buffer.last[ primary_key ]
    end
    # Stop once enough rows were loaded or the table has no more rows.
    break if num >= chunksize or data.size == 0
  end
end | 
 
    
      
  
  
    #primary_key  ⇒ Object 
  
  
  
  
    | 
246
247
248 | # File 'lib/taps/data_stream.rb', line 246
# Returns the stream's primary-key column name, taken from
# state[:primary_key] and converted with to_sym.
#
# @return [Symbol]
def primary_key
  key_name = state[:primary_key]
  key_name.to_sym
end
 
    
      
  
  
    #verify_stream  ⇒ Object 
  
  
  
  
    | 
319
320
321
322
323
324
325
326
327
328
329
330 | # File 'lib/taps/data_stream.rb', line 319
# Re-synchronises the stream state with what is already stored in the table.
#
# Sets state[:filter] to the largest primary-key value currently in the table
# and clears state[:last_fetched]. When the table holds no rows the aggregate
# yields nil; the filter then falls back to 0 — the same default initialize
# uses — so that load_buffer never compares the key against nil (a comparison
# that would match no rows at all).
def verify_stream
  key = primary_key
  ds = table.order(*order_by)
  # nil means the table is empty: resume from the very beginning.
  state[:filter] = ds.max(key.sql_number) || 0
  # With last_fetched cleared, buffer_limit falls through to state[:filter].
  state[:last_fetched] = nil
  log.debug "DataStreamKeyed#verify_stream -> state: #{state.inspect}"
end