Module: R10K::ContentSynchronizer

Defined in:
lib/r10k/content_synchronizer.rb

Class Method Summary

Class Method Details

.concurrent_accept(modules, visitor, loader, pool_size, logger) ⇒ Object

Returns a Queue of the names of modules actually updated



20
21
22
23
# File 'lib/r10k/content_synchronizer.rb', line 20

# Builds the work queue through +modules_visit_queue+ (which enqueues
# +modules+ from inside the visitor's +:puppetfile+ visit) and hands that
# queue to +sync_queue+ to be processed by +pool_size+ threads.
#
# @param modules the modules to enqueue
# @param visitor object whose +visit+ wraps the enqueueing step
# @param loader forwarded to the visitor
# @param pool_size number of worker threads requested from +sync_queue+
# @param logger forwarded to +sync_queue+
# @return whatever +sync_queue+ returns (a Queue of the names of the
#   modules actually updated)
def self.concurrent_accept(modules, visitor, loader, pool_size, logger)
  sync_queue(modules_visit_queue(modules, visitor, loader), pool_size, logger)
end

.concurrent_sync(modules, pool_size, logger) ⇒ Object

Returns a Queue of the names of modules actually updated



26
27
28
29
# File 'lib/r10k/content_synchronizer.rb', line 26

# Builds the work queue through +modules_sync_queue+ and hands it to
# +sync_queue+ to be processed by +pool_size+ threads.
#
# @param modules the modules to enqueue
# @param pool_size number of worker threads requested from +sync_queue+
# @param logger forwarded to +sync_queue+
# @return whatever +sync_queue+ returns (a Queue of the names of the
#   modules actually updated)
def self.concurrent_sync(modules, pool_size, logger)
  sync_queue(modules_sync_queue(modules), pool_size, logger)
end

.enqueue_modules(queue, modules) ⇒ Object



69
70
71
72
73
74
75
# File 'lib/r10k/content_synchronizer.rb', line 69

# Pushes +modules+ onto +queue+ in batches (arrays of modules).
#
# Modules are grouped by their +cachedir+. Every module whose cachedir is
# +:none+ is pushed as its own one-element batch; all modules sharing any
# other cachedir are pushed together as a single batch.
# NOTE(review): presumably batching by cachedir keeps modules that share a
# cache from being synced at the same time -- confirm against the consumer.
#
# @param queue [#<<] receives the batches; the :none batches are pushed first
# @param modules [Enumerable] objects responding to +cachedir+
# @return [Array<Array>] the batches that were grouped by a real cachedir
def self.enqueue_modules(queue, modules)
  modules_by_cachedir = modules.group_by(&:cachedir)
  modules_without_vcs_cachedir = modules_by_cachedir.delete(:none) || []

  # Wrap with an array literal rather than Kernel#Array: Array(mod) would
  # call to_ary/to_a on the module and enqueue the converted contents
  # instead of a batch containing the module itself.
  modules_without_vcs_cachedir.each { |mod| queue << [mod] }
  modules_by_cachedir.values.each { |mods| queue << mods }
end

.modules_sync_queue(modules) ⇒ Object



63
64
65
66
67
# File 'lib/r10k/content_synchronizer.rb', line 63

# Creates a new Queue and fills it by passing it, together with +modules+,
# to +enqueue_modules+.
#
# @param modules the modules to enqueue
# @return [Queue] the newly created queue
def self.modules_sync_queue(modules)
  work_queue = Queue.new
  enqueue_modules(work_queue, modules)
  work_queue
end

.modules_visit_queue(modules, visitor, loader) ⇒ Object



55
56
57
58
59
60
61
# File 'lib/r10k/content_synchronizer.rb', line 55

# Creates a new Queue and fills it via +enqueue_modules+, running that step
# inside the block given to +visitor.visit(:puppetfile, loader)+.
#
# @param modules the modules to enqueue
# @param visitor object whose +visit+ is expected to run the given block
# @param loader forwarded to the visitor
# @return [Queue] the newly created queue (regardless of what +visit+ returns)
def self.modules_visit_queue(modules, visitor, loader)
  work_queue = Queue.new
  visitor.visit(:puppetfile, loader) { enqueue_modules(work_queue, modules) }
  work_queue
end

.serial_accept(modules, visitor, loader) ⇒ Object



4
5
6
7
8
# File 'lib/r10k/content_synchronizer.rb', line 4

