Class: Emissary::Agent::Rabbitmq

Inherits:
Emissary::Agent show all
Defined in:
lib/emissary/agent/rabbitmq.rb

Defined Under Namespace

Classes: CommandExecutionError

Constant Summary collapse

NIMBUL_VHOST =
'/nimbul'
NODE_CONFIG_ACL =
'^i-[a-f0-9.]+$'
NODE_READ_ACL =
'^(amq.*|i-[a-f0-9.]+|request.%%ID%%.*)$'
NODE_WRITE_ACL =
'^(amq.*|i-[a-f0-9.]+|(startup|info|shutdown).%%ID%%.*|nimbul)$'
QUEUE_INFO_ITEMS =
%w[
    name durable auto_delete arguments pid owner_pid
    exclusive_consumer_pid exclusive_consumer_tag
    messages_ready messages_unacknowledged messages_uncommitted
    messages acks_uncommitted consumers transactions memory      
]
EXCHANGE_INFO_ITEMS =
%w[
  name type durable auto_delete arguments
]
CONNECTION_INFO_ITEMS =
%w[
    pid address port peer_address peer_port state channels user
    vhost timeout frame_max client_properties recv_oct recv_cnt
    send_oct send_cnt send_pend
]
CHANNEL_INFO_ITEMS =
%w[
  pid connection number user vhost transactional consumer_count
  messages_unacknowledged acks_uncommitted prefetch_count
]
BINDINGS_INFO_COLUMNS =
%w[ exchange_name queue_name routing_key arguments ]
CONSUMER_INFO_COLUMNS =
%w[ queue_name channel_process_id consumer_tag must_acknowledge ]

Instance Attribute Summary

Attributes inherited from Emissary::Agent

#args, #config, #message, #method, #name, #operator

Instance Method Summary collapse

Methods inherited from Emissary::Agent

#activate, #initialize, #post_init, #send

Constructor Details

This class inherits a constructor from Emissary::Agent

Instance Method Details

#add_node_account(user, password, namespace_id) ⇒ Object



164
165
166
167
168
169
170
171
# File 'lib/emissary/agent/rabbitmq.rb', line 164

def (user, password, namespace_id)
  begin
    add_user(user, password)
    (user, namespace_id.to_s)
  rescue CommandExecutionError => e
    "failed to add new node account: #{user}:#{namespace_id.to_s}"
  end
end

#add_node_account_acl(user, namespace_id) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/emissary/agent/rabbitmq.rb', line 150

def (user, namespace_id)
  config_acl = NODE_CONFIG_ACL.gsub('%%ID%%', namespace_id.to_s)
  write_acl  = NODE_WRITE_ACL.gsub('%%ID%%', namespace_id.to_s)
  read_acl   = NODE_READ_ACL.gsub('%%ID%%', namespace_id.to_s)
  
  begin      
    set_vhost_permissions(user, NIMBUL_VHOST, config_acl, write_acl, read_acl)
  rescue CommandExecutionError => e
    "problem adding account acls for user: #{user}: #{e.message}"
  else
    "successfully added account acls for user: #{user}"
  end
end

#add_user(user, pass) ⇒ Object



191
192
193
194
195
196
197
# File 'lib/emissary/agent/rabbitmq.rb', line 191

def add_user(user, pass)
  begin
    !!rabbitmqctl(:add_user, user, pass)
  rescue CommandExecutionError => e
    raise e unless e.message.include? 'user_already_exists'
  end
end

#add_vhost(path) ⇒ Object



183
184
185
186
187
188
189
# File 'lib/emissary/agent/rabbitmq.rb', line 183

def add_vhost(path)
  begin
    !!rabbitmqctl(:add_vhost, path)
  rescue CommandExecutionError => e
    raise e unless e.message.include? 'vhost_already_exists'
  end
end

#change_password(user, pass) ⇒ Object



199
200
201
202
203
204
205
206
# File 'lib/emissary/agent/rabbitmq.rb', line 199

def change_password(user, pass)
  begin
    !!rabbitmqctl(:change_password, user, pass)
  rescue CommandExecutionError => e
    return false if e.message.include? 'no_such_user'
    raise e 
  end
end

#del_node_account_acl(user, vhost) ⇒ Object



173
174
175
176
177
178
179
180
181
# File 'lib/emissary/agent/rabbitmq.rb', line 173

def (user, vhost)
  begin
    del_vhost_permissions(user, vhost)
  rescue CommandExecutionError => e
    "problem unmapping user from vhost: #{user}:#{vhost} #{e.message}"
  else
    "successfully unmapped user from vhost: #{user}:#{vhost}"
  end
end

#del_vhost_permissions(user, vhost) ⇒ Object



145
146
147
148
# File 'lib/emissary/agent/rabbitmq.rb', line 145

def del_vhost_permissions(user, vhost)
  vhost = vhost.empty? ? '/' : vhost
  rabbitmqctl(:clear_permissions, '-p', vhost, user)
end

#delete_user(user) ⇒ Object



208
209
210
211
212
213
214
# File 'lib/emissary/agent/rabbitmq.rb', line 208

def delete_user(user)
  begin
    !!rabbitmqctl(:delete_user, user)
  rescue CommandExecutionError => e
    raise e unless e.message.include? 'no_such_user'
  end
end

#delete_vhost(path) ⇒ Object



