Class: Cumulus::SQS::Manager

Inherits:
Common::Manager show all
Defined in:
lib/sqs/manager/Manager.rb

Instance Method Summary collapse

Methods inherited from Common::Manager

#diff, #diff_one, #filter_local, #list, #sync, #sync_one

Constructor Details

#initializeManager

Returns a new instance of Manager.



14
15
16
17
# File 'lib/sqs/manager/Manager.rb', line 14

def initialize
  super()
  @create_asset = true
end

Instance Method Details

#added_diff(local) ⇒ Object



35
36
37
# File 'lib/sqs/manager/Manager.rb', line 35

def added_diff(local)
  QueueDiff.added(local)
end

#aws_resourcesObject



27
28
29
# File 'lib/sqs/manager/Manager.rb', line 27

def aws_resources
  @aws_resources ||= Hash[SQS::queue_attributes.map { |name, attrs| [name, QueueConfig.new(name).populate!(attrs) ] }]
end

#create(local) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/sqs/manager/Manager.rb', line 69

def create(local)
  url = SQS::client.create_queue({
    queue_name: local.name,
    attributes: {
      "DelaySeconds" => "#{local.delay}",
      "MaximumMessageSize" => "#{local.max_message_size}",
      "MessageRetentionPeriod" => "#{local.message_retention}",
      "Policy" => if local.policy then JSON.generate(Loader.policy(local.policy)) end,
      "ReceiveMessageWaitTimeSeconds" => "#{local.receive_wait_time}",
      "VisibilityTimeout" => "#{local.visibility_timeout}",
      "RedrivePolicy" => if local.dead_letter then JSON.generate(local.dead_letter.to_aws) end
    }.reject { |k, v| v.nil? }
  }).queue_url
  puts Colors.blue("Queue #{local.name} was created with url #{url}")
end

#diff_resource(local, aws) ⇒ Object



39
40
41
# File 'lib/sqs/manager/Manager.rb', line 39

def diff_resource(local, aws)
  local.diff(aws)
end

#local_resourcesObject



23
24
25
# File 'lib/sqs/manager/Manager.rb', line 23

def local_resources
  @local_resources ||= Hash[Loader.queues.map { |local| [local.name, local] }]
end

#migrateObject

Public: Migrates the existing AWS config to Cumulus



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/sqs/manager/Manager.rb', line 86

def migrate
  # Create the directories
  sqs_dir = "#{@migration_root}/sqs"
  policies_dir = "#{sqs_dir}/policies"
  queues_dir = "#{sqs_dir}/queues"

  if !Dir.exists?(@migration_root)
    Dir.mkdir(@migration_root)
  end
  if !Dir.exists?(sqs_dir)
    Dir.mkdir(sqs_dir)
  end
  if !Dir.exists?(policies_dir)
    Dir.mkdir(policies_dir)
  end
  if !Dir.exists?(queues_dir)
    Dir.mkdir(queues_dir)
  end

  puts Colors.blue("Migrating queues to #{queues_dir}")
  aws_resources.each do |name, config|
    puts Colors.blue("Migrating queue #{name}")

    # If there is a policy, then save it to the policies dir with the name of the queue
    queue_policy = SQS::queue_policy(name)
    if queue_policy
      policy_json = JSON.pretty_generate(queue_policy)
      policy_file = "#{policies_dir}/#{name}.json"
      puts "Migrating policy to #{policy_file}"
      File.open("#{policy_file}", "w") { |f| f.write(policy_json) }
    end

    json = JSON.pretty_generate(config.to_hash)
    File.open("#{queues_dir}/#{name}.json", "w") { |f| f.write(json) }
  end

end

#resource_nameObject



19
20
21
# File 'lib/sqs/manager/Manager.rb', line 19

def resource_name
  "Queue"
end

#unmanaged_diff(aws) ⇒ Object



31
32
33
# File 'lib/sqs/manager/Manager.rb', line 31

def unmanaged_diff(aws)
  QueueDiff.unmanaged(aws)
end

#update(local, diffs) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/sqs/manager/Manager.rb', line 50

def update(local, diffs)
  SQS::client.set_queue_attributes({
    queue_url: SQS::queue_urls[local.name],
    attributes: {
      "DelaySeconds" => if diffs.any? { |d| d.type == QueueChange::DELAY } then local.delay.to_s end,
      "MaximumMessageSize" => if diffs.any? { |d| d.type == QueueChange::MESSAGE_SIZE } then local.max_message_size.to_s end,
      "MessageRetentionPeriod" => if diffs.any? { |d| d.type == QueueChange::MESSAGE_RETENTION } then local.message_retention.to_s end,
      "Policy" => if diffs.any? { |d| d.type == QueueChange::POLICY }
        if local.policy then JSON.generate(Loader.policy(local.policy)) else "" end
      end,
      "ReceiveMessageWaitTimeSeconds" => if diffs.any? { |d| d.type == QueueChange::RECEIVE_WAIT } then local.receive_wait_time.to_s end,
      "VisibilityTimeout" => if diffs.any? { |d| d.type == QueueChange::VISIBILITY } then local.visibility_timeout end,
      "RedrivePolicy" => if diffs.any? { |d| d.type == QueueChange::DEAD }
        if local.dead_letter then JSON.generate(local.dead_letter.to_aws) else "" end
      end
    }.reject { |k, v| v.nil? }
  })
end

#urlsObject



43
44
45
46
47
48
# File 'lib/sqs/manager/Manager.rb', line 43

def urls
  local_resources.keys.sort.each do |name|
    url = SQS::queue_urls[name] || "does not exist"
    puts "#{name} => #{url}"
  end
end