Module: R10K::ContentSynchronizer
- Defined in: lib/r10k/content_synchronizer.rb
Class Method Summary
- .concurrent_accept(modules, visitor, loader, pool_size, logger) ⇒ Object
- .concurrent_sync(modules, pool_size, logger) ⇒ Object
- .enqueue_modules(queue, modules) ⇒ Object
- .modules_sync_queue(modules) ⇒ Object
- .modules_visit_queue(modules, visitor, loader) ⇒ Object
- .serial_accept(modules, visitor, loader) ⇒ Object
- .serial_sync(modules) ⇒ Object
- .sync_queue(mods_queue, pool_size, logger) ⇒ Object
- .sync_thread(mods_queue, logger) ⇒ Object
Class Method Details
.concurrent_accept(modules, visitor, loader, pool_size, logger) ⇒ Object
16 17 18 19 |
# File 'lib/r10k/content_synchronizer.rb', line 16
#
# Builds a work queue from +modules+ via modules_visit_queue (which runs the
# visitor's :puppetfile visit) and processes that queue with sync_queue.
#
# @param modules [Enumerable] modules to enqueue
# @param visitor [#visit] passed through to modules_visit_queue
# @param loader [Object] passed through to modules_visit_queue
# @param pool_size [Integer] passed through to sync_queue
# @param logger [Object] passed through to sync_queue
# @return [Object] whatever sync_queue returns
def self.concurrent_accept(modules, visitor, loader, pool_size, logger)
  sync_queue(modules_visit_queue(modules, visitor, loader), pool_size, logger)
end
.concurrent_sync(modules, pool_size, logger) ⇒ Object
21 22 23 24 |
# File 'lib/r10k/content_synchronizer.rb', line 21
#
# Builds a work queue from +modules+ via modules_sync_queue and processes
# that queue with sync_queue.
#
# @param modules [Enumerable] modules to enqueue
# @param pool_size [Integer] passed through to sync_queue
# @param logger [Object] passed through to sync_queue
# @return [Object] whatever sync_queue returns
def self.concurrent_sync(modules, pool_size, logger)
  sync_queue(modules_sync_queue(modules), pool_size, logger)
end
.enqueue_modules(queue, modules) ⇒ Object
60 61 62 63 64 65 66 |
# File 'lib/r10k/content_synchronizer.rb', line 60
#
# Pushes +modules+ onto +queue+ as arrays of modules, grouped by the value
# each module returns from +cachedir+.
#
# Modules whose +cachedir+ is +:none+ are each pushed as their own
# one-element array. Every other group of modules sharing a +cachedir+ is
# pushed as a single array.
# NOTE(review): presumably this keeps modules that share a cache directory on
# one worker so they are synced sequentially -- confirm against sync_thread.
#
# @param queue [#<<] receives one array of modules per work item
# @param modules [Enumerable] objects responding to +cachedir+
# @return [Array<Array>] the groups that share a (non-:none) cachedir
def self.enqueue_modules(queue, modules)
  groups = modules.group_by(&:cachedir)
  ungrouped = groups.delete(:none) || []

  # Wrap with a literal array rather than Kernel#Array: Array(mod) would call
  # to_ary/to_a on the module and could flatten it into its parts.
  ungrouped.each { |mod| queue << [mod] }
  groups.values.each { |mods| queue << mods }
end
.modules_sync_queue(modules) ⇒ Object
54 55 56 57 58 |
# File 'lib/r10k/content_synchronizer.rb', line 54
#
# Creates a new Queue and fills it from +modules+ using enqueue_modules.
#
# @param modules [Enumerable] modules handed to enqueue_modules
# @return [Queue] the populated queue
def self.modules_sync_queue(modules)
  work_queue = Queue.new
  enqueue_modules(work_queue, modules)
  work_queue
