Class: Dor::WorkflowArchiver
- Inherits: Object
  - Object
  - Dor::WorkflowArchiver
- Defined in: lib/dor/workflow_archiver.rb
Constant Summary
- WF_COLUMNS =
  %w(id druid datastream process status error_msg error_txt datetime attempts lifecycle elapsed repository note priority lane_id)
Instance Attribute Summary
- #errors ⇒ Object (readonly)
  These attributes are mostly used for testing.
Class Method Summary
- .config ⇒ Object
Instance Method Summary
- #archive ⇒ Object
  Finds completed objects and archives their workflow rows.
- #archive_one_datastream(repository, druid, datastream, version) ⇒ Object
  Use this as a one-shot method to archive all the steps of a particular datastream for an object. It connects to the database, archives the rows, then logs off.
- #archive_rows(objs) ⇒ Object
  Copies rows from the workflow table to the workflow_archive table, then deletes the rows from workflow. Both operations must complete, or both are rolled back.
- #conn ⇒ Object
- #do_one_archive(workflow_info) ⇒ Object
- #dor_conn ⇒ Object
- #find_completed_objects ⇒ Array<Hash{String=>String}>
  Finds objects where all workflow steps are complete.
- #initialize(opts = {}) ⇒ WorkflowArchiver (constructor)
  Sets up logging and connects to the database.
- #map_result_to_criteria(rows) ⇒ Array<ArchiveCriteria>
  Maps each result row to an ArchiveCriteria object.
- #wf_archive_column_string ⇒ String
  The columns prefixed with the workflow table name and joined by comma and newline.
- #wf_column_string ⇒ String
  The columns joined by comma and newline.
Constructor Details
#initialize(opts = {}) ⇒ WorkflowArchiver
Sets up logging and connects to the database. By default it reads values from constants:
WORKFLOW_DB_LOGIN, WORKFLOW_DB_PASSWORD, WORKFLOW_DB_URI, DOR_SERVICE_URI, but these can be overridden with the opts Hash.
# File 'lib/dor/workflow_archiver.rb', line 56

def initialize(opts = {})
  @conn = opts[:db_connection]
  @db_uri = opts.fetch(:db_uri, WorkflowArchiver.config.db_uri).freeze
  @workflow_table = opts.include?(:wf_table) ? opts[:wf_table] : 'workflow'
  @workflow_archive_table = opts.include?(:wfa_table) ? opts[:wfa_table] : 'workflow_archive'
  @retry_delay = opts.include?(:retry_delay) ? opts[:retry_delay] : 5

  # initialize some counters
  @errors = 0
  @archived = 0
end
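Example (an illustrative sketch; the URI and table names below are placeholders, not real connection details):

  # Default construction reads the database URI from WorkflowArchiver.config
  archiver = Dor::WorkflowArchiver.new

  # The defaults can be overridden through the opts Hash
  test_archiver = Dor::WorkflowArchiver.new(
    db_uri: 'mysql2://user:password@localhost/workflows_test', # placeholder URI
    wf_table: 'workflow_test',
    wfa_table: 'workflow_archive_test',
    retry_delay: 1
  )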
Instance Attribute Details
#errors ⇒ Object (readonly)
These attributes are mostly used for testing.
# File 'lib/dor/workflow_archiver.rb', line 43

def errors
  @errors
end
Class Method Details
.config ⇒ Object
# File 'lib/dor/workflow_archiver.rb', line 45

def self.config
  @@conf ||= Confstruct::Configuration.new
end
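Example (a sketch of seeding the shared configuration before constructing an archiver; both values are placeholders):

  # .config returns a Confstruct::Configuration, so plain setters work
  Dor::WorkflowArchiver.config.db_uri = 'mysql2://user:password@localhost/workflows' # placeholder
  Dor::WorkflowArchiver.config.dor_service_uri = 'https://dor-service.example.edu'   # placeholder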
Instance Method Details
#archive ⇒ Object
Finds completed objects and archives their workflow rows.
# File 'lib/dor/workflow_archiver.rb', line 204

def archive
  objs = find_completed_objects

  if objs.none?
    LyberCore::Log.info 'Nothing to archive'
  else
    LyberCore::Log.info "Found #{objs.count} completed workflows"

    archiving_criteria = map_result_to_criteria(objs)
    archive_rows(archiving_criteria)

    LyberCore::Log.info "DONE! Processed #{@archived.to_s} objects with #{@errors.to_s} errors" if @errors < 3
  end
end
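Example (a minimal run, assuming configuration or constructor options already point at a reachable database):

  archiver = Dor::WorkflowArchiver.new
  archiver.archive
  puts "Finished with #{archiver.errors} errors"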
#archive_one_datastream(repository, druid, datastream, version) ⇒ Object
Note: the caller of this method must handle destroying the connection pool.
Use this as a one-shot method to archive all the steps of a particular datastream for an object. It connects to the database, archives the rows, then logs off. Assumes the caller will set the version (as the Dor REST service does).
# File 'lib/dor/workflow_archiver.rb', line 92

def archive_one_datastream(repository, druid, datastream, version)
  criteria = [ArchiveCriteria.new(repository, druid, datastream, version)]
  archive_rows criteria
end
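Example (a one-shot sketch; the druid, workflow name, and version are hypothetical, and the connection teardown shown is one possible cleanup, not prescribed by the method):

  archiver = Dor::WorkflowArchiver.new
  archiver.archive_one_datastream('dor', 'druid:aa111bb2222', 'accessionWF', 3)
  archiver.conn.disconnect # caller is responsible for tearing down the connection pool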
#archive_rows(objs) ⇒ Object
Copies rows from the workflow table to the workflow_archive table, then deletes the rows from workflow. Both operations must complete, or both are rolled back.
# File 'lib/dor/workflow_archiver.rb', line 100

def archive_rows(objs)
  objs.each do |obj|
    tries = 0
    begin
      tries += 1
      do_one_archive(obj)
      @archived += 1
    rescue => e
      LyberCore::Log.error "Rolling back transaction due to: #{e.inspect}\n" << e.backtrace.join("\n") << "\n!!!!!!!!!!!!!!!!!!"

      if tries < 3 # Retry this druid up to 3 times
        LyberCore::Log.error "  Retrying archive operation in #{@retry_delay} seconds..."
        sleep @retry_delay
        retry
      end

      LyberCore::Log.error "  Too many retries. Giving up on #{obj.inspect}"

      @errors += 1
      if @errors >= 3
        LyberCore::Log.fatal('Too many errors. Archiving halted')
        break
      end
    end
  end # druids.each
end
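Example (a batch sketch; the ArchiveCriteria arguments mirror those used by #archive_one_datastream, and the identifiers are hypothetical):

  criteria = [
    ArchiveCriteria.new('dor', 'druid:aa111bb2222', 'accessionWF', 1),
    ArchiveCriteria.new('dor', 'druid:cc333dd4444', 'disseminationWF', 2)
  ]
  archiver.archive_rows(criteria) # retries each item up to 3 times, halting after 3 total errors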
#conn ⇒ Object
# File 'lib/dor/workflow_archiver.rb', line 67

def conn
  @conn ||= Sequel.connect(@db_uri)
end
#do_one_archive(workflow_info) ⇒ Object
# File 'lib/dor/workflow_archiver.rb', line 126

def do_one_archive(workflow_info)
  LyberCore::Log.info "Archiving #{workflow_info.inspect}"

  copy_sql = <<-EOSQL
    insert into #{@workflow_archive_table} (
      #{wf_column_string},
      version
    )
    select
      #{wf_archive_column_string},
      #{workflow_info.version} as version
    from #{@workflow_table}
    where #{@workflow_table}.druid = :druid
    and #{@workflow_table}.datastream = :datastream
  EOSQL
  delete_sql = "delete from #{@workflow_table} where druid = :druid and datastream = :datastream "

  LyberCore::Log.debug "copy_sql is #{copy_sql}"
  LyberCore::Log.debug "delete_sql is #{delete_sql}"

  if(workflow_info.repository)
    copy_sql += "and #{@workflow_table}.repository = :repository"
    delete_sql += 'and repository = :repository'
  else
    copy_sql += "and #{@workflow_table}.repository IS NULL"
    delete_sql += 'and repository IS NULL'
  end

  conn.transaction do
    conn.run Sequel::SQL::PlaceholderLiteralString.new(copy_sql, workflow_info.to_bind_hash)
    LyberCore::Log.debug '  Removing old workflow rows'
    conn.run Sequel::SQL::PlaceholderLiteralString.new(delete_sql, workflow_info.to_bind_hash)
  end
end
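For illustration, the :druid, :datastream, and :repository placeholders above imply that workflow_info.to_bind_hash supplies a hash shaped roughly like the one below (values hypothetical); Sequel interpolates it through Sequel::SQL::PlaceholderLiteralString:

  sql  = 'delete from workflow where druid = :druid and datastream = :datastream and repository = :repository'
  bind = { druid: 'druid:aa111bb2222', datastream: 'accessionWF', repository: 'dor' } # hypothetical values
  conn.run Sequel::SQL::PlaceholderLiteralString.new(sql, bind)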
#dor_conn ⇒ Object
# File 'lib/dor/workflow_archiver.rb', line 71

def dor_conn
  @dor_conn ||= Faraday.new(url: WorkflowArchiver.config.dor_service_uri)
end
#find_completed_objects ⇒ Array<Hash{String=>String}>
Finds objects where all workflow steps are complete.
# File 'lib/dor/workflow_archiver.rb', line 166

def find_completed_objects
  return to_enum(:find_completed_objects) unless block_given?

  completed_query = <<-EOSQL
    select distinct repository, datastream, druid from workflow w1
    where w1.status in ('completed', 'skipped')
    and not exists
    (
      select *
      from workflow w2
      where w1.repository = w2.repository
      and w1.datastream = w2.datastream
      and w1.druid = w2.druid
      and w2.status not in ('completed', 'skipped')
    )
  EOSQL

  conn.fetch(completed_query) do |row|
    yield row
  end
end
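Example (either consumption style works because an Enumerator is returned when no block is given):

  # Block form: rows are yielded as they stream from the database
  archiver.find_completed_objects { |row| puts row.inspect }

  # Enumerator form: handy for counting or chaining, as #archive does with objs.none? and objs.count
  puts archiver.find_completed_objects.count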
#map_result_to_criteria(rows) ⇒ Array<ArchiveCriteria>
Returns each result mapped to an ArchiveCriteria object.
# File 'lib/dor/workflow_archiver.rb', line 191

def map_result_to_criteria(rows)
  rows.lazy.map do |r|
    begin
      ArchiveCriteria.new.setup_from_query(r, dor_conn)
    rescue => e
      LyberCore::Log.error("Skipping archiving of #{r['DRUID']}")
      LyberCore::Log.error("#{e.inspect}\n" + e.backtrace.join("\n"))
      nil
    end
  end.reject { |r| r.nil? }
end
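Note that the .lazy map/reject chain defers the DOR service lookup until the result is iterated; a sketch of forcing evaluation up front (rows as yielded by #find_completed_objects):

  criteria = archiver.map_result_to_criteria(archiver.find_completed_objects).to_a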
#wf_archive_column_string ⇒ String
Returns the columns prefixed with the workflow table name and joined by comma and newline.
# File 'lib/dor/workflow_archiver.rb', line 81

def wf_archive_column_string
  WF_COLUMNS.map { |col| "#{@workflow_table}.#{col}" }.join(",\n")
end
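Example (with the default 'workflow' table; output truncated for brevity):

  archiver.wf_archive_column_string
  # => "workflow.id,\nworkflow.druid,\nworkflow.datastream,\n..." (one entry per column in WF_COLUMNS)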
#wf_column_string ⇒ String
Returns the columns joined by comma and newline.
# File 'lib/dor/workflow_archiver.rb', line 76

def wf_column_string
  WF_COLUMNS.join(",\n")
end
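Example (output truncated for brevity):

  archiver.wf_column_string
  # => "id,\ndruid,\ndatastream,\n..." (one entry per column in WF_COLUMNS)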