Module: Gitlab::GithubImport::ParallelScheduling
- Included in:
- Importer::DiffNotesImporter, Importer::IssuesImporter, Importer::LfsObjectsImporter, Importer::NotesImporter, Importer::PullRequestsImporter, Importer::PullRequestsMergedByImporter, Importer::PullRequestsReviewsImporter, Importer::SingleEndpointDiffNotesImporter, Importer::SingleEndpointIssueNotesImporter, Importer::SingleEndpointMergeRequestNotesImporter
- Defined in:
- lib/gitlab/github_import/parallel_scheduling.rb
Constant Summary collapse
- ALREADY_IMPORTED_CACHE_KEY =
The base cache key to use for tracking already imported objects.
'github-importer/already-imported/%{project}/%{collection}'
Instance Attribute Summary collapse
-
#already_imported_cache_key ⇒ Object
readonly
Returns the value of attribute already_imported_cache_key.
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#page_counter ⇒ Object
readonly
Returns the value of attribute page_counter.
-
#project ⇒ Object
readonly
Returns the value of attribute project.
Instance Method Summary collapse
- #abort_on_failure ⇒ Object
-
#already_imported?(object) ⇒ Boolean
Returns true if the given object has already been imported, false otherwise.
-
#collection_method ⇒ Object
The name of the method to call to retrieve the data to import.
-
#collection_options ⇒ Object
Any options to be passed to the method used for retrieving the data to import.
-
#each_object_to_import ⇒ Object
The method that will be called for traversing through all the objects to import, yielding them to the supplied block.
- #execute ⇒ Object
-
#id_for_already_imported_cache(object) ⇒ Object
Returns the ID to use for the cache used for checking if an object has already been imported or not.
-
#importer_class ⇒ Object
The class to use for importing objects when importing them sequentially.
-
#initialize(project, client, parallel: true) ⇒ Object
project - An instance of `Project`.
-
#mark_as_imported(object) ⇒ Object
Marks the given object as “already imported”.
- #object_type ⇒ Object
- #parallel? ⇒ Boolean
-
#parallel_import ⇒ Object
Imports all objects in parallel by scheduling a Sidekiq job for every individual object.
-
#parallel_import_batch ⇒ Object
Default batch settings for parallel import (can be redefined in Importer classes).
- #parallel_import_deprecated ⇒ Object
-
#representation_class ⇒ Object
The class used for converting API responses to Hashes when performing the import.
-
#sequential_import ⇒ Object
Imports all the objects in sequence in the current thread.
-
#sidekiq_worker_class ⇒ Object
The Sidekiq worker class used for scheduling the importing of objects in parallel.
- #spread_parallel_import ⇒ Object
Instance Attribute Details
#already_imported_cache_key ⇒ Object (readonly)
Returns the value of attribute already_imported_cache_key.
6 7 8 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 6 def already_imported_cache_key @already_imported_cache_key end |
#client ⇒ Object (readonly)
Returns the value of attribute client.
6 7 8 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 6 def client @client end |
#page_counter ⇒ Object (readonly)
Returns the value of attribute page_counter.
6 7 8 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 6 def page_counter @page_counter end |
#project ⇒ Object (readonly)
Returns the value of attribute project.
6 7 8 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 6 def project @project end |
Instance Method Details
#abort_on_failure ⇒ Object
214 215 216 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 214 def abort_on_failure false end |
#already_imported?(object) ⇒ Boolean
Returns true if the given object has already been imported, false otherwise.
object - The object to check.
162 163 164 165 166 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 162 def already_imported?(object) id = id_for_already_imported_cache(object) Gitlab::Cache::Import::Caching.set_includes?(already_imported_cache_key, id) end |
#collection_method ⇒ Object
The name of the method to call to retrieve the data to import.
205 206 207 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 205 def collection_method raise NotImplementedError end |
#collection_options ⇒ Object
Any options to be passed to the method used for retrieving the data to import.
220 221 222 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 220 def collection_options {} end |
#each_object_to_import ⇒ Object
The method that will be called for traversing through all the objects to import, yielding them to the supplied block.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 126 def each_object_to_import repo = project.import_source # We inject the page number here to make sure that all importers always # start where they left off. Simply starting over wouldn't work for # repositories with a lot of data (e.g. tens of thousands of comments). options = collection_options.merge(page: page_counter.current) client.each_page(collection_method, repo, options) do |page| # Technically it's possible that the same work is performed multiple # times, as Sidekiq doesn't guarantee there will ever only be one # instance of a job. In such a scenario it's possible for one job to # have a lower page number (e.g. 5) compared to another (e.g. 10). In # this case we skip over all the objects until we have caught up, # reducing the number of duplicate jobs scheduled by the provided # block. next unless page_counter.set(page.number) page.objects.each do |object| next if already_imported?(object) Gitlab::GithubImport::ObjectCounter.increment(project, object_type, :fetched) yield object # We mark the object as imported immediately so we don't end up # scheduling it multiple times. mark_as_imported(object) end end end |
#execute ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 28 def execute info(project.id, message: "starting importer") retval = if parallel? parallel_import else sequential_import end # Once we have completed all work we can remove our "already exists" # cache so we don't put too much pressure on Redis. # # We don't immediately remove it since it's technically possible for # other instances of this job to still run, instead we set the # expiration time to a lower value. This prevents the other jobs from # still scheduling duplicates. Since all work has already been # completed those jobs will just cycle through any remaining pages while # not scheduling anything. Gitlab::Cache::Import::Caching.expire(already_imported_cache_key, Gitlab::Cache::Import::Caching::SHORTER_TIMEOUT) info(project.id, message: "importer finished") retval rescue StandardError => e Gitlab::Import::ImportFailureService.track( project_id: project.id, error_source: self.class.name, exception: e, fail_import: abort_on_failure, metrics: true ) raise(e) end |
#id_for_already_imported_cache(object) ⇒ Object
Returns the ID to use for the cache used for checking if an object has already been imported or not.
object - The object we may want to import.
183 184 185 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 183 def id_for_already_imported_cache(object) raise NotImplementedError end |
#importer_class ⇒ Object
The class to use for importing objects when importing them sequentially.
194 195 196 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 194 def importer_class raise NotImplementedError end |
#initialize(project, client, parallel: true) ⇒ Object
project - An instance of `Project`. client - An instance of `Gitlab::GithubImport::Client`. parallel - When set to true the objects will be imported in parallel.
15 16 17 18 19 20 21 22 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 15 def initialize(project, client, parallel: true) @project = project @client = client @parallel = parallel @page_counter = PageCounter.new(project, collection_method) @already_imported_cache_key = ALREADY_IMPORTED_CACHE_KEY % { project: project.id, collection: collection_method } end |
#mark_as_imported(object) ⇒ Object
Marks the given object as “already imported”.
169 170 171 172 173 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 169 def mark_as_imported(object) id = id_for_already_imported_cache(object) Gitlab::Cache::Import::Caching.set_add(already_imported_cache_key, id) end |
#object_type ⇒ Object
175 176 177 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 175 def object_type raise NotImplementedError end |
#parallel? ⇒ Boolean
24 25 26 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 24 def parallel? @parallel end |
#parallel_import ⇒ Object
Imports all objects in parallel by scheduling a Sidekiq job for every individual object.
74 75 76 77 78 79 80 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 74 def parallel_import if parallel_import_batch.present? spread_parallel_import else parallel_import_deprecated end end |
#parallel_import_batch ⇒ Object
Default batch settings for parallel import (can be redefined in Importer classes)
210 211 212 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 210 def parallel_import_batch { size: 1000, delay: 1.minute } end |
#parallel_import_deprecated ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 82 def parallel_import_deprecated waiter = JobWaiter.new each_object_to_import do |object| repr = representation_class.from_api_response(object) sidekiq_worker_class .perform_async(project.id, repr.to_hash, waiter.key) waiter.jobs_remaining += 1 end waiter end |
#representation_class ⇒ Object
The class used for converting API responses to Hashes when performing the import.
189 190 191 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 189 def representation_class raise NotImplementedError end |
#sequential_import ⇒ Object
Imports all the objects in sequence in the current thread.
64 65 66 67 68 69 70 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 64 def sequential_import each_object_to_import do |object| repr = representation_class.from_api_response(object) importer_class.new(repr, project, client).execute end end |
#sidekiq_worker_class ⇒ Object
The Sidekiq worker class used for scheduling the importing of objects in parallel.
200 201 202 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 200 def sidekiq_worker_class raise NotImplementedError end |
#spread_parallel_import ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/gitlab/github_import/parallel_scheduling.rb', line 97 def spread_parallel_import waiter = JobWaiter.new import_arguments = [] each_object_to_import do |object| repr = representation_class.from_api_response(object) import_arguments << [project.id, repr.to_hash, waiter.key] waiter.jobs_remaining += 1 end # rubocop:disable Scalability/BulkPerformWithContext Gitlab::ApplicationContext.with_context(project: project) do sidekiq_worker_class.bulk_perform_in( 1.second, import_arguments, batch_size: parallel_import_batch[:size], batch_delay: parallel_import_batch[:delay] ) end # rubocop:enable Scalability/BulkPerformWithContext waiter end |