Module: Docker::Registry::Sync::CMD
- Includes:
- Docker::Registry::Sync
- Defined in:
- lib/docker/registry/sync/s3.rb,
lib/docker/registry/sync/cmd.rb,
lib/docker/registry/sync/sqs.rb
Constant Summary
Constants included from Docker::Registry::Sync
Class Method Summary collapse
- .configure(source_bucket, target_buckets, sqs_queue, use_sse, source_uses_sse, pool) ⇒ Object
- .configure_signal_handlers ⇒ Object
- .finalize_message(receipt_handle) ⇒ Object
- .finalize_workers ⇒ Object
- .image_exists?(image, bucket, region) ⇒ Boolean
- .queue_sync(image, tag) ⇒ Object
- .run_sync ⇒ Object
- .send_message_batch(messages, retries = 5) ⇒ Object
- .start_workers ⇒ Object
- .sync(image, tag) ⇒ Object
- .sync_image(image_id, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object
- .sync_key_consumer ⇒ Object
- .sync_keys(target_client, target_bucket, target_sse, keys, source_bucket) ⇒ Object
- .sync_prefix(source_client, target_client, target_bucket, target_sse, prefix, source_bucket) ⇒ Object
- .sync_repo(repo, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object
- .sync_tag(image, tag, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object
Class Method Details
.configure(source_bucket, target_buckets, sqs_queue, use_sse, source_uses_sse, pool) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/docker/registry/sync/cmd.rb', line 14

# Parses the colon/comma separated arguments and stores them in the shared
# Docker::Registry::Sync configuration. Also resets the buffer of already
# synced images.
#
# @param source_bucket [String, nil] "region:bucket" of the source, or nil
# @param target_buckets [String, nil] comma separated "region:bucket[:sse]" entries, or nil
# @param sqs_queue [String, nil] "region:uri" of the queue (without scheme), or nil
# @param use_sse [Object] stored as config.sse
# @param source_uses_sse [Object] stored as config.source_sse
# @param pool [Object] stored as config.pool_size
# @return [Object] whatever Docker::Registry::Sync.config returns
def configure(source_bucket, target_buckets, sqs_queue, use_sse, source_uses_sse, pool)
  # When an argument is nil the corresponding locals simply stay nil.
  source_region, source_bucket = source_bucket.split(':') unless source_bucket.nil?
  target_buckets = target_buckets.split(',').map { |bucket| bucket.split(':') } unless target_buckets.nil?
  # Split on the first colon only: everything after the region is the URI,
  # which may itself contain a colon (e.g. an explicit port).
  sqs_region, sqs_uri = sqs_queue.split(':', 2) unless sqs_queue.nil?

  @synced_images = RingBuffer.new 10_000
  Docker::Registry::Sync.configure do |config|
    config.source_bucket = source_bucket
    config.source_region = source_region
    config.target_buckets = target_buckets
    config.source_sse = source_uses_sse
    config.sse = use_sse
    config.sqs_region = sqs_region
    config.pool_size = pool
    # NOTE(review): with no queue configured this is the bare string "https://".
    config.sqs_url = "https://#{sqs_uri}"
  end
  @config = Docker::Registry::Sync.config
end
.configure_signal_handlers ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/docker/registry/sync/cmd.rb', line 48

# Installs INT and TERM handlers that mark the run as terminated, mark the
# producer as finished and drop any work that is still queued.
#
# @return [Object] the result of iterating over the handled signal names
def configure_signal_handlers
  @terminated = false
  %w[INT TERM].each do |signal_name|
    Signal.trap(signal_name) do
      # Plain assignments are safe inside a trap handler.
      @producer_finished = true
      @terminated = true
      # Locks cannot be taken in trap context (Ruby raises ThreadError), and
      # both the logger and @threads.synchronize lock, so defer that work
      # to a regular thread.
      Thread.new do
        @config.logger.error "Received #{signal_name} signal..."
        # The signal may arrive before start_workers has created the pool.
        @threads.synchronize { @work_queue.clear } if @threads && @work_queue
      end
    end
  end
end
.finalize_message(receipt_handle) ⇒ Object
39 40 41 42 43 |
# File 'lib/docker/registry/sync/sqs.rb', line 39

# Deletes a processed message from the configured SQS queue.
#
# The method name and the `delete_message` call are restored from the
# documented signature `.finalize_message(receipt_handle)`; the extracted
# listing had dropped both identifiers.
#
# @param receipt_handle [String] receipt handle of the message to delete
# @return [Object] the response returned by the SQS client
def finalize_message(receipt_handle)
  sqs = Aws::SQS::Client.new(region: @config.sqs_region)
  sqs.delete_message(queue_url: @config.sqs_url, receipt_handle: receipt_handle)
end
.finalize_workers ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/docker/registry/sync/cmd.rb', line 83

# Marks the producer as finished, waits for the consumer thread and every
# worker thread, then drains the status queue.
#
# @return [Boolean] true only when every queued job result was truthy and
#   the run was not terminated
def finalize_workers
  @threads.synchronize { @producer_finished = true }

  @consumer_thread.join
  @threads.each { |t| t.join unless t.nil? }

  @config.logger.info "Processing job results..."
  success = true
  loop do
    begin
      # Always pop, even after a failure was seen. Folding the pop into
      # `success &&= ...` short-circuits once success is false, the queue is
      # never emptied and the loop never reaches ThreadError.
      result = @status_queue.pop(true)
    rescue ThreadError
      # A non-blocking pop raises ThreadError when the queue is empty.
      @config.logger.info "Finished processing job results..."
      break
    end
    # One job failure is a run failure
    success &&= result
  end

  success && !@terminated
end
.image_exists?(image, bucket, region) ⇒ Boolean
10 11 12 13 14 15 16 17 18 19 |
# File 'lib/docker/registry/sync/s3.rb', line 10

# Checks whether the repository index object for an image is present in a
# bucket.
#
# @param image [String] repository name used in the object key
# @param bucket [String] bucket to look in
# @param region [String] region passed to the S3 client
# @return [Boolean] false when the HEAD request raises
#   Aws::S3::Errors::NotFound, true otherwise
def image_exists?(image, bucket, region)
  client = Aws::S3::Client.new(region: region)
  index_key = "registry/repositories/#{image}/_index_images"
  begin
    client.head_object(bucket: bucket, key: index_key)
    true
  rescue Aws::S3::Errors::NotFound
    false
  end
end
.queue_sync(image, tag) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/docker/registry/sync/cmd.rb', line 119

# Builds one JSON sync request per configured target bucket and enqueues
# them as a batch.
#
# The `send_message_batch` call is restored; the extracted listing had
# dropped the identifier and left only `(msgs) ? 0 : 1`.
#
# @param image [String] image (repository) name
# @param tag [String] tag to sync
# @return [Integer] 0 when the batch was enqueued, 1 otherwise
def queue_sync(image, tag)
  source = { bucket: @config.source_bucket, region: @config.source_region }
  msgs = @config.target_buckets.map do |region, bucket, sse|
    JSON.dump(retries: 0,
              image: image,
              tag: tag,
              source: source,
              # A third element in the target entry enables SSE for it.
              target: { bucket: bucket, region: region, sse: !sse.nil? })
  end
  send_message_batch(msgs) ? 0 : 1
end
.run_sync ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/docker/registry/sync/cmd.rb', line 137

# Polls the configured SQS queue until terminated. For each received
# message it syncs the requested tag (when the image already exists in the
# target bucket) or the whole repository (when it does not), and deletes
# the message only after a successful sync.
#
# Identifiers dropped by the extracted listing (`receive_message`,
# `messages`, `message`, `finalize_message`) are restored.
#
# @return [Integer] 0 after a clean final iteration, 1 when an error ended
#   the loop
def run_sync
  ec = 1
  configure_signal_handlers
  begin
    @config.logger.info 'Polling queue for images to sync...'
    sqs = Aws::SQS::Client.new(region: @config.sqs_region)
    resp = sqs.receive_message(
      queue_url: @config.sqs_url,
      max_number_of_messages: 1,
      visibility_timeout: 900,  # Give ourselves 15min to sync the image
      wait_time_seconds: 10     # Wait a maximum of 10s for a new message
    )
    @config.logger.info "SQS returned #{resp.messages.length} new images to sync..."
    if resp.messages.length == 1
      message = resp.messages[0]
      data = JSON.load(message.body)
      @config.logger.info "Image sync data: #{data}"
      target = data['target']
      source = data['source']
      destination = "#{target['region']}:#{target['bucket']}"

      if image_exists?(data['image'], target['bucket'], target['region'])
        start_workers
        @config.logger.info("Syncing tag: #{data['image']}:#{data['tag']} to #{destination}")
        success = sync_tag(data['image'], data['tag'], target['bucket'], target['region'],
                           target['sse'], source['bucket'], source['region'])
        # Always shut the worker pool down, even after a failed sync;
        # `success &&= finalize_workers` would skip it and leave the
        # consumer thread running into the next iteration.
        workers_ok = finalize_workers
        success &&= workers_ok

        if success
          @config.logger.info("Finished syncing tag: #{data['image']}:#{data['tag']} to #{destination}")
          finalize_message(message.receipt_handle)
        else
          @config.logger.info("Falied to sync tag, leaving on queue: #{data['image']}:#{data['tag']} to #{destination}")
        end
      else
        start_workers
        # Announce the sync before it starts rather than after it finished.
        @config.logger.info("Syncing image: #{data['image']} to #{destination}")
        success = sync_repo(data['image'], target['bucket'], target['region'],
                            target['sse'], source['bucket'], source['region'])
        workers_ok = finalize_workers
        success &&= workers_ok

        if success
          @config.logger.info("Finished syncing image: #{data['image']} to #{destination}")
          finalize_message(message.receipt_handle)
        else
          @config.logger.error("Failed to sync image, leaving on queue: #{data['image']} to #{destination}")
        end
      end
    end
    ec = 0
    sleep @config.empty_queue_sleep_time unless @terminated
  rescue => e
    @config.logger.error "An unknown error occurred while monitoring queue: #{e}"
    # Exception has no #traceback; calling it raised NoMethodError from
    # inside this handler.
    @config.logger.error e.backtrace
    @config.logger.error 'Exiting...'
    @terminated = true
    ec = 1
  end until @terminated
  ec
end
.send_message_batch(messages, retries = 5) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/docker/registry/sync/sqs.rb', line 11

# Sends messages to the configured SQS queue as one batch and re-sends the
# entries SQS reports as failed until the retry budget is used up.
#
# Identifiers dropped by the extracted listing (the method name, the
# `messages` parameter and the client call) are restored from the
# documented signature `.send_message_batch(messages, retries = 5)`.
#
# @param messages [Array<String>] message bodies to enqueue
# @param retries [Integer] remaining attempts; nothing is sent at 0 or below
# @return [Boolean] true when every message was accepted, false when the
#   retries ran out
def send_message_batch(messages, retries = 5)
  if retries <= 0
    messages.each do |msg|
      # Logger levels are lower-case methods; `logger.Error` raised NoMethodError.
      @config.logger.error "Failed to Enqueue message: #{msg}"
    end
    return false
  end

  entries = messages.map do |msg|
    @config.logger.info "Enqueuing message: #{msg}"
    # The MD5 of the body doubles as the batch entry id, so failed entries
    # can be mapped back to their message below.
    { id: Digest::MD5.hexdigest(msg), message_body: msg }
  end
  sqs = Aws::SQS::Client.new(region: @config.sqs_region)
  resp = sqs.send_message_batch(queue_url: @config.sqs_url, entries: entries)
  return true if resp.failed.empty?

  rerun = resp.failed.map do |failed|
    # Look the message up before logging it; the log line used to reference
    # an undefined `msg` and raised NameError.
    msg = messages.find { |m| Digest::MD5.hexdigest(m) == failed.id }
    @config.logger.warn "Failed to Enqueue message, re-enqueuing: #{msg}"
    msg
  end.compact
  sleep 1 # short sleep before trying again...
  send_message_batch(rerun, retries - 1)
end
.start_workers ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/docker/registry/sync/cmd.rb', line 69

# Resets the worker pool state (thread slots, work and status queues,
# producer flag) and launches the consumer thread.
#
# @return [Thread] the thread running sync_key_consumer
def start_workers
  pool = Array.new(@config.pool_size)
  # The slot array doubles as the monitor guarding the queues.
  pool.extend(MonitorMixin)

  @threads = pool
  @work_queue = Queue.new
  @status_queue = Queue.new
  @threads_available = pool.new_cond
  @producer_finished = false

  @consumer_thread = Thread.new { sync_key_consumer }
end
.sync(image, tag) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/docker/registry/sync/cmd.rb', line 104

# Syncs an image tag to every configured target bucket: the tag alone when
# the image already exists in the target, the whole repository otherwise.
#
# @param image [String] image (repository) name
# @param tag [String] tag to sync
# @return [Integer] 0 when every target synced and the workers finished
#   cleanly, 1 otherwise (including when no targets are configured)
def sync(image, tag)
  configure_signal_handlers
  start_workers

  # Keep one result per target. Assigning each result to a single variable
  # let the last bucket's outcome hide earlier failures.
  results = @config.target_buckets.map do |region, bucket, sse|
    if image_exists?(image, bucket, region)
      sync_tag(image, tag, bucket, region, !sse.nil?)
    else
      sync_repo(image, bucket, region, !sse.nil?)
    end
  end

  # Always shut the workers down; `success &&= finalize_workers` skipped
  # this whenever a sync had already failed.
  workers_ok = finalize_workers
  success = !results.empty? && results.all? && workers_ok
  success ? 0 : 1
end
.sync_image(image_id, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/docker/registry/sync/s3.rb', line 69

# Syncs an image and every image listed in its ancestry object, skipping
# images already recorded in @synced_images for the same region and bucket.
#
# @param image_id [String] id of the image whose ancestry is read
# @param bucket [String] target bucket
# @param region [String] target region
# @param sse [Object] passed through to sync_prefix
# @param source_bucket [String, nil] defaults to @config.source_bucket
# @param source_region [String, nil] defaults to @config.source_region
# @return [Object] the parsed ancestry list that was iterated
def sync_image(image_id, bucket, region, sse, source_bucket = nil, source_region = nil)
  source_region ||= @config.source_region
  source_bucket ||= @config.source_bucket

  s3_source = Aws::S3::Client.new(region: source_region)
  s3_target = Aws::S3::Client.new(region: region)

  ancestry_key = "registry/images/#{image_id}/ancestry"
  ancestry_json = s3_source.get_object(bucket: source_bucket, key: ancestry_key).body.read

  # Ancestry includes self
  JSON.load(ancestry_json).each do |layer|
    marker = "#{layer}:#{region}:#{bucket}"
    next if @synced_images.include? marker

    sync_prefix(s3_source, s3_target, bucket, sse, "registry/images/#{layer}/", source_bucket)
    @synced_images << marker
  end
end
.sync_key_consumer ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/docker/registry/sync/s3.rb', line 124

# Consumer loop: takes copy requests off @work_queue and runs each one in a
# worker thread stored in a free slot of @threads, pushing true/false onto
# @status_queue per request. Stops once the producer is finished and the
# work queue is empty.
#
# @return [nil]
def sync_key_consumer
  @config.logger.info "Starting sync consumer..."

  # A slot is reusable when it was never used, its thread terminated
  # normally, or its thread flagged itself as finished.
  slot_free = lambda { |t| t.nil? || t.status == false || !t['finished'].nil? }

  until @producer_finished && @work_queue.length == 0
    # Wait (at least one 0.1s tick) until some slot is reusable.
    loop do
      sleep 0.1
      break if @threads.any?(&slot_free)
    end
    t_index = @threads.rindex(&slot_free)

    begin
      opts = @threads.synchronize { @work_queue.pop(true) }
    rescue ThreadError
      # Non-blocking pop on an empty queue.
      @config.logger.info "No work found on the queue, sleeping..."
      sleep 1
      next
    end

    unless opts[:key]
      @config.logger.info "Queued work empty: #{opts}"
      next
    end

    @threads[t_index].join unless @threads[t_index].nil?
    @threads[t_index] = Thread.new do
      @config.logger.info "Worker syncing key: #{opts[:key]}"
      # :region only selects the client; it is removed before the copy call.
      target_client = Aws::S3::Client.new(region: opts[:region])
      opts.delete :region
      copied = false
      begin
        target_client.copy_object(opts)
        copied = true
        @config.logger.info "Worker finished syncing key: #{opts[:key]}"
      rescue => e
        @config.logger.error "An unknown error occoured while copying object in s3: #{e}"
        @config.logger.error e.backtrace
      ensure
        Thread.current['finished'] = true
        @threads.synchronize { @status_queue << copied }
      end
    end
  end
end
.sync_keys(target_client, target_bucket, target_sse, keys, source_bucket) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/docker/registry/sync/s3.rb', line 103

# Queues one copy request per key on @work_queue, pausing 0.1s after each.
#
# @param target_client [Object] client whose config[:region] is recorded in the request
# @param target_bucket [String] bucket to copy into
# @param target_sse [Object] truthy to request AES256 encryption for this target
# @param keys [Array<String>] object keys to copy
# @param source_bucket [String] bucket to copy from
# @return [Object] the keys collection that was iterated
def sync_keys(target_client, target_bucket, target_sse, keys, source_bucket)
  keys.each do |key|
    @config.logger.info "Syncing key #{source_bucket}/#{key} to bucket #{target_bucket}"

    request = {
      acl: 'bucket-owner-full-control',
      region: target_client.config[:region],
      bucket: target_bucket,
      key: key,
      copy_source: "#{source_bucket}/#{key}"
    }
    request[:server_side_encryption] = 'AES256' if @config.sse || target_sse
    request[:copy_source_sse_customer_algorithm] = 'AES256' if @config.source_sse

    @threads.synchronize { @work_queue << request }
    sleep 0.1
  end
end
.sync_prefix(source_client, target_client, target_bucket, target_sse, prefix, source_bucket) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/docker/registry/sync/s3.rb', line 86

# Lists every object under a prefix in the source bucket, following all
# result pages, and queues the keys for copying via sync_keys.
#
# @param source_client [Object] client used to list the source bucket
# @param target_client [Object] passed through to sync_keys
# @param target_bucket [String] passed through to sync_keys
# @param target_sse [Object] passed through to sync_keys
# @param prefix [String] key prefix to list
# @param source_bucket [String] bucket to list
# @return [Object] the result of sync_keys
def sync_prefix(source_client, target_client, target_bucket, target_sse, prefix, source_bucket)
  keys = []
  page = source_client.list_objects(bucket: source_bucket, prefix: prefix)

  loop do
    keys.concat(page.contents.map(&:key))
    break if page.last_page?

    # next_page returns the following page rather than advancing the
    # receiver, so its result has to be kept. Discarding it made the loop
    # re-read the first page forever on multi-page listings.
    page = page.next_page
  end

  sync_keys(target_client, target_bucket, target_sse, keys, source_bucket)
end
.sync_repo(repo, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/docker/registry/sync/s3.rb', line 46

# Syncs a whole repository: everything under its repository prefix, then
# every image listed in its _index_images object.
#
# @param repo [String] repository name
# @param bucket [String] target bucket
# @param region [String] target region
# @param sse [Object] passed through to sync_prefix and sync_image
# @param source_bucket [String, nil] defaults to @config.source_bucket
# @param source_region [String, nil] defaults to @config.source_region
# @return [Boolean] true on success, false when any error was raised (the
#   error is logged, not re-raised)
def sync_repo(repo, bucket, region, sse, source_bucket = nil, source_region = nil)
  source_region ||= @config.source_region
  source_bucket ||= @config.source_bucket
  s3_source = Aws::S3::Client.new(region: source_region)
  s3_target = Aws::S3::Client.new(region: region)

  begin
    repo_prefix = "registry/repositories/#{repo}/"
    sync_prefix(s3_source, s3_target, bucket, sse, repo_prefix, source_bucket)

    index_key = "registry/repositories/#{repo}/_index_images"
    index_json = s3_source.get_object(bucket: source_bucket, key: index_key).body.read
    JSON.load(index_json).each do |entry|
      sync_image(entry['id'], bucket, region, sse, source_bucket, source_region)
    end
    true
  rescue => e
    @config.logger.error "An unexpected error occoured while syncing repo #{repo}: #{e}"
    @config.logger.error e.backtrace
    false
  end
end
.sync_tag(image, tag, bucket, region, sse, source_bucket = nil, source_region = nil) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/docker/registry/sync/s3.rb', line 22

# Syncs a single tag: queues the tag's repository objects for copying, then
# syncs the image the tag object points at.
#
# @param image [String] image (repository) name
# @param tag [String] tag name
# @param bucket [String] target bucket
# @param region [String] target region
# @param sse [Object] passed through to sync_keys and sync_image
# @param source_bucket [String, nil] defaults to @config.source_bucket
# @param source_region [String, nil] defaults to @config.source_region
# @return [Boolean] true on success, false when any error was raised (the
#   error is logged, not re-raised)
def sync_tag(image, tag, bucket, region, sse, source_bucket = nil, source_region = nil)
  source_region ||= @config.source_region
  source_bucket ||= @config.source_bucket
  s3_source = Aws::S3::Client.new(region: source_region)
  s3_target = Aws::S3::Client.new(region: region)

  begin
    repo_root = "registry/repositories/#{image}"
    tag_objects = ["tag#{tag}_json", "tag_#{tag}", '_index_images']
    keys = tag_objects.map { |name| "#{repo_root}/#{name}" }
    sync_keys(s3_target, bucket, sse, keys, source_bucket)

    # The tag object's body is used as the image id.
    tag_object = s3_source.get_object(bucket: source_bucket, key: "registry/repositories/#{image}/tag_#{tag}")
    img_id = tag_object.body.read
    sync_image(img_id, bucket, region, sse, source_bucket, source_region)
    true
  rescue => e
    @config.logger.error "An unexpected error occoured while syncing tag #{image}:#{tag}: #{e}"
    @config.logger.error e.backtrace
    false
  end
end