216
217
218
219
220
221
222
# File 'lib/emissary/agent/rabbitmq.rb', line 216

def delete_vhost(path)
  begin
    !!rabbitmqctl(:delete_vhost, path)
  rescue CommandExecutionError => e
    raise e unless e.message.include? 'no_such_vhost'
  end
end

#list_bindings(vhost) ⇒ Object



86
87
88
89
90
91
# File 'lib/emissary/agent/rabbitmq.rb', line 86

def list_bindings(vhost)
  vhost = vhost.empty? ? '/' : vhost
  rabbitmqctl(:list_bindings, '-p', vhost).collect do |line|
    Hash[*BINDINGS_INFO_COLUMNS.zip(line.split(/\s+/)).flatten]
  end
end

#list_channelsObject



106
107
108
109
110
# File 'lib/emissary/agent/rabbitmq.rb', line 106

def list_channels
  rabbitmqctl(:list_channels, CHANNEL_INFO_ITEMS.join(" ")).collect do |line|
    Hash[*CHANNEL_INFO_ITEMS.zip(line.split(/\s+/)).flatten]
  end
end

#list_connectionsObject



100
101
102
103
104
# File 'lib/emissary/agent/rabbitmq.rb', line 100

def list_connections
  rabbitmqctl(:list_connections, CONNECTION_INFO_ITEMS.join(" ")).collect do |line|
    Hash[*CONNECTION_INFO_ITEMS.zip(line.split(/\s+/)).flatten]
  end
end

#list_consumers(vhost) ⇒ Object



112
113
114
115
116
117
# File 'lib/emissary/agent/rabbitmq.rb', line 112

def list_consumers(vhost)
  vhost = vhost.empty? ? '/' : vhost
  rabbitmqctl(:list_consumers, '-p', vhost).collect do |line|
    Hash[*CONSUMER_INFO_COLUMNS.zip(line.split(/\s+/)).flatten]
  end
end

#list_exchanges(vhost) ⇒ Object



93
94
95
96
97
98
# File 'lib/emissary/agent/rabbitmq.rb', line 93

def list_exchanges(vhost)
  vhost = vhost.empty? ? '/' : vhost
  rabbitmqctl(:list_exchanges, '-p', vhost, EXCHANGE_INFO_ITEMS.join(" ")).collect do |line|
    Hash[*EXCHANGE_INFO_ITEMS.zip(line.split(/\s+/)).flatten]
  end
end

#list_queues(vhost) ⇒ Object



79
80
81
82
83
84
# File 'lib/emissary/agent/rabbitmq.rb', line 79

def list_queues(vhost)
  vhost = vhost.empty? ? '/' : vhost
  rabbitmqctl(:list_queues, '-p', vhost, QUEUE_INFO_ITEMS.join(" ")).collect do |line|
    Hash[*QUEUE_INFO_ITEMS.zip(line.split(/\s+/)).flatten]
  end
end

#list_user_vhosts(user) ⇒ Object



136
137
138
# File 'lib/emissary/agent/rabbitmq.rb', line 136

def list_user_vhosts(user)
  list_vhosts.select { |vhost| list_vhost_users(vhost).include? user }
end

#list_usersObject



119
120
121
# File 'lib/emissary/agent/rabbitmq.rb', line 119

def list_users
  rabbitmqctl(:list_users)
end

#list_vhost_users(vhost) ⇒ Object



127
128
129
130
131
132
133
134
# File 'lib/emissary/agent/rabbitmq.rb', line 127

def list_vhost_users(vhost)
  vhost = vhost.empty? ? '/' : vhost
  rabbitmqctl(:list_permissions, '-p', vhost).flatten.select { |l|
    !l.nil?
  }.collect {
    |l| l.split(/\s+/)[0]
  }
end

#list_vhostsObject



123
124
125
# File 'lib/emissary/agent/rabbitmq.rb', line 123

def list_vhosts
  rabbitmqctl(:list_vhosts)
end

#rabbitmqctl(*args) ⇒ Object



224
225
226
227
228
229
230
231
# File 'lib/emissary/agent/rabbitmq.rb', line 224

def rabbitmqctl(*args)
  result = []
  `rabbitmqctl #{Escape.shell_command([*args.collect{|a| a.to_s}])} 2>&1`.each do |line|
    raise CommandExecutionError, $1 if line =~ /Error: (.*)/
    result << line.chomp unless line =~ /\.\.\./
  end
  result
end

#set_vhost_permissions(user, vhost, config, write, read) ⇒ Object



140
141
142
143
# File 'lib/emissary/agent/rabbitmq.rb', line 140

def set_vhost_permissions(user, vhost, config, write, read)
  vhost = vhost.empty? ? '/' : vhost
  rabbitmqctl(:set_permissions, '-p', vhost, user, config, write, read)
end

#valid_methodsObject



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/emissary/agent/rabbitmq.rb', line 53

def valid_methods
  [
    :add_user,
    :delete_user,
    :change_password,
    :list_users,
    
    :add_vhost,
    :delete_vhost,
    :list_vhosts,
    
    :add_node_account,
    :del_node_account,
    
    :list_user_vhosts,
    :list_vhost_users,
    
    :list_queues,
    :list_bindings,
    :list_exchanges,
    :list_connections,
    :list_channels,
    :list_consumers
  ]
end