Class: Bricolage::StreamingLoad::ChunkBuffer
- Inherits: Object
- Includes: SQLUtils
- Defined in: lib/bricolage/streamingload/chunkbuffer.rb
Constant Summary
- TASK_GENERATION_TIME_LIMIT = 30 (seconds)
Instance Method Summary
- #flush_all ⇒ Object
  Flushes all chunks of all streams with no additional conditions, to create a “system checkpoint”.
- #flush_partial ⇒ Object
  Flushes chunks of any streams that meet the flush conditions.
- #flush_stream(stream_name) ⇒ Object
  Flushes all chunks of the specified stream with no additional conditions, to create a “stream checkpoint”.
- #initialize(control_data_source:, logger:) ⇒ ChunkBuffer (constructor)
  Returns a new instance of ChunkBuffer.
- #load_tasks_by_id(ids) ⇒ Object
  Loads the tasks with the given task IDs.
- #save(chunk) ⇒ Object
  Saves the given chunk (an IncomingChunk).
Constructor Details
#initialize(control_data_source:, logger:) ⇒ ChunkBuffer
Returns a new instance of ChunkBuffer.
# File 'lib/bricolage/streamingload/chunkbuffer.rb', line 15

def initialize(control_data_source:, logger:)
  @ctl_ds = control_data_source
  @logger = logger
  @task_generation_time_limit = TASK_GENERATION_TIME_LIMIT
end
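A minimal construction sketch for reference; the control data source is an assumption here (its setup lives outside this file), since the class only requires an object whose #open yields a connection supporting #transaction:

  require 'logger'

  # ctl_ds: hypothetical control data source, e.g. a PostgreSQL data source
  # obtained from the Bricolage context; not defined on this page.
  buffer = Bricolage::StreamingLoad::ChunkBuffer.new(
    control_data_source: ctl_ds,
    logger: Logger.new($stderr)
  )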
Instance Method Details
#flush_all ⇒ Object
Flushes all chunks of all streams with no additional conditions, to create a “system checkpoint”.
# File 'lib/bricolage/streamingload/chunkbuffer.rb', line 58

def flush_all
  all_task_ids = []
  tasks = nil
  @ctl_ds.open {|conn|
    conn.transaction {|txn|
      # update_task_objects may not consume all saved objects
      # (e.g. there are too many objects for one table), so we must
      # create tasks repeatedly until all objects are flushed.
      until (task_ids = insert_tasks(conn, force: true)).empty?
        update_task_objects(conn, task_ids)
        all_task_ids.concat task_ids
      end
    }
    log_task_ids(all_task_ids)
    tasks = load_tasks(conn, all_task_ids)
  }
  tasks
end
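A usage sketch, assuming a buffer built as in the constructor example: a caller forces a system checkpoint and hands the returned tasks downstream (the queue is hypothetical, not part of this class):

  tasks = buffer.flush_all
  tasks.each do |task|
    task_queue.put task   # hypothetical downstream dispatch
  end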
#flush_partial ⇒ Object
Flushes chunks of any streams that meet the flush conditions.
# File 'lib/bricolage/streamingload/chunkbuffer.rb', line 39

def flush_partial
  task_ids = nil
  tasks = nil
  @ctl_ds.open {|conn|
    warn_slow_task_generation {
      conn.transaction {|txn|
        task_ids = insert_tasks(conn)
        update_task_objects(conn, task_ids) unless task_ids.empty?
      }
    }
    log_task_ids(task_ids)
    tasks = load_tasks(conn, task_ids)
  }
  tasks
end
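warn_slow_task_generation is a private helper not shown on this page; a plausible sketch, assuming it merely times the block against @task_generation_time_limit (TASK_GENERATION_TIME_LIMIT, 30 seconds) and warns when exceeded:

  # Sketch only: the real helper may differ in message and bookkeeping.
  def warn_slow_task_generation
    start_time = Time.now
    yield
    elapsed = Time.now - start_time
    if elapsed > @task_generation_time_limit
      @logger.warn "slow task generation: #{elapsed} seconds"
    end
  end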
#flush_stream(stream_name) ⇒ Object
Flushes all chunks of the specified stream with no additional conditions, to create a “stream checkpoint”.
# File 'lib/bricolage/streamingload/chunkbuffer.rb', line 80

def flush_stream(stream_name)
  all_task_ids = []
  tasks = nil
  @ctl_ds.open {|conn|
    conn.transaction {|txn|
      # update_task_objects may not consume all saved objects
      # (e.g. there are too many objects for one table), so we must
      # create tasks repeatedly until all objects are flushed.
      until (task_ids = insert_tasks_for_stream(conn, stream_name)).empty?
        update_task_objects(conn, task_ids)
        all_task_ids.concat task_ids
      end
    }
    log_task_ids(all_task_ids)
    tasks = load_tasks(conn, all_task_ids)
  }
  tasks
end
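A stream-checkpoint sketch; the stream name is a placeholder, not one defined by this library:

  # Flush everything buffered for one stream, regardless of conditions.
  tasks = buffer.flush_stream('pageviews')   # 'pageviews' is hypothetical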
#load_tasks_by_id(ids) ⇒ Object
# File 'lib/bricolage/streamingload/chunkbuffer.rb', line 100

def load_tasks_by_id(ids)
  @ctl_ds.open {|conn|
    return load_tasks(conn, ids)
  }
end
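A retry-path sketch, assuming task IDs were captured earlier (e.g. from the log_task_ids output of a previous flush):

  tasks = buffer.load_tasks_by_id([101, 102])   # hypothetical task IDs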
#save(chunk) ⇒ Object
Parameters:
- chunk (IncomingChunk)
# File 'lib/bricolage/streamingload/chunkbuffer.rb', line 22

def save(chunk)
  @ctl_ds.open {|conn|
    suppress_sql_logging {
      conn.transaction {
        object_id = insert_object(conn, chunk)
        if object_id
          insert_task_objects(conn, object_id)
        else
          @logger.info "Duplicated object received: url=#{chunk.url}"
          insert_dup_object(conn, chunk)
        end
      }
    }
  }
end
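A save-loop sketch; the source of IncomingChunk instances is an assumption, since only #url is referenced in this file:

  chunks.each do |chunk|   # chunks: IncomingChunk instances from a receiver
    buffer.save(chunk)     # duplicate URLs are logged and stored as dup objects
  end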