# Runs +serial_sync(modules)+ inside the block given to
# +visitor.visit(:puppetfile, loader)+.
#
# @param modules the modules to sync
# @param visitor object whose +visit+ is expected to run the given block
# @param loader forwarded to the visitor
# @return whatever +visitor.visit+ returns
def self.serial_accept(modules, visitor, loader)
  visitor.visit(:puppetfile, loader) { serial_sync(modules) }
end

.serial_sync(modules) ⇒ Object



10
11
12
13
14
15
16
17
# File 'lib/r10k/content_synchronizer.rb', line 10

# Syncs each module in order, one after another.
#
# @param modules [Enumerable] objects responding to +sync+ and +name+
# @return [Array] the +name+ of every module whose +sync+ returned a truthy
#   value, in iteration order
def self.serial_sync(modules)
  modules.each_with_object([]) do |mod, synced_names|
    synced_names << mod.name if mod.sync
  end
end

.sync_queue(mods_queue, pool_size, logger) ⇒ Object

Returns a Queue of the names of modules actually updated



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/r10k/content_synchronizer.rb', line 32

# Starts +pool_size+ worker threads via +sync_thread+, all sharing
# +mods_queue+, and waits for every one of them to finish.
#
# @param mods_queue queue handed to each worker; it is cleared when a failure
#   is rescued here so no further work is picked up
# @param pool_size number of workers to start (must respond to +times+)
# @param logger receives one debug message up front and one error message
#   for each failure rescued while joining
# @return [Queue] the queue that was passed to the workers as
#   +updated_modules+
# @raise the first exception rescued while joining, once the join loop has
#   finished
def self.sync_queue(mods_queue, pool_size, logger)
  logger.debug _("Updating modules with %{pool_size} threads") % {pool_size: pool_size}
  updated_modules = Queue.new
  # NOTE(review): the workers start running here, before the begin/rescue
  # below is entered; an exception they raise into this thread in that window
  # would not be rescued by this method -- confirm whether that matters.
  thread_pool = pool_size.times.map { sync_thread(mods_queue, logger, updated_modules) }
  # Holds the first failure seen; later failures are logged but not kept.
  thread_exception = nil

  # If any threads raise an exception the deployment is considered a failure.
  # In that event clear the queue, wait for other threads to finish their
  # current work, then re-raise the first exception caught.
  begin
    thread_pool.each(&:join)
    # Return the list of all modules that were actually updated
    updated_modules
  rescue => e
    logger.error _("Error during concurrent deploy of a module: %{message}") % {message: e.message}
    mods_queue.clear
    thread_exception ||= e
    # Re-enter the begin body so the remaining threads are joined as well.
    retry
  ensure
    # Turns an otherwise successful exit into a failure when any error was
    # recorded above.
    raise thread_exception unless thread_exception.nil?
  end
end

.sync_thread(mods_queue, logger, updated_modules) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/r10k/content_synchronizer.rb', line 77

# Starts a worker thread that repeatedly takes a batch of modules from
# +mods_queue+ and syncs each module of the batch in order. The +name+ of
# every module whose +sync+ returns a truthy value is pushed onto
# +updated_modules+.
#
# The worker stops once the queue is empty (or a falsy item is popped). Any
# StandardError raised while processing a batch is re-raised in the main
# thread via Thread.main.raise. Only the ThreadError produced by the
# non-blocking pop is treated as "queue empty"; a ThreadError raised from
# inside a module's own +sync+ is reported as a failure like any other error
# instead of silently ending the worker.
#
# @param mods_queue [Queue] queue of batches; each batch responds to +each+
# @param logger [#debug] receives a message when the worker finds the queue empty
# @param updated_modules [#<<] collects the names of updated modules
# @return [Thread] the started worker thread
def self.sync_thread(mods_queue, logger, updated_modules)
  Thread.new do
    # Non-blocking pop: an empty queue raises ThreadError, which is this
    # worker's signal to stop. The rescue is scoped to the pop alone so that
    # errors raised by the sync work below can never be mistaken for it.
    next_batch = lambda do
      begin
        mods_queue.pop(true)
      rescue ThreadError => e
        logger.debug _("Module thread %{id} exiting: %{message}") % {message: e.message, id: Thread.current.object_id}
        nil
      end
    end

    begin
      while (mods = next_batch.call)
        mods.each do |mod|
          updated_modules << mod.name if mod.sync
        end
      end
    rescue => e
      # Surface the failure in the main thread, which is expected to be
      # joining the workers.
      Thread.main.raise(e)
    end
  end
end