Class: Blender::Driver::Serf
- Inherits:
- Base
- Inheritance chain: Object → Base → Blender::Driver::Serf
- Defined in:
- lib/blender/drivers/serf.rb
Instance Attribute Summary collapse
-
#concurrency ⇒ Object
readonly
Returns the value of attribute concurrency.
-
#filter_by ⇒ Object
readonly
Returns the value of attribute filter_by.
-
#filter_tag ⇒ Object
readonly
Returns the value of attribute filter_tag.
Instance Method Summary collapse
- #execute(tasks, hosts) ⇒ Object
- #exit_status(responses, nodes) ⇒ Object
-
#initialize(config = {}) ⇒ Serf
constructor
A new instance of Serf.
- #query_opts(command, nodes) ⇒ Object
- #run_command(command, nodes) ⇒ Object
- #serf_query(command, host) ⇒ Object
Constructor Details
#initialize(config = {}) ⇒ Serf
Returns a new instance of Serf.
29 30 31 32 33 34 35 36 37 38 |
# File 'lib/blender/drivers/serf.rb', line 29
#
# Builds a Serf driver from an options hash.
#
# The caller's hash is left untouched: a duplicate is taken, the keys this
# class consumes are removed from the duplicate, and the remainder is passed
# to the superclass initializer.
#
# @param config [Hash] driver options. +:filter_by+ (defaults to +:host+)
#   and +:concurrency+ (defaults to 1) are always consumed here;
#   +:filter_tag+ is consumed only when +:filter_by+ is +:tag+.
# @raise [ArgumentError] when +:filter_by+ is +:tag+ and +:filter_tag+ is
#   missing or falsy
def initialize(config = {})
  remaining = config.dup
  @filter_by = remaining.delete(:filter_by) || :host
  @concurrency = remaining.delete(:concurrency) || 1

  tag_mode = @filter_by == :tag
  # :filter_tag is only pulled out of the options when it is actually needed.
  @filter_tag = remaining.delete(:filter_tag) if tag_mode
  if tag_mode && !@filter_tag
    raise ArgumentError, 'Must specify filter_tag when filter_by is set to :tag'
  end

  super(remaining)
end
Instance Attribute Details
#concurrency ⇒ Object (readonly)
Returns the value of attribute concurrency.
27 28 29 |
# File 'lib/blender/drivers/serf.rb', line 27
#
# Returns the value of attribute concurrency.
#
# @return [Object] whatever is currently stored in +@concurrency+
def concurrency
  @concurrency
end
#filter_by ⇒ Object (readonly)
Returns the value of attribute filter_by.
27 28 29 |
# File 'lib/blender/drivers/serf.rb', line 27
#
# Returns the value of attribute filter_by.
#
# @return [Object] whatever is currently stored in +@filter_by+
def filter_by
  @filter_by
end
#filter_tag ⇒ Object (readonly)
Returns the value of attribute filter_tag.
27 28 29 |
# File 'lib/blender/drivers/serf.rb', line 27
#
# Returns the value of attribute filter_tag.
#
# @return [Object] whatever is currently stored in +@filter_tag+
def filter_tag
  @filter_tag
end
Instance Method Details
#execute(tasks, hosts) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/blender/drivers/serf.rb', line 96
#
# Runs every task against the given hosts, one slice of hosts at a time.
#
# Hosts are split into groups of +concurrency+ entries; each group is passed
# to #run_command together with the task's command. Processing stops at the
# first failing group unless the task opts out via +:ignore_failure+.
#
# @param tasks [#each] tasks responding to +command+ and +metadata+
# @param hosts [#each_slice, #inspect] query targets
# @return [Object] the result of +tasks.each+
# @raise [ExecutionFailed] carrying the command's stderr when a command
#   finishes with a non-zero exit status and +:ignore_failure+ is not set
def execute(tasks, hosts)
  Log.debug("Serf query on #{filter_by}s [#{hosts.inspect}]")
  tasks.each do |task|
    hosts.each_slice(concurrency) do |nodes|
      cmd = run_command(task.command, nodes)
      # NOTE(review): the published listing lost the receiver of
      # [:ignore_failure] ("task.[:ignore_failure]" does not parse);
      # task.metadata is assumed here -- confirm against the task class.
      if cmd.exitstatus != 0 && !task.metadata[:ignore_failure]
        raise ExecutionFailed, cmd.stderr
      end
    end
  end
