Module: Docker::Registry::Sync::CMD
- Includes:
- Docker::Registry::Sync
- Defined in:
- lib/docker/registry/sync/s3.rb,
lib/docker/registry/sync/cmd.rb,
lib/docker/registry/sync/sqs.rb
Constant Summary
Constants included from Docker::Registry::Sync
Class Method Summary collapse
- .configure(source_bucket, target_buckets, sqs_queue) ⇒ Object
- .configure_signal_handlers ⇒ Object
- .finalize_message(receipt_handle) ⇒ Object
- .image_exists?(image, bucket, region) ⇒ Boolean
- .queue_sync(image, tag) ⇒ Object
- .run_sync ⇒ Object
- .send_message_batch(messages, retries = 5) ⇒ Object
- .sync(image, tag) ⇒ Object
- .sync_image(image, bucket, region, source_bucket = nil, source_region = nil) ⇒ Object
- .sync_keys(target_client, target_bucket, keys, source_bucket = nil) ⇒ Object
- .sync_prefix(source_client, target_client, target_bucket, prefix, source_bucket = nil) ⇒ Object
- .sync_tag(image, tag, bucket, region, source_bucket = nil, source_region = nil) ⇒ Object
Class Method Details
.configure(source_bucket, target_buckets, sqs_queue) ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/docker/registry/sync/cmd.rb', line 12 def configure(source_bucket, target_buckets, sqs_queue) unless source_bucket.nil? source_region, source_bucket = source_bucket.split(':') else source_region, source_bucket = nil, nil end unless target_buckets.nil? target_buckets = target_buckets.split(',').collect { |bucket| bucket.split(':') } else target_buckets = nil end unless sqs_queue.nil? sqs_region, sqs_uri = sqs_queue.split(':') else sqs_region, sqs_uri = nil, nil end Docker::Registry::Sync.configure do |config| config.source_bucket = source_bucket config.source_region = source_region config.target_buckets = target_buckets config.sqs_region = sqs_region config.sqs_url = "https://#{sqs_uri}" end @config = Docker::Registry::Sync.config end |
.configure_signal_handlers ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/docker/registry/sync/cmd.rb', line 40 def configure_signal_handlers @terminated = false Signal.trap('INT') do @config.logger.error 'Received INT signal...' @terminated = true end Signal.trap('TERM') do @config.logger.error 'Received TERM signal...' @terminated = true end end |
.finalize_message(receipt_handle) ⇒ Object
39 40 41 42 43 |
# File 'lib/docker/registry/sync/sqs.rb', line 39 def (receipt_handle) sqs = Aws::SQS::Client.new(region: @config.sqs_region) resp = sqs.(queue_url: @config.sqs_url, receipt_handle: receipt_handle) end |
.image_exists?(image, bucket, region) ⇒ Boolean
10 11 12 13 14 15 16 17 18 19 |
# File 'lib/docker/registry/sync/s3.rb', line 10 def image_exists?(image, bucket, region) s3 = Aws::S3::Client.new(region: region) begin s3.head_object(bucket: bucket, key: "registry/repositories/#{image}/_index_images") rescue Aws::S3::Errors::NotFound false else true end end |
.queue_sync(image, tag) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/docker/registry/sync/cmd.rb', line 65 def queue_sync(image, tag) msgs = @config.target_buckets.map do |region, bucket| JSON.dump(retries: 0, image: image, tag: tag, source: { bucket: @config.source_bucket, region: @config.source_region }, target: { bucket: bucket, region: region }) end (msgs) ? 0 : 1 end |
.run_sync ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/docker/registry/sync/cmd.rb', line 82 def run_sync ec = 1 configure_signal_handlers begin @config.logger.info 'Polling queue for images to sync...' sqs = Aws::SQS::Client.new(region: @config.sqs_region) resp = sqs.( queue_url: @config.sqs_url, max_number_of_messages: 1, visibility_timeout: 900, # Give ourselves 15min to sync the image wait_time_seconds: 10, # Wait a maximum of 10s for a new message ) @config.logger.info "SQS returned #{resp.messages.length} new images to sync..." if resp..length == 1 = resp.[0] data = JSON.load(.body) @config.logger.info "Image sync data: #{data}" if image_exists?(data['image'], data['target']['bucket'], data['target']['region']) @config.logger.info("Syncing tag: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}") if sync_tag(data['image'], data['tag'], data['target']['bucket'], data['target']['region'], data['source']['bucket'], data['source']['region']) @config.logger.info("Finished syncing tag: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}") (.receipt_handle) else @config.logger.info("Falied to sync tag, leaving on queue: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}") end else @config.logger.info("Syncing image: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}") if sync_image(data['image'], data['target']['bucket'], data['target']['region'], data['source']['bucket'], data['source']['region']) @config.logger.info("Finished syncing image: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}") (.receipt_handle) else @config.logger.error("Failed to sync image, leaving on queue: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}") end end end ec = 0 sleep @config.empty_queue_sleep_time unless @terminated rescue Exception => e @config.logger.error "An unknown error occurred while monitoring queue: #{e}" @config.logger.error 'Exiting...' @terminated = true ec = 1 end until @terminated ec end |
.send_message_batch(messages, retries = 5) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/docker/registry/sync/sqs.rb', line 11 def (, retries = 5) if retries <= 0 success = false .each do |msg| @config.logger.Error "Failed to Enqueue message: #{msg}" end else entries = .map do |msg| @config.logger.info "Enqueuing message: #{msg}" { id: Digest::MD5.hexdigest(msg), message_body: msg } end sqs = Aws::SQS::Client.new(region: @config.sqs_region) resp = sqs.(queue_url: @config.sqs_url, entries: entries) if resp.failed.length > 0 rerun = resp.failed.map do |failed| @config.logger.warn "Failed to Enqueue message, re-enqueuing: #{msg}" .select { |m| Digest::MD5.hexdigest(m) == failed.id }[0] end sleep 1 # short sleep before trying again... success = (rerun, retries - 1) else success = true end end success end |
.sync(image, tag) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/docker/registry/sync/cmd.rb', line 53 def sync(image, tag) success = false @config.target_buckets.each do |region, bucket| if image_exists?(image, bucket, region) success = sync_tag(image, tag, bucket, region) else success = sync_image(image, bucket, region) end end success ? 0 : 1 end |
.sync_image(image, bucket, region, source_bucket = nil, source_region = nil) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/docker/registry/sync/s3.rb', line 74 def sync_image(image, bucket, region, source_bucket = nil, source_region = nil) source_region ||= @config.source_region source_bucket ||= @config.source_bucket s3_source = Aws::S3::Client.new(region: source_region) s3_target = Aws::S3::Client.new(region: region) begin rep_prefix = "registry/repositories/#{image}/" sync_prefix(s3_source, s3_target, bucket, rep_prefix) img_index_resp = s3_source.get_object(bucket: source_bucket, key: "registry/repositories/#{image}/_index_images") JSON.load(img_index_resp.body.read).each do |image| image_prefix = "registry/images/#{image['id']}/" sync_prefix(s3_source, s3_target, bucket, image_prefix) end rescue Exception => e @config.logger.error "An unexpected error occoured while syncing image #{image}: #{e}" false else true end end |
.sync_keys(target_client, target_bucket, keys, source_bucket = nil) ⇒ Object
39 40 41 42 43 44 45 46 47 48 |
# File 'lib/docker/registry/sync/s3.rb', line 39 def sync_keys(target_client, target_bucket, keys, source_bucket = nil) source_bucket ||= @config.source_bucket keys.each do |key| @config.logger.info "Syncing key #{source_bucket}/#{key} to bucket #{target_bucket}" target_client.copy_object(acl: 'bucket-owner-full-control', bucket: target_bucket, key: key, copy_source: "#{source_bucket}/#{key}") end end |
.sync_prefix(source_client, target_client, target_bucket, prefix, source_bucket = nil) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/docker/registry/sync/s3.rb', line 21 def sync_prefix(source_client, target_client, target_bucket, prefix, source_bucket = nil) source_bucket ||= @config.source_bucket keys = [] img_resp = source_client.list_objects(bucket: source_bucket, prefix: prefix) loop do img_resp.contents.each do |item| keys << item.key end if img_resp.last_page? break else img_resp.next_page end end sync_keys(target_client, target_bucket, keys) end |
.sync_tag(image, tag, bucket, region, source_bucket = nil, source_region = nil) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/docker/registry/sync/s3.rb', line 50 def sync_tag(image, tag, bucket, region, source_bucket = nil, source_region = nil) source_region ||= @config.source_region source_bucket ||= @config.source_bucket s3_source = Aws::S3::Client.new(region: source_region) s3_target = Aws::S3::Client.new(region: region) begin keys = ["tag#{tag}_json", "tag_#{tag}", '_index_images'].map do |key| "registry/repositories/#{image}/#{key}" end sync_keys(s3_target, bucket, keys) img_id_resp = s3_source.get_object(bucket: source_bucket, key: "registry/repositories/#{image}/tag_#{tag}") img_prefix = "registry/images/#{img_id_resp.body.read}/" sync_prefix(s3_source, s3_target, bucket, img_prefix) rescue Exception => e @config.logger.error "An unexpected error occoured while syncing tag #{image}:#{tag}: #{e}" false else true end end |