Class: MCollective::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/mcollective/client.rb

Overview

Helpers for writing clients that can talk to agents, do discovery and so forth

Constant Summary collapse

@@request_sequence =

rubocop:disable Style/ClassVars

0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Client

Returns a new instance of Client.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/mcollective/client.rb', line 6

# Builds a client: loads the configuration when it has not been loaded yet,
# fetches the connector and security plugins and connects to the middleware.
#
# options - a String holding the path to a config file, or a Hash of client
#           options. The Hash form reads :config and :connection_timeout and
#           is kept as the client's options.
#
# Raises a RuntimeError for any other argument type and re-raises
# ClientTimeoutError when connecting exceeds the connection timeout.
def initialize(options)
  @config = Config.instance
  @options = nil

  if options.is_a?(String)
    # A bare String is the path to a config file
    @config.loadconfig(options) unless @config.configured
  elsif options.is_a?(Hash)
    @config.loadconfig(options[:config]) unless @config.configured
    @options = options
    @connection_timeout = options[:connection_timeout]
  else
    raise "Invalid parameter passed to Client constructor. Valid types are Hash or String"
  end

  # Fall back to the configured value when the caller supplied none
  @connection_timeout ||= @config.connection_timeout

  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]
  @security.initiated_by = :client

  @subscriptions = {}
  @discoverer = Discovery.new(self)

  # Time box the connection if a timeout has been specified.
  # connection_timeout defaults to nil which means it will try forever if
  # not specified
  begin
    Timeout.timeout(@connection_timeout, ClientTimeoutError) { @connection.connect }
  rescue ClientTimeoutError
    Log.error("Timeout occured while trying to connect to middleware")
    raise
  end
end

Instance Attribute Details

#connection_timeoutObject

Returns the value of attribute connection_timeout.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Returns the connection timeout resolved by the constructor: the
# :connection_timeout entry of the options Hash when one was set, otherwise
# the configuration's connection_timeout.
def connection_timeout
  @connection_timeout
end

#discovererObject

Returns the value of attribute discoverer.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Returns the Discovery instance the constructor created for this client.
def discoverer
  @discoverer
end

#optionsObject

Returns the value of attribute options.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Returns the options currently in use by this client.
#
# The constructor stores the Hash it was given (and leaves this nil when it
# was given a config path String); req replaces it with the options it is
# passed or with the options of a Message body.
def options
  @options
end

#statsObject

Returns the value of attribute stats.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Returns the stat Hash stored by the most recent update_stat call.
def stats
  @stats
end

Class Method Details

.request_sequenceObject



51
52
53
# File 'lib/mcollective/client.rb', line 51

# Returns the class-wide request counter, which createreq increments for
# every request it prepares.
def self.request_sequence
  @@request_sequence
end

.reset_request_sequenceObject



47
48
49
# File 'lib/mcollective/client.rb', line 47

# Sets the class-wide request counter back to zero.
def self.reset_request_sequence
  @@request_sequence = 0 # rubocop:disable Style/ClassVars
end

Instance Method Details

#collectiveObject

Returns the collective given in the options, or the configured main collective when none was specified



57
58
59
60
61
62
63
# File 'lib/mcollective/client.rb', line 57

# Returns the collective this client targets.
#
# The :collective entry of the client options wins when it is set; otherwise
# the configuration's main collective is used. A client built from a config
# path String has no options at all (they are nil), so that case also falls
# back to the main collective instead of failing with a NoMethodError.
def collective
  requested = @options.nil? ? nil : @options[:collective]

  requested.nil? ? @config.main_collective : requested
end

#createreq(msg, agent, filter = {}) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/mcollective/client.rb', line 79

# Prepares a request for publishing and returns it.
#
# msg    - either a ready made Message, which is used as is and whose agent
#          replaces the agent argument, or a payload to wrap in a new request
#          Message
# agent  - name of the agent the request is for
# filter - filter handed to the new Message
#
# The ttl and reply_to of a newly built Message come from the client options,
# with the configured ttl as the fallback. Every call bumps the class-wide
# request sequence. Unless the request carries its own reply_to, the client
# subscribes to the agent's reply target before returning.
def createreq(msg, agent, filter={})
  request =
    if msg.is_a?(Message)
      agent = msg.agent
      msg
    else
      ttl = @options[:ttl] || @config.ttl
      built = Message.new(
        msg, nil,
        :agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl
      )
      custom_reply_to = @options[:reply_to]
      built.reply_to = custom_reply_to if custom_reply_to
      built
    end

  @@request_sequence += 1 # rubocop:disable Style/ClassVars

  request.encode!
  subscribe(agent, :reply) unless request.reply_to

  request
