Module: R10K::ContentSynchronizer

Defined in:
lib/r10k/content_synchronizer.rb

Class Method Summary collapse

Class Method Details

.concurrent_accept(modules, visitor, loader, pool_size, logger) ⇒ Object



16
17
18
19
# File 'lib/r10k/content_synchronizer.rb', line 16

def self.concurrent_accept(modules, visitor, loader, pool_size, logger)
  mods_queue = modules_visit_queue(modules, visitor, loader)
  sync_queue(mods_queue, pool_size, logger)
end

.concurrent_sync(modules, pool_size, logger) ⇒ Object



21
22
23
24
# File 'lib/r10k/content_synchronizer.rb', line 21

def self.concurrent_sync(modules, pool_size, logger)
  mods_queue = modules_sync_queue(modules)
  sync_queue(mods_queue, pool_size, logger)
end

.enqueue_modules(queue, modules) ⇒ Object



60
61
62
63
64
65
66
# File 'lib/r10k/content_synchronizer.rb', line 60

def self.enqueue_modules(queue, modules)
  modules_by_cachedir = modules.group_by { |mod| mod.cachedir }
  modules_without_vcs_cachedir = modules_by_cachedir.delete(:none) || []

  modules_without_vcs_cachedir.each {|mod| queue << Array(mod) }
  modules_by_cachedir.values.each {|mods| queue << mods }
end

.modules_sync_queue(modules) ⇒ Object



54
55
56
57
58
# File 'lib/r10k/content_synchronizer.rb', line 54

def self.modules_sync_queue(modules)
  Queue.new.tap do |queue|
    enqueue_modules(queue, modules)
  end
end

.modules_visit_queue(modules, visitor, loader) ⇒ Object



46
47
48
49
50
51
52
# File 'lib/r10k/content_synchronizer.rb', line 46

def self.modules_visit_queue(modules, visitor, loader)
  Queue.new.tap do |queue|
    visitor.visit(:puppetfile, loader) do
      enqueue_modules(queue, modules)
    end
  end
end

.serial_accept(modules, visitor, loader) ⇒ Object



4
5
6
7
8
# File 'lib/r10k/content_synchronizer.rb', line 4

def self.serial_accept(modules, visitor, loader)
  visitor.visit(:puppetfile, loader) do
    serial_sync(modules)
  end
end

.serial_sync(modules) ⇒ Object



10
11
12
13
14
# File 'lib/r10k/content_synchronizer.rb', line 10

def self.serial_sync(modules)
  modules.each do |mod|
    mod.sync
  end
end

.sync_queue(mods_queue, pool_size, logger) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/r10k/content_synchronizer.rb', line 26

def self.sync_queue(mods_queue, pool_size, logger)
  logger.debug _("Updating modules with %{pool_size} threads") % {pool_size: pool_size}
  thread_pool = pool_size.times.map { sync_thread(mods_queue, logger) }
  thread_exception = nil

  # If any threads raise an exception the deployment is considered a failure.
  # In that event clear the queue, wait for other threads to finish their
  # current work, then re-raise the first exception caught.
  begin
    thread_pool.each(&:join)
  rescue => e
    logger.error _("Error during concurrent deploy of a module: %{message}") % {message: e.message}
    mods_queue.clear
    thread_exception ||= e
    retry
  ensure
    raise thread_exception unless thread_exception.nil?
  end
end

.sync_thread(mods_queue, logger) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/r10k/content_synchronizer.rb', line 68

def self.sync_thread(mods_queue, logger)
  Thread.new do
    begin
      while mods = mods_queue.pop(true) do
        mods.each { |mod| mod.sync }
      end
    rescue ThreadError => e
      logger.debug _("Module thread %{id} exiting: %{message}") % {message: e.message, id: Thread.current.object_id}
      Thread.exit
    rescue => e
      Thread.main.raise(e)
    end
  end
end