Class: MCollective::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/mcollective/client.rb

Overview

Helpers for writing clients that can talk to agents, do discovery and so forth

Constant Summary collapse

@@request_sequence =
0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Client

Returns a new instance of Client.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/mcollective/client.rb', line 6

def initialize(options)
  @config = Config.instance
  @options = nil

  if options.is_a?(String)
    # String is the path to a config file
    @config.loadconfig(options) unless @config.configured
  elsif options.is_a?(Hash)
    @config.loadconfig(options[:config]) unless @config.configured
    @options = options
    @connection_timeout = options[:connection_timeout]
  else
    raise "Invalid parameter passed to Client constructor. Valid types are Hash or String"
  end

  @connection_timeout ||= @config.connection_timeout

  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]

  @security.initiated_by = :client
  @subscriptions = {}

  @discoverer = Discovery.new(self)

  # Time box the connection if a timeout has been specified
  # connection_timeout defaults to nil which means it will try forever if
  # not specified
  begin
    Timeout::timeout(@connection_timeout, ClientTimeoutError) do
      @connection.connect
    end
  rescue ClientTimeoutError => e
    Log.error("Timeout occured while trying to connect to middleware")
    raise e
  end
end

Instance Attribute Details

#connection_timeoutObject

Returns the value of attribute connection_timeout.



4
5
6
# File 'lib/mcollective/client.rb', line 4

def connection_timeout
  @connection_timeout
end

#discovererObject

Returns the value of attribute discoverer.



4
5
6
# File 'lib/mcollective/client.rb', line 4

def discoverer
  @discoverer
end

#optionsObject

Returns the value of attribute options.



4
5
6
# File 'lib/mcollective/client.rb', line 4

def options
  @options
end

#statsObject

Returns the value of attribute stats.



4
5
6
# File 'lib/mcollective/client.rb', line 4

def stats
  @stats
end

Class Method Details

.request_sequenceObject



45
46
47
# File 'lib/mcollective/client.rb', line 45

def self.request_sequence
  @@request_sequence
end

Instance Method Details

#collectiveObject

Returns the configured main collective if no specific collective is specified as options



51
52
53
54
55
56
57
# File 'lib/mcollective/client.rb', line 51

def collective
  if @options[:collective].nil?
    @config.main_collective
  else
    @options[:collective]
  end
end

#createreq(msg, agent, filter = {}) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/mcollective/client.rb', line 76

def createreq(msg, agent, filter ={})
  if msg.is_a?(Message)
    request = msg
    agent = request.agent
  else
    ttl = @options[:ttl] || @config.ttl
    request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
    request.reply_to = @options[:reply_to] if @options[:reply_to]
  end

  @@request_sequence += 1

  request.encode!
  subscribe(agent, :reply) unless request.reply_to
  request
end

#disconnectObject

Disconnects cleanly from the middleware



60
61
62
63
# File 'lib/mcollective/client.rb', line 60

def disconnect
  Log.debug("Disconnecting from the middleware")
  @connection.disconnect
end

#discover(filter, timeout, limit = 0) ⇒ Object

Performs a discovery of nodes matching the filter passed returns an array of nodes

An integer limit can be supplied this will have the effect of the discovery being cancelled soon as it reached the requested limit of hosts



150
151
152
# File 'lib/mcollective/client.rb', line 150

def discover(filter, timeout, limit=0)
  @discoverer.discover(filter.merge({'collective' => collective}), timeout, limit)
end

#discovered_req(body, agent, options = false) ⇒ Object



268
269
270
# File 'lib/mcollective/client.rb', line 268

def discovered_req(body, agent, options=false)
  raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
end

#display_stats(stats, options = false, caption = "stomp call summary") ⇒ Object

Prints out the stats returns from req and discovered_req in a nice way



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/mcollective/client.rb', line 273

def display_stats(stats, options=false, caption="stomp call summary")
  options = @options unless options

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts("           Nodes: #{stats[:responses]}")
    end

    printf("      Start Time: %s\n", Time.at(stats[:starttime]))
    printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)

  else
    if stats[:discovered]
      printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
    else
      printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
    end
  end

  if stats[:noresponsefrom].size > 0
    puts("\nNo response from:\n")

    stats[:noresponsefrom].each do |c|
      puts if c % 4 == 1
      printf("%30s", c)
    end

    puts
  end
end

#receive(requestid = nil) ⇒ Object

Blocking call that waits for ever for a message to arrive.

If you give it a requestid this means you’ve previously send a request with that ID and now you just want replies that matches that id, in that case the current connection will just ignore all messages not directed at it and keep waiting for more till it finds a matching message.



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/mcollective/client.rb', line 118

def receive(requestid = nil)
  reply = nil

  begin
    reply = @connection.receive
    reply.type = :reply
    reply.expected_msgid = requestid

    reply.decode!

    unless reply.requestid == requestid
      raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
    end

    Log.debug("Received reply to #{reply.requestid} from #{reply.payload[:senderid]}")
  rescue SecurityValidationFailed => e
    Log.warn("Ignoring a message that did not pass security validations")
    retry
  rescue MsgDoesNotMatchRequestID => e
    Log.debug("Ignoring a message for some other client : #{e.message}")
    retry
  end

  reply