end

#disconnectObject

Disconnects cleanly from the middleware



66
67
68
69
# File 'lib/mcollective/client.rb', line 66

# Logs the intent at debug level, then asks the connector to disconnect.
# Returns whatever the connector's disconnect returns.
def disconnect
  Log.debug("Disconnecting from the middleware")

  connector = @connection
  connector.disconnect
end

#discover(filter, timeout, limit = 0) ⇒ Object

Performs a discovery of nodes matching the filter passed and returns an array of nodes

An integer limit can be supplied; the discovery is then cancelled as soon as it has reached the requested number of hosts



152
153
154
# File 'lib/mcollective/client.rb', line 152

# Runs a discovery through the client's discoverer.
#
# filter  - discovery filter Hash; a copy with the client's collective added
#           under the "collective" key is what gets passed on
# timeout - discovery timeout, passed through unchanged
# limit   - host limit, passed through unchanged (defaults to 0)
#
# Returns whatever the discoverer returns.
def discover(filter, timeout, limit=0)
  scoped_filter = filter.merge("collective" => collective)

  @discoverer.discover(scoped_filter, timeout, limit, self)
end

#discovered_req(body, agent, options = false) ⇒ Object



303
304
305
# File 'lib/mcollective/client.rb', line 303

# Removed entry point kept only so old callers get a clear message.
#
# Always raises a RuntimeError; none of the arguments are used.
def discovered_req(body, agent, options=false)
  raise RuntimeError, "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
end

#display_stats(stats, options = false, caption = "stomp call summary") ⇒ Object

Prints out the stats returned from req and discovered_req in a nice way



308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/mcollective/client.rb', line 308

# Prints a summary of a request's stats to standard output.
#
# stats   - stat Hash; reads :discovered, :responses, :starttime,
#           :discoverytime, :blocktime, :totaltime, :noresponsefrom and
#           :unexpectedresponsefrom
# options - options Hash consulted for :verbose; the client's own options are
#           used when this is false or nil
# caption - heading used for the verbose summary
#
# With :verbose set a multi line timing summary is printed, otherwise a single
# "Finished processing" line. Hosts that did not respond and hosts that
# responded unexpectedly are then listed four to a line.
#
# Returns nil.
def display_stats(stats, options=false, caption="stomp call summary")
  options ||= @options

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts("           Nodes: #{stats[:responses]}")
    end

    printf("      Start Time: %s\n", Time.at(stats[:starttime]))
    printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)

  elsif stats[:discovered]
    printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
  else
    printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
  end

  print_host_columns("\nNo response from:\n", stats[:noresponsefrom])
  print_host_columns("\nUnexpected response from:\n", stats[:unexpectedresponsefrom])
end

# Prints heading followed by hosts right-aligned in 30 character columns,
# per_row hosts to a line. Prints nothing when hosts is nil or empty.
#
# Rows are cut by position in the list; the host names themselves are only
# ever used as printf arguments, so names containing "%" are safe.
def print_host_columns(heading, hosts, per_row=4)
  hosts = Array(hosts)
  return if hosts.empty?

  puts(heading)

  hosts.each_slice(per_row) do |row|
    row.each {|host| printf("%30s", host) }
    puts
  end

  nil
end
private :print_host_columns

#publish(request) ⇒ Object



236
237
238
239
# File 'lib/mcollective/client.rb', line 236

# Logs the request's id, agent, ttl and collective at info level and then
# publishes the request.
#
# Returns whatever request.publish returns.
def publish(request)
  summary = "Sending request #{request.requestid} for agent '#{request.agent}' with ttl #{request.ttl} in collective '#{request.collective}'"
  Log.info(summary)

  request.publish
end

#receive(requestid = nil) ⇒ Object

Blocking call that waits forever for a message to arrive.

If you give it a requestid, this means you’ve previously sent a request with that ID and now only want replies that match it; in that case the current connection will ignore all messages not directed at it and keep waiting until it finds a matching message.



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/mcollective/client.rb', line 122

# Blocks until the connector delivers a reply whose request id equals
# requestid, and returns that reply.
#
# Each message taken from the connector is marked as a reply, told which
# request id to expect and decoded. Messages that fail security validation
# or that carry a different request id are logged and skipped, after which
# the method starts over and waits for the next message; there is no limit
# on how many messages are skipped.
def receive(requestid=nil)
  reply = @connection.receive
  reply.type = :reply
  reply.expected_msgid = requestid

  reply.decode!

  unless reply.requestid == requestid
    raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
  end

  Log.debug("Received reply to #{reply.requestid} from #{reply.payload[:senderid]}")

  reply