end
.modules_visit_queue(modules, visitor, loader) ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/r10k/content_synchronizer.rb', line 46
#
# Creates a new Queue and fills it from +modules+ using enqueue_modules,
# doing the enqueueing inside the block given to
# +visitor.visit(:puppetfile, loader)+.
#
# @param modules [Enumerable] modules handed to enqueue_modules
# @param visitor [#visit] called with +:puppetfile+, +loader+ and a block
# @param loader [Object] passed to +visitor.visit+ unchanged
# @return [Queue] the queue (populated only if the visitor runs the block)
def self.modules_visit_queue(modules, visitor, loader)
  work_queue = Queue.new
  visitor.visit(:puppetfile, loader) { enqueue_modules(work_queue, modules) }
  work_queue
end
.serial_accept(modules, visitor, loader) ⇒ Object
4 5 6 7 8 |
# File 'lib/r10k/content_synchronizer.rb', line 4
#
# Runs serial_sync over +modules+ inside the block given to
# +visitor.visit(:puppetfile, loader)+.
#
# @param modules [Enumerable] modules handed to serial_sync
# @param visitor [#visit] called with +:puppetfile+, +loader+ and a block
# @param loader [Object] passed to +visitor.visit+ unchanged
# @return [Object] whatever +visitor.visit+ returns
def self.serial_accept(modules, visitor, loader)
  visitor.visit(:puppetfile, loader) { serial_sync(modules) }
end
.serial_sync(modules) ⇒ Object
10 11 12 13 14 |
# File 'lib/r10k/content_synchronizer.rb', line 10
#
# Calls +sync+ on every module, one after another, in iteration order.
#
# @param modules [#each] objects responding to +sync+
# @return [Object] the result of +modules.each+ (the receiver for an Array)
def self.serial_sync(modules)
  modules.each(&:sync)
end
.sync_queue(mods_queue, pool_size, logger) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/r10k/content_synchronizer.rb', line 26
#
# Starts +pool_size+ worker threads (via sync_thread) over +mods_queue+ and
# joins all of them.
#
# @param mods_queue [Queue] work queue shared by the workers; cleared when an
#   error is rescued so no further work is picked up
# @param pool_size [Integer] number of worker threads to start
# @param logger [#debug, #error] receives the start-up and error messages
# @return [Array<Thread>] the joined thread pool when no error was rescued
# @raise [StandardError] the first exception rescued while joining
def self.sync_queue(mods_queue, pool_size, logger)
  logger.debug _("Updating modules with %{pool_size} threads") % {pool_size: pool_size}
  thread_pool = pool_size.times.map { sync_thread(mods_queue, logger) }
  thread_exception = nil

  # If any threads raise an exception the deployment is considered a failure.
  # In that event clear the queue, wait for other threads to finish their
  # current work, then re-raise the first exception caught.
  begin
    thread_pool.each(&:join)
  rescue => e
    logger.error _("Error during concurrent deploy of a module: %{message}") % {message: e.message}
    mods_queue.clear
    # Remember only the first failure; later ones are logged but not re-raised.
    thread_exception ||= e
    # Go back and join again so the remaining workers finish before we raise.
    retry
  ensure
    raise thread_exception unless thread_exception.nil?
  end
end
.sync_thread(mods_queue, logger) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/r10k/content_synchronizer.rb', line 68
#
# Starts a worker thread that repeatedly pops an array of modules from
# +mods_queue+ (non-blocking) and calls +sync+ on each module in it.
#
# The worker stops when the non-blocking pop raises ThreadError (i.e. the
# queue is empty), logging a debug message first. Any other StandardError is
# raised in the main thread instead of in the worker.
#
# @param mods_queue [Queue] queue whose items are arrays of objects
#   responding to +sync+
# @param logger [#debug] receives the "thread exiting" message
# @return [Thread] the started worker thread
def self.sync_thread(mods_queue, logger)
  Thread.new do
    begin
      # pop(true) is non-blocking: it raises ThreadError once the queue is
      # empty, which is what ends this loop.
      while (mods = mods_queue.pop(true))
        mods.each(&:sync)
      end
    rescue ThreadError => e
      logger.debug _("Module thread %{id} exiting: %{message}") % {message: e.message, id: Thread.current.object_id}
      Thread.exit
    rescue => e
      # Hand the failure to the main thread rather than letting it die here.
      Thread.main.raise(e)
    end
  end
end