Class: Bricolage::StreamingLoad::ChunkBuffer

Inherits:
Object

Includes:
SQLUtils
Defined in:
lib/bricolage/streamingload/chunkbuffer.rb

Constant Summary

TASK_GENERATION_TIME_LIMIT =
  30   # sec

Instance Method Summary

#flush_all ⇒ Object
#flush_partial ⇒ Object
#flush_stream(stream_name) ⇒ Object
#load_tasks_by_id(ids) ⇒ Object
#save(chunk) ⇒ Object

Constructor Details

#initialize(control_data_source:, logger:) ⇒ ChunkBuffer

Returns a new instance of ChunkBuffer.



# File 'lib/bricolage/streamingload/chunkbuffer.rb', line 15

def initialize(control_data_source:, logger:)
  @ctl_ds = control_data_source
  @logger = logger
  @task_generation_time_limit = TASK_GENERATION_TIME_LIMIT
end
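
A minimal construction sketch, assuming ctl_ds is an already-configured control data source (anything responding to #open here) and using the standard library Logger:

require 'logger'

# ctl_ds is assumed: a Bricolage data source for the control tables.
buffer = Bricolage::StreamingLoad::ChunkBuffer.new(
  control_data_source: ctl_ds,
  logger: Logger.new($stderr)
)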

Instance Method Details

#flush_all ⇒ Object

Flushes all chunks of all streams, with no additional conditions, to create a “system checkpoint”.



# File 'lib/bricolage/streamingload/chunkbuffer.rb', line 58

def flush_all
  all_task_ids = []
  tasks = nil

  @ctl_ds.open {|conn|
    conn.transaction {|txn|
      # update_task_objects may not consume all saved objects
      # (e.g. when there are too many objects for one table),
      # so we must create tasks repeatedly until all objects are flushed.
      until (task_ids = insert_tasks(conn, force: true)).empty?
        update_task_objects(conn, task_ids)
        all_task_ids.concat task_ids
      end
    }
    log_task_ids(all_task_ids)
    tasks = load_tasks(conn, all_task_ids)
  }
  tasks
end
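
A usage sketch. The source only guarantees that flush_all returns the loaded tasks; what each task responds to is an assumption:

# Force a system checkpoint, then hand the generated tasks to a
# downstream consumer. dispatch is a hypothetical handler.
tasks = buffer.flush_all
tasks.each {|task| dispatch(task) }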

#flush_partial ⇒ Object

Flushes the chunks of all streams that meet the flush conditions.



# File 'lib/bricolage/streamingload/chunkbuffer.rb', line 39

def flush_partial
  task_ids = nil
  tasks = nil

  @ctl_ds.open {|conn|
    warn_slow_task_generation {
      conn.transaction {|txn|
        task_ids = insert_tasks(conn)
        update_task_objects(conn, task_ids) unless task_ids.empty?
      }
    }
    log_task_ids(task_ids)
    tasks = load_tasks(conn, task_ids)
  }
  tasks
end
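
A usage sketch. flush_partial wraps task generation in warn_slow_task_generation, which presumably compares elapsed time against TASK_GENERATION_TIME_LIMIT (30 sec). The periodic loop and the enqueue handler below are assumptions:

# Poll for flushable chunks; only streams meeting their flush
# conditions produce tasks. The 60-second cadence is illustrative.
loop do
  tasks = buffer.flush_partial
  tasks.each {|task| enqueue(task) }   # enqueue is hypothetical
  sleep 60
end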

#flush_stream(stream_name) ⇒ Object

Flushes all chunks of the specified stream, with no additional conditions, to create a “stream checkpoint”.



# File 'lib/bricolage/streamingload/chunkbuffer.rb', line 80

def flush_stream(stream_name)
  all_task_ids = []
  tasks = nil

  @ctl_ds.open {|conn|
    conn.transaction {|txn|
      # update_task_objects may not consume all saved objects
      # (e.g. when there are too many objects for one table),
      # so we must create tasks repeatedly until all objects are flushed.
      until (task_ids = insert_tasks_for_stream(conn, stream_name)).empty?
        update_task_objects(conn, task_ids)
        all_task_ids.concat task_ids
      end
    }
    log_task_ids(all_task_ids)
    tasks = load_tasks(conn, all_task_ids)
  }
  tasks
end
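
A usage sketch; the stream name is a placeholder:

# Checkpoint a single stream by name. 'pageviews' is a made-up
# stream name for illustration.
tasks = buffer.flush_stream('pageviews')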

#load_tasks_by_id(ids) ⇒ Object



# File 'lib/bricolage/streamingload/chunkbuffer.rb', line 100

def load_tasks_by_id(ids)
  @ctl_ds.open {|conn|
    return load_tasks(conn, ids)
  }
end
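
A usage sketch, e.g. for re-dispatching known tasks; the ID values are placeholders:

# Reload previously generated tasks by their task IDs.
tasks = buffer.load_tasks_by_id([101, 102])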

#save(chunk) ⇒ Object

Parameters:

chunk (IncomingChunk)



# File 'lib/bricolage/streamingload/chunkbuffer.rb', line 22

def save(chunk)
  @ctl_ds.open {|conn|
    suppress_sql_logging {
      conn.transaction {
        object_id = insert_object(conn, chunk)
        if object_id
          insert_task_objects(conn, object_id)
        else
          @logger.info "Duplicated object received: url=#{chunk.url}"
          insert_dup_object(conn, chunk)
        end
      }
    }
  }
end
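
A usage sketch, assuming incoming_chunk is an IncomingChunk with at least a #url:

# Buffer one incoming chunk; duplicates are detected inside the
# control-DB transaction and recorded separately via insert_dup_object.
buffer.save(incoming_chunk)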