end

#req(body, agent = nil, options = false, waitfor = 0, &block) ⇒ Object

Send a request, performs the passed block for each response

times = req(“status”, “mcollectived”, options, client) {|resp|

pp resp

}

It returns a hash of times and timeouts for discovery and total run is taken from the options hash which in turn is generally built using MCollective::Optionparser



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/mcollective/client.rb', line 162

def req(body, agent=nil, options=false, waitfor=0, &block)
  if body.is_a?(Message)
    agent = body.agent
    waitfor = body.discovered_hosts.size || 0
    @options = body.options
  end

  @options = options if options
  threaded = @options[:threaded]
  timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
  request = createreq(body, agent, @options[:filter])
  publish_timeout = @options[:publish_timeout]
  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
  STDOUT.sync = true
  hosts_responded = 0


  begin
    if threaded
      hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
    else
      hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
    end
  rescue Interrupt => e
  ensure
    unsubscribe(agent, :reply)
  end

  return update_stat(stat, hosts_responded, request.requestid)
end

#sendreq(msg, agent, filter = {}) ⇒ Object

Sends a request and returns the generated request id, doesn’t wait for responses and doesn’t execute any passed in code blocks for responses



67
68
69
70
71
72
73
74
# File 'lib/mcollective/client.rb', line 67

def sendreq(msg, agent, filter = {})
  request = createreq(msg, agent, filter)

  Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")

  request.publish
  request.requestid
end

#start_publisher(request, publish_timeout) ⇒ Object

Starts the request publishing routine



224
225
226
227
228
229
230
231
232
233
234
# File 'lib/mcollective/client.rb', line 224

def start_publisher(request, publish_timeout)
  Log.debug("Starting publishing with publish timeout of #{publish_timeout}")
  begin
    Timeout.timeout(publish_timeout) do
      Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")
      request.publish
    end
  rescue Timeout::Error => e
    Log.warn("Could not publish all messages. Publishing timed out.")
  end
end

#start_receiver(requestid, waitfor, timeout, &block) ⇒ Object

Starts the response receiver routine Expected to return the amount of received responses.



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/mcollective/client.rb', line 238

def start_receiver(requestid, waitfor, timeout, &block)
  Log.debug("Starting response receiver with timeout of #{timeout}")
  hosts_responded = 0
  begin
    Timeout.timeout(timeout) do
      begin
        resp = receive(requestid)
        yield resp.payload
        hosts_responded += 1
      end while (waitfor == 0 || hosts_responded < waitfor)
    end
  rescue Timeout::Error => e
    if (waitfor > hosts_responded)
      Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
    end
  end

  hosts_responded
end

#subscribe(agent, type) ⇒ Object



93
94
95
96
97
98
99
100
101
# File 'lib/mcollective/client.rb', line 93

def subscribe(agent, type)
  unless @subscriptions.include?(agent)
    subscription = Util.make_subscriptions(agent, type, collective)
    Log.debug("Subscribing to #{type} target for agent #{agent}")

    Util.subscribe(subscription)
    @subscriptions[agent] = 1
  end
end

#threaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object

Starts the client receiver and publisher in threads. This is activated when the ‘threader_client’ configuration option is set.



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/mcollective/client.rb', line 203

def threaded_req(request, publish_timeout, timeout, waitfor, &block)
  Log.debug("Starting threaded client")
  publisher = Thread.new do
    start_publisher(request, publish_timeout)
  end

  # When the client is threaded we add the publishing timeout to
  # the agent timeout so that the receiver doesn't time out before
  # publishing has finished in cases where publish_timeout >= timeout.
  total_timeout = publish_timeout + timeout
  hosts_responded = 0

  receiver = Thread.new do
    hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block)
  end

  receiver.join
  hosts_responded
end

#unsubscribe(agent, type) ⇒ Object



103
104
105
106
107
108
109
110
111
# File 'lib/mcollective/client.rb', line 103

def unsubscribe(agent, type)
  if @subscriptions.include?(agent)
    subscription = Util.make_subscriptions(agent, type, collective)
    Log.debug("Unsubscribing #{type} target for #{agent}")

    Util.unsubscribe(subscription)
    @subscriptions.delete(agent)
  end
end

#unthreaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object

Starts the client receiver and publisher unthreaded. This is the default client behaviour.



195
196
197
198
# File 'lib/mcollective/client.rb', line 195

def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
  start_publisher(request, publish_timeout)
  start_receiver(request.requestid, waitfor, timeout, &block)
end

#update_stat(stat, hosts_responded, requestid) ⇒ Object



258
259
260
261
262
263
264
265
266
# File 'lib/mcollective/client.rb', line 258

def update_stat(stat, hosts_responded, requestid)
  stat[:totaltime] = Time.now.to_f - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded
  stat[:noresponsefrom] = []
  stat[:requestid] = requestid

  @stats = stat
end