rescue SecurityValidationFailed
  Log.warn("Ignoring a message that did not pass security validations")
  retry
rescue MsgDoesNotMatchRequestID => e
  Log.debug("Ignoring a message for some other client : #{e.message}")
  retry
end

#req(body, agent = nil, options = false, waitfor = [], &block) ⇒ Object

Send a request, performs the passed block for each response

times = req(“status”, “mcollectived”, options, client) {|resp|

pp resp

}

It returns a hash of timings. The timeouts for discovery and for the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/mcollective/client.rb', line 164

# Sends a request and hands every reply to the given block.
#
# body    - request payload, or a Message; a Message supplies the agent, the
#           hosts to wait for (its discovered_hosts, or none) and the options
# agent   - name of the agent to address
# options - options Hash; when given it replaces the client's options
# waitfor - hosts, or number of hosts, to wait for replies from
#
# The :threaded option selects the threaded or the unthreaded request path.
# An Interrupt raised while waiting is swallowed so the stats gathered so far
# are still returned, and the reply subscription is always removed.
#
# Returns the stat Hash produced by update_stat.
def req(body, agent=nil, options=false, waitfor=[], &block)
  if body.is_a?(Message)
    agent = body.agent
    waitfor = body.discovered_hosts || []
    @options = body.options
  end
  @options = options if options

  use_threads = @options[:threaded]
  timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
  request = createreq(body, agent, @options[:filter])
  publish_timeout = @options[:publish_timeout] || @config.publish_timeout

  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
  $stdout.sync = true

  hosts_responded = 0
  runner = use_threads ? :threaded_req : :unthreaded_req

  begin
    hosts_responded = send(runner, request, publish_timeout, timeout, waitfor, &block)
  rescue Interrupt # rubocop:disable Lint/SuppressedException
    # Deliberately ignored: report on whatever was received before the interrupt
  ensure
    unsubscribe(agent, :reply)
  end

  update_stat(stat, hosts_responded, request.requestid)
end

#sendreq(msg, agent, filter = {}) ⇒ Object

Sends a request and returns the generated request id, doesn’t wait for responses and doesn’t execute any passed in code blocks for responses



73
74
75
76
77
# File 'lib/mcollective/client.rb', line 73

# Builds a request with createreq, publishes it and returns its request id.
# Replies are neither waited for nor processed.
#
# msg    - payload or Message, passed to createreq
# agent  - agent name, passed to createreq
# filter - filter, passed to createreq
def sendreq(msg, agent, filter={})
  createreq(msg, agent, filter).tap {|request| publish(request) }.requestid
end

#start_publisher(request, publish_timeout) ⇒ Object

Starts the request publishing routine



225
226
227
228
229
230
231
232
233
234
# File 'lib/mcollective/client.rb', line 225

# Publishes the request, giving up after publish_timeout seconds.
#
# request         - request handed to publish
# publish_timeout - limit passed to Timeout.timeout
#
# A publish that runs out of time is not an error for the caller: it is
# logged as a warning and the method returns normally.
def start_publisher(request, publish_timeout)
  Log.debug("Starting publishing with publish timeout of #{publish_timeout}")

  Timeout.timeout(publish_timeout) { publish(request) }
rescue Timeout::Error
  Log.warn("Could not publish all messages. Publishing timed out.")
end

#start_receiver(requestid, waitfor, timeout, &block) ⇒ Object

Starts the response receiver routine. Expected to return the number of responses received.



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/mcollective/client.rb', line 243

# Receives replies for requestid and yields each one to the block until
# enough have arrived or timeout seconds have passed.
#
# requestid - id of the request whose replies are wanted
# waitfor   - an Array of sender ids to wait for (a sender listed more than
#             once must reply that many times), or a number of replies to
#             wait for; an empty Array or 0 means keep receiving until the
#             timeout
# timeout   - limit passed to Timeout.timeout
#
# The block receives the reply payload, plus the reply itself when it takes
# exactly two arguments.
#
# Running out of time is not an error: a warning is logged when replies are
# still outstanding and the count so far is returned.
#
# Returns the number of replies received.
def start_receiver(requestid, waitfor, timeout, &block)
  Log.debug("Starting response receiver with timeout of #{timeout}")
  hosts_responded = 0
  by_host = waitfor.is_a?(Array)

  # Outstanding reply count per sender; only meaningful when waiting by host
  unfinished =
    if by_host
      waitfor.each_with_object(Hash.new(0)) {|host, pending| pending[host] += 1 }
    else
      []
    end

  begin
    Timeout.timeout(timeout) do
      loop do
        resp = receive(requestid)

        if block.arity == 2
          yield resp.payload, resp
        else
          yield resp.payload
        end

        hosts_responded += 1

        if by_host
          sender = resp.payload[:senderid]
          outstanding = unfinished[sender]

          if outstanding > 1
            unfinished[sender] = outstanding - 1
          else
            unfinished.delete(sender)
          end

          break if unfinished.empty? && !waitfor.empty?
        elsif waitfor != 0 && hosts_responded >= waitfor
          break
        end
      end
    end
  rescue Timeout::Error
    if by_host
      Log.warn("Could not receive all responses. Did not receive responses from #{unfinished.keys.join(', ')}") unless unfinished.empty?
    elsif waitfor > hosts_responded
      Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
    end
  end

  hosts_responded
