Module: Docker::Registry::Sync::CMD

Includes:
Docker::Registry::Sync
Defined in:
lib/docker/registry/sync/s3.rb,
lib/docker/registry/sync/cmd.rb,
lib/docker/registry/sync/sqs.rb

Constant Summary

Constants included from Docker::Registry::Sync

VERSION

Class Method Summary collapse

Class Method Details

.configure(source_bucket, target_buckets, sqs_queue) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/docker/registry/sync/cmd.rb', line 12

def configure(source_bucket, target_buckets, sqs_queue)
  unless source_bucket.nil?
    source_region, source_bucket = source_bucket.split(':')
  else
    source_region, source_bucket = nil, nil
  end

  unless target_buckets.nil?
    target_buckets = target_buckets.split(',').collect { |bucket| bucket.split(':') }
  else
    target_buckets = nil
  end

  unless sqs_queue.nil?
    sqs_region, sqs_uri = sqs_queue.split(':')
  else
    sqs_region, sqs_uri = nil, nil
  end
  Docker::Registry::Sync.configure do |config|
    config.source_bucket = source_bucket
    config.source_region = source_region
    config.target_buckets = target_buckets
    config.sqs_region = sqs_region
    config.sqs_url = "https://#{sqs_uri}"
  end
  @config = Docker::Registry::Sync.config
end

.configure_signal_handlersObject



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/docker/registry/sync/cmd.rb', line 40

def configure_signal_handlers
  @terminated = false

  Signal.trap('INT') do
    @config.logger.error 'Received INT signal...'
    @terminated = true
  end
  Signal.trap('TERM') do
    @config.logger.error 'Received TERM signal...'
    @terminated = true
  end
end

.finalize_message(receipt_handle) ⇒ Object



39
40
41
42
43
# File 'lib/docker/registry/sync/sqs.rb', line 39

def finalize_message(receipt_handle)
  sqs = Aws::SQS::Client.new(region: @config.sqs_region)
  resp = sqs.delete_message(queue_url: @config.sqs_url,
                            receipt_handle: receipt_handle)
end

.image_exists?(image, bucket, region) ⇒ Boolean

Returns:

  • (Boolean)


10
11
12
13
14
15
16
17
18
19
# File 'lib/docker/registry/sync/s3.rb', line 10

def image_exists?(image, bucket, region)
  s3 = Aws::S3::Client.new(region: region)
  begin
    s3.head_object(bucket: bucket, key: "registry/repositories/#{image}/_index_images")
  rescue Aws::S3::Errors::NotFound
    false
  else
    true
  end
end

.queue_sync(image, tag) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/docker/registry/sync/cmd.rb', line 65

def queue_sync(image, tag)
  msgs = @config.target_buckets.map do |region, bucket|
    JSON.dump(retries: 0,
              image: image,
              tag: tag,
              source: {
                bucket: @config.source_bucket,
                region: @config.source_region
              },
              target: {
                bucket: bucket,
                region: region
              })
  end
  send_message_batch(msgs) ? 0 : 1
end

.run_syncObject



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/docker/registry/sync/cmd.rb', line 82

def run_sync
  ec = 1
  configure_signal_handlers
  begin
    @config.logger.info 'Polling queue for images to sync...'
    sqs = Aws::SQS::Client.new(region: @config.sqs_region)
    resp = sqs.receive_message(
      queue_url: @config.sqs_url,
      max_number_of_messages: 1,
      visibility_timeout: 900, # Give ourselves 15min to sync the image
      wait_time_seconds: 10, # Wait a maximum of 10s for a new message
    )
    @config.logger.info "SQS returned #{resp.messages.length} new images to sync..."
    if resp.messages.length == 1
      message = resp.messages[0]
      data = JSON.load(message.body)
      @config.logger.info "Image sync data:  #{data}"

      if image_exists?(data['image'], data['target']['bucket'], data['target']['region'])
        @config.logger.info("Syncing tag: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}")
        if sync_tag(data['image'], data['tag'], data['target']['bucket'], data['target']['region'], data['source']['bucket'], data['source']['region'])
          @config.logger.info("Finished syncing tag: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}")
          finalize_message(message.receipt_handle)
        else
          @config.logger.info("Falied to sync tag, leaving on queue: #{data['image']}:#{data['tag']} to #{data['target']['region']}:#{data['target']['bucket']}")
        end
      else
        @config.logger.info("Syncing image: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}")
        if sync_image(data['image'], data['target']['bucket'], data['target']['region'], data['source']['bucket'], data['source']['region'])
          @config.logger.info("Finished syncing image: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}")
          finalize_message(message.receipt_handle)
        else
          @config.logger.error("Failed to sync image, leaving on queue: #{data['image']} to #{data['target']['region']}:#{data['target']['bucket']}")
        end
      end
    end
    ec = 0
    sleep @config.empty_queue_sleep_time unless @terminated
  rescue Exception => e
    @config.logger.error "An unknown error occurred while monitoring queue: #{e}"
    @config.logger.error 'Exiting...'
    @terminated = true
    ec = 1
  end until @terminated
  ec