end
#exit_status(responses, nodes) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/blender/drivers/serf.rb', line 65
#
# Turns the collected query responses into an ExecOutput.
#
# With +filter_by+ set to +:host+ every node is expected to answer, so a
# response count that differs from the node count yields a -1 result with an
# explanatory stderr string. With +:tag+ or +:none+ the result is always 0.
# On success the inspected responses are passed as the second argument.
#
# @param responses [#size, #inspect] events gathered from the query
# @param nodes [#size] nodes the query was addressed to
# @return [ExecOutput]
# @raise [ArgumentError] when +filter_by+ is not +:host+, +:tag+ or +:none+
def exit_status(responses, nodes)
  mode = filter_by
  unless [:host, :tag, :none].include?(mode)
    raise ArgumentError, "Unknown filter_by option: #{mode}"
  end

  # Only host filtering compares the number of answers with the node count.
  if mode == :host && responses.size != nodes.size
    return ExecOutput.new(-1, '', "Insufficient number of responses. Expected:#{nodes.size}, Got:#{responses.size}")
  end

  ExecOutput.new(0, responses.inspect, '')
end
#query_opts(command, nodes) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/blender/drivers/serf.rb', line 80
#
# Builds the argument list for a Serf query: query name, payload and an
# options hash.
#
# The command's timeout (15 when unset) is multiplied by 1_000_000_000 and
# stored under +:Timeout+ as an Integer. Depending on +filter_by+ the hash
# additionally carries +:FilterNodes+ (+:host+) or +:FilterTags+ (+:tag+);
# +:none+ adds no filter and is accepted only for +['localhost']+.
#
# @param command [#timeout, #query, #payload] the query description
# @param nodes [Array] node names for +:host+, a single tag value for
#   +:tag+, or +['localhost']+ for +:none+
# @return [Array] +[command.query, command.payload, opts]+
# @raise [RuntimeError] when +:tag+ is given more than one value or +:none+
#   is used with anything other than +['localhost']+
# @raise [ArgumentError] when +filter_by+ is not a known mode
def query_opts(command, nodes)
  # The original `(timeout || 15)*1e9.to_i` applied to_i to the literal only,
  # so a fractional timeout leaked through as a Float. Converting the whole
  # product keeps the value an Integer for every numeric timeout.
  opts = { Timeout: ((command.timeout || 15) * 1_000_000_000).round }
  case filter_by
  when :host
    opts[:FilterNodes] = nodes
  when :tag
    raise 'filter by :tag only supports single tag' unless nodes.size == 1
    opts[:FilterTags] = { filter_tag => nodes.first }
  when :none
    raise 'filter by :none only supported with localhost' unless nodes == ['localhost']
  else
    raise ArgumentError, "Unknown filter_by option: #{filter_by}"
  end
  [command.query, command.payload, opts]
end
#run_command(command, nodes) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/blender/drivers/serf.rb', line 53
#
# Runs one Serf query and converts the outcome into an ExecOutput.
#
# The responses from #serf_query are handed to the command's +process+
# callable when one is set, then summarised by #exit_status. Any
# StandardError raised along the way is converted into a -1 result instead
# of propagating.
#
# @param command [#process] the query description, forwarded to #serf_query
# @param nodes [Array] targets, forwarded to #serf_query and #exit_status
# @return [ExecOutput] the #exit_status result, or a -1 result on error
def run_command(command, nodes)
  responses = serf_query(command, nodes)
  command.process.call(responses) if command.process
  exit_status(responses, nodes)
rescue StandardError => e
  # NOTE(review): the published listing is truncated to "e."; the exception
  # message is assumed to be the intended stderr text -- confirm upstream.
  ExecOutput.new(-1, '', e.message)
end
#serf_query(command, host) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/blender/drivers/serf.rb', line 40
#
# Sends a Serf query and returns every event it produced.
#
# A connection is opened with Serfx using this driver's +config+, the
# arguments built by #query_opts are sent, and each event yielded by the
# query is both collected and echoed (inspected) to +stdout+.
#
# Note that the first debug line interpolates +@current_host+, not the
# +host+ argument.
#
# @param command [#query, #payload] the query description
# @param host [Object] forwarded unchanged to #query_opts as its node list
# @return [Array] the events in the order they were yielded
def serf_query(command, host)
  Log.debug("Invoking serf query '#{command.query}' with payload '#{command.payload}' against #{@current_host}")
  Log.debug("Serf RPC address #{config[:host]}:#{config[:port]}")
  [].tap do |collected|
    Serfx.connect(config) do |conn|
      conn.query(*query_opts(command, host)) do |event|
        collected << event
        stdout.puts event.inspect
      end
    end
  end
end