Class: Cumulus::SQS::Manager

Inherits:
Common::Manager show all
Defined in:
lib/sqs/manager/Manager.rb

Instance Method Summary collapse

Methods inherited from Common::Manager

#diff, #diff_one, #filter_local, #list, #sync, #sync_one

Constructor Details

#initializeManager

Returns a new instance of Manager.



15
16
17
18
# File 'lib/sqs/manager/Manager.rb', line 15

def initialize
  super()
  @create_asset = true
end

Instance Method Details

#added_diff(local) ⇒ Object



36
37
38
# File 'lib/sqs/manager/Manager.rb', line 36

def added_diff(local)
  QueueDiff.added(local)
end

#aws_resourcesObject



28
29
30
# File 'lib/sqs/manager/Manager.rb', line 28

def aws_resources
  @aws_resources ||= Hash[SQS::queue_attributes.map { |name, attrs| [name, QueueConfig.new(name).populate!(attrs) ] }]
end

#create(local) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/sqs/manager/Manager.rb', line 70

def create(local)
  url = SQS::client.create_queue({
    queue_name: local.name,
    attributes: {
      "DelaySeconds" => "#{local.delay}",
      "MaximumMessageSize" => "#{local.max_message_size}",
      "MessageRetentionPeriod" => "#{local.message_retention}",
      "Policy" => if local.policy then JSON.generate(Loader.policy(local.policy)) end,
      "ReceiveMessageWaitTimeSeconds" => "#{local.receive_wait_time}",
      "VisibilityTimeout" => "#{local.visibility_timeout}",
      "RedrivePolicy" => if local.dead_letter then JSON.generate(local.dead_letter.to_aws) end
    }.reject { |k, v| v.nil? }
  }).queue_url
  puts Colors.blue("Queue #{local.name} was created with url #{url}")
end

#diff_resource(local, aws) ⇒ Object



40
41
42
# File 'lib/sqs/manager/Manager.rb', line 40

def diff_resource(local, aws)
  local.diff(aws)
end

#local_resourcesObject



24
25
26
# File 'lib/sqs/manager/Manager.rb', line 24

def local_resources
  @local_resources ||= Hash[Loader.queues.map { |local| [local.name, local] }]
end

#migrateObject

Public: Migrates the existing AWS config to Cumulus



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/sqs/manager/Manager.rb', line 87

def migrate
  # Create the directories
  sqs_dir = "#{@migration_root}/sqs"
  policies_dir = "#{sqs_dir}/policies"
  queues_dir = "#{sqs_dir}/queues"

  if !Dir.exists?(@migration_root)
    Dir.mkdir(@migration_root)
  end
  if !Dir.exists?(sqs_dir)
    Dir.mkdir(sqs_dir)
  end
  if !Dir.exists?(policies_dir)
    Dir.mkdir(policies_dir)
  end
  if !Dir.exists?(queues_dir)
    Dir.mkdir(queues_dir)
  end

  puts Colors.blue("Migrating queues to #{queues_dir}")
  aws_resources.each do |name, config|
    puts Colors.blue("Migrating queue #{name}")

    # If there is a policy, then save it to the policies dir with the name of the queue
    queue_policy = SQS::queue_policy(name)
    if queue_policy
      policy_json = JSON.pretty_generate(queue_policy)
      policy_file = "#{policies_dir}/#{name}.json"
      puts "Migrating policy to #{policy_file}"
      File.open("#{policy_file}", "w") { |f| f.write(policy_json) }
    end

    json = JSON.pretty_generate(config.to_hash)
    File.open("#{queues_dir}/#{name}.json", "w") { |f| f.write(json) }
  end

end

#resource_nameObject



20
21
22
# File 'lib/sqs/manager/Manager.rb', line 20

def resource_name
  "Queue"
end

#unmanaged_diff(aws) ⇒ Object



32
33
34
# File 'lib/sqs/manager/Manager.rb', line 32

def unmanaged_diff(aws)
  QueueDiff.unmanaged(aws)
end

#update(local, diffs) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/sqs/manager/Manager.rb', line 51

def update(local, diffs)
  SQS::client.set_queue_attributes({
    queue_url: SQS::queue_urls[local.name],
    attributes: {
      "DelaySeconds" => if diffs.any? { |d| d.type == QueueChange::DELAY } then local.delay.to_s end,
      "MaximumMessageSize" => if diffs.any? { |d| d.type == QueueChange::MESSAGE_SIZE } then local.max_message_size.to_s end,
      "MessageRetentionPeriod" => if diffs.any? { |d| d.type == QueueChange::MESSAGE_RETENTION } then local.message_retention.to_s end,
      "Policy" => if diffs.any? { |d| d.type == QueueChange::POLICY }
        if local.policy then JSON.generate(Loader.policy(local.policy)) else "" end
      end,
      "ReceiveMessageWaitTimeSeconds" => if diffs.any? { |d| d.type == QueueChange::RECEIVE_WAIT } then local.receive_wait_time.to_s end,
      "VisibilityTimeout" => if diffs.any? { |d| d.type == QueueChange::VISIBILITY } then local.visibility_timeout end,
      "RedrivePolicy" => if diffs.any? { |d| d.type == QueueChange::DEAD }
        if local.dead_letter then JSON.generate(local.dead_letter.to_aws) else "" end
      end
    }.reject { |k, v| v.nil? }
  })
end

#urlsObject



44
45
46
47
48
49
# File 'lib/sqs/manager/Manager.rb', line 44

def urls
  local_resources.keys.sort.each do |name|
    url = SQS::queue_urls[name] || "does not exist"
    puts "#{name} => #{url}"
  end
end