end

.send_message_batch(messages, retries = 5) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/docker/registry/sync/sqs.rb', line 11

def send_message_batch(messages, retries = 5)
  if retries <= 0
    success = false
    messages.each do |msg|
      @config.logger.Error "Failed to Enqueue message: #{msg}"
    end
  else
    entries = messages.map do |msg|
      @config.logger.info "Enqueuing message: #{msg}"
      { id: Digest::MD5.hexdigest(msg), message_body: msg }
    end
    sqs = Aws::SQS::Client.new(region: @config.sqs_region)
    resp = sqs.send_message_batch(queue_url: @config.sqs_url,
                                  entries: entries)
    if resp.failed.length > 0
      rerun = resp.failed.map do |failed|
        @config.logger.warn "Failed to Enqueue message, re-enqueuing: #{msg}"
        messages.select { |m| Digest::MD5.hexdigest(m) == failed.id }[0]
      end
      sleep 1 # short sleep before trying again...
      success = send_message_batch(rerun, retries - 1)
    else
      success = true
    end
  end
  success
end

.sync(image, tag) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/docker/registry/sync/cmd.rb', line 53

def sync(image, tag)
  success = false
  @config.target_buckets.each do |region, bucket|
    if image_exists?(image, bucket, region)
      success = sync_tag(image, tag, bucket, region)
    else
      success = sync_image(image, bucket, region)
    end
  end
  success ? 0 : 1
end

.sync_image(image, bucket, region, source_bucket = nil, source_region = nil) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/docker/registry/sync/s3.rb', line 74

def sync_image(image, bucket, region, source_bucket = nil, source_region = nil)
  source_region ||= @config.source_region
  source_bucket ||= @config.source_bucket
  s3_source = Aws::S3::Client.new(region: source_region)
  s3_target = Aws::S3::Client.new(region: region)

  begin
    rep_prefix = "registry/repositories/#{image}/"
    sync_prefix(s3_source, s3_target, bucket, rep_prefix)

    img_index_resp = s3_source.get_object(bucket: source_bucket, key: "registry/repositories/#{image}/_index_images")
    JSON.load(img_index_resp.body.read).each do |image|
      image_prefix = "registry/images/#{image['id']}/"
      sync_prefix(s3_source, s3_target, bucket, image_prefix)
    end
  rescue Exception => e
    @config.logger.error "An unexpected error occoured while syncing image #{image}: #{e}"
    false
  else
    true
  end
end

.sync_keys(target_client, target_bucket, keys, source_bucket = nil) ⇒ Object



39
40
41
42
43
44
45
46
47
48
# File 'lib/docker/registry/sync/s3.rb', line 39

def sync_keys(target_client, target_bucket, keys, source_bucket = nil)
  source_bucket ||= @config.source_bucket
  keys.each do |key|
    @config.logger.info "Syncing key #{source_bucket}/#{key} to bucket #{target_bucket}"
    target_client.copy_object(acl: 'bucket-owner-full-control',
                              bucket: target_bucket,
                              key: key,
                              copy_source: "#{source_bucket}/#{key}")
  end
end

.sync_prefix(source_client, target_client, target_bucket, prefix, source_bucket = nil) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/docker/registry/sync/s3.rb', line 21

def sync_prefix(source_client, target_client, target_bucket, prefix, source_bucket = nil)
  source_bucket ||= @config.source_bucket
  keys = []
  img_resp = source_client.list_objects(bucket: source_bucket, prefix: prefix)

  loop do
    img_resp.contents.each do |item|
      keys << item.key
    end
    if img_resp.last_page?
      break
    else
      img_resp.next_page
    end
  end
  sync_keys(target_client, target_bucket, keys)
end

.sync_tag(image, tag, bucket, region, source_bucket = nil, source_region = nil) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/docker/registry/sync/s3.rb', line 50

def sync_tag(image, tag, bucket, region, source_bucket = nil, source_region = nil)
  source_region ||= @config.source_region
  source_bucket ||= @config.source_bucket

  s3_source = Aws::S3::Client.new(region: source_region)
  s3_target = Aws::S3::Client.new(region: region)

  begin
    keys = ["tag#{tag}_json", "tag_#{tag}", '_index_images'].map do |key|
      "registry/repositories/#{image}/#{key}"
    end
    sync_keys(s3_target, bucket, keys)

    img_id_resp = s3_source.get_object(bucket: source_bucket, key: "registry/repositories/#{image}/tag_#{tag}")
    img_prefix = "registry/images/#{img_id_resp.body.read}/"
    sync_prefix(s3_source, s3_target, bucket, img_prefix)
  rescue Exception => e
    @config.logger.error "An unexpected error occoured while syncing tag #{image}:#{tag}: #{e}"
    false
  else
    true
  end
end