end

#subscribe(agent, type) ⇒ Object



96
97
98
99
100
101
102
103
104
# File 'lib/mcollective/client.rb', line 96

# Subscribes to the given target type for an agent in the client's
# collective, once per agent.
#
# agent - agent name; also the key under which the subscription is tracked
# type  - target type, for example :reply
#
# Returns nil without doing anything when the agent is already tracked.
def subscribe(agent, type)
  return if @subscriptions.include?(agent)

  targets = Util.make_subscriptions(agent, type, collective)
  Log.debug("Subscribing to #{type} target for agent #{agent}")
  Util.subscribe(targets)

  @subscriptions[agent] = 1
end

#threaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object

Starts the client receiver and publisher in threads. This is activated when the ‘threaded’ client option is set.



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/mcollective/client.rb', line 204

# Publishes the request and receives its replies on separate threads.
#
# request         - request to publish
# publish_timeout - time allowed for publishing
# timeout         - time allowed for receiving replies
# waitfor         - hosts, or number of hosts, to wait for
#
# The publisher thread is left running on its own; only the receiver thread
# is waited for.
#
# Returns the number of hosts that responded, as reported by start_receiver.
def threaded_req(request, publish_timeout, timeout, waitfor, &block)
  Log.debug("Starting threaded client")
  Thread.new { start_publisher(request, publish_timeout) }

  # When the client is threaded we add the publishing timeout to
  # the agent timeout so that the receiver doesn't time out before
  # publishing has finished in cases where publish_timeout >= timeout.
  total_timeout = publish_timeout + timeout

  receiver = Thread.new { start_receiver(request.requestid, waitfor, total_timeout, &block) }
  receiver.value
end

#unsubscribe(agent, type) ⇒ Object



106
107
108
109
110
111
112
113
114
# File 'lib/mcollective/client.rb', line 106

# Removes the subscription to the given target type for an agent in the
# client's collective.
#
# agent - agent name; the key under which the subscription is tracked
# type  - target type, for example :reply
#
# Returns nil without doing anything when the agent is not tracked.
def unsubscribe(agent, type)
  return unless @subscriptions.include?(agent)

  targets = Util.make_subscriptions(agent, type, collective)
  Log.debug("Unsubscribing #{type} target for #{agent}")
  Util.unsubscribe(targets)

  @subscriptions.delete(agent)
end

#unthreaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object

Starts the client receiver and publisher unthreaded. This is the default client behaviour.



196
197
198
199
# File 'lib/mcollective/client.rb', line 196

# Publishes the request and then receives its replies, one step after the
# other on the calling thread.
#
# request         - request to publish
# publish_timeout - time allowed for publishing
# timeout         - time allowed for receiving replies
# waitfor         - hosts, or number of hosts, to wait for
#
# Returns the number of hosts that responded, as reported by start_receiver.
def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
  start_publisher(request, publish_timeout)

  request_id = request.requestid
  start_receiver(request_id, waitfor, timeout, &block)
end

#update_stat(stat, hosts_responded, requestid) ⇒ Object



292
293
294
295
296
297
298
299
300
301
# File 'lib/mcollective/client.rb', line 292

# Completes a stat Hash once a request has finished and remembers it as the
# client's stats.
#
# stat            - Hash holding at least :starttime and :discoverytime
# hosts_responded - number of replies received
# requestid       - id of the request the stats belong to
#
# Total time is measured from :starttime to now and block time is the total
# minus the discovery time. Both host lists are reset to empty Arrays.
#
# Returns the same, updated, stat Hash.
def update_stat(stat, hosts_responded, requestid)
  elapsed = Time.now.to_f - stat[:starttime]

  stat[:totaltime] = elapsed
  stat[:blocktime] = elapsed - stat[:discoverytime]
  stat.update(
    :responses => hosts_responded,
    :noresponsefrom => [],
    :unexpectedresponsefrom => [],
    :requestid => requestid
  )

  @stats = stat
end