Class: Blender::Driver::Serf

Inherits:
Base
  • Object
show all
Defined in:
lib/blender/drivers/serf.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = {}) ⇒ Serf

Returns a new instance of Serf.



29
30
31
32
33
34
35
36
37
38
# File 'lib/blender/drivers/serf.rb', line 29

# Builds a Serf driver from a configuration hash.
#
# The driver-specific keys (:filter_by, :concurrency and, in tag mode,
# :filter_tag) are removed from a copy of +config+; whatever is left is
# handed to the superclass constructor. The caller's hash is not modified.
#
# @param config [Hash] driver options
# @raise [ArgumentError] when :filter_by is :tag but no :filter_tag is given
def initialize(config = {})
  remaining = config.dup
  @filter_by = remaining.delete(:filter_by) || :host
  @concurrency = remaining.delete(:concurrency) || 1

  tag_mode = (@filter_by == :tag)
  # :filter_tag is only consumed in tag mode; otherwise it stays in the
  # hash that is forwarded to the superclass.
  @filter_tag = remaining.delete(:filter_tag) if tag_mode
  if tag_mode && !@filter_tag
    raise ArgumentError, 'Must specify filter_tag when filter_by is set to :tag'
  end

  super(remaining)
end

Instance Attribute Details

#concurrency ⇒ Object (readonly)

Returns the value of attribute concurrency.



27
28
29
# File 'lib/blender/drivers/serf.rb', line 27

# Reader for @concurrency.
#
# The constructor sets it from the :concurrency config key, defaulting to 1;
# #execute uses it as the slice size when batching hosts.
#
# @return [Object] the stored concurrency value
def concurrency
  @concurrency
end

#filter_by ⇒ Object (readonly)

Returns the value of attribute filter_by.



27
28
29
# File 'lib/blender/drivers/serf.rb', line 27

# Reader for @filter_by.
#
# The constructor sets it from the :filter_by config key, defaulting to
# :host. #query_opts and #exit_status handle :host, :tag and :none and raise
# ArgumentError for any other value.
#
# @return [Object] the stored filter mode
def filter_by
  @filter_by
end

#filter_tag ⇒ Object (readonly)

Returns the value of attribute filter_tag.



27
28
29
# File 'lib/blender/drivers/serf.rb', line 27

# Reader for @filter_tag.
#
# The constructor only assigns it when filter_by is :tag (from the
# :filter_tag config key); #query_opts uses it as the key of the FilterTags
# option.
#
# @return [Object, nil] the stored tag name, nil when never assigned
def filter_tag
  @filter_tag
end

Instance Method Details

#execute(tasks, hosts) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
# File 'lib/blender/drivers/serf.rb', line 96

# Runs every task against the given hosts, in batches of +concurrency+.
#
# For each task the hosts are split with each_slice(concurrency) and each
# slice is passed to #run_command together with the task's command. A
# non-zero exit status aborts the run unless the task is flagged with
# :ignore_failure.
#
# @param tasks [Enumerable] tasks responding to #command and #metadata
# @param hosts [Enumerable] hosts (or tag values) to query
# @raise [ExecutionFailed] with the command's stderr when a batch fails and
#   the task does not set :ignore_failure
def execute(tasks, hosts)
  Log.debug("Serf query on #{filter_by}s [#{hosts.inspect}]")
  tasks.each do |task|
    hosts.each_slice(concurrency) do |nodes|
      cmd = run_command(task.command, nodes)
      failed = cmd.exitstatus != 0
      # NOTE(review): the listing read `task.[:ignore_failure]`, which does not
      # parse; `metadata` is assumed to be the dropped receiver -- confirm
      # against the task class.
      raise ExecutionFailed, cmd.stderr if failed && !task.metadata[:ignore_failure]
    end
  end
end

#exit_status(responses, nodes) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/blender/drivers/serf.rb', line 65

# Turns the collected query responses into an ExecOutput.
#
# In :host mode the number of responses must equal the number of nodes
# queried; otherwise a failing ExecOutput (-1) describing the shortfall is
# returned. In :tag and :none modes any set of responses counts as success.
#
# @param responses [Array] responses gathered from the query
# @param nodes [Array] nodes the query was sent to
# @return [ExecOutput] status 0 with the inspected responses as stdout, or
#   status -1 with an explanation in stderr
# @raise [ArgumentError] when filter_by is not :host, :tag or :none
def exit_status(responses, nodes)
  mode = filter_by
  unless %i[host tag none].include?(mode)
    raise ArgumentError, "Unknown filter_by option: #{mode}"
  end

  # Only host filtering has a known expected response count.
  if mode == :host && responses.size != nodes.size
    shortfall = "Insufficient number of responses. Expected:#{nodes.size}, Got:#{responses.size}"
    return ExecOutput.new(-1, '', shortfall)
  end

  ExecOutput.new(0, responses.inspect, '')
end

#query_opts(command, nodes) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/blender/drivers/serf.rb', line 80

# Builds the argument list for a Serf query: [name, payload, options].
#
# The options always carry :Timeout, computed from command.timeout (in
# seconds, defaulting to 15) multiplied by 1e9 and converted to an Integer.
# Depending on filter_by they also carry:
# * :host -- FilterNodes set to +nodes+
# * :tag  -- FilterTags mapping filter_tag to the single entry of +nodes+
# * :none -- nothing extra; +nodes+ must be exactly ['localhost']
#
# @param command [#query, #payload, #timeout] the query to send
# @param nodes [Array] node names, or a single tag value in :tag mode
# @return [Array] [command.query, command.payload, options hash]
# @raise [RuntimeError] when +nodes+ does not fit the :tag or :none mode
# @raise [ArgumentError] when filter_by is not :host, :tag or :none
def query_opts(command, nodes)
  timeout_seconds = command.timeout || 15
  # Convert after multiplying so a fractional timeout (e.g. 0.5) still yields
  # an Integer. The previous `x*1e9.to_i` only converted the constant and let
  # Float timeouts leak through as Floats.
  # NOTE(review): assumes Serf wants integer nanoseconds here -- confirm.
  opts = { Timeout: (timeout_seconds * 1_000_000_000).to_i }
  case filter_by
  when :host
    opts[:FilterNodes] = nodes
  when :tag
    raise 'filter by :tag only supports single tag' unless nodes.size == 1
    opts[:FilterTags] = { filter_tag => nodes.first }
  when :none
    raise 'filter by :none only supported with localhost' unless nodes == ['localhost']
  else
    raise ArgumentError, "Unknown filter_by option: #{filter_by}"
  end
  [command.query, command.payload, opts]
end

#run_command(command, nodes) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/blender/drivers/serf.rb', line 53

# Sends one command to a batch of nodes and reports the outcome.
#
# The responses from #serf_query are passed to the command's optional
# +process+ callback and then summarised by #exit_status. Any StandardError
# raised along the way is converted into a failing ExecOutput instead of
# propagating.
#
# @param command [#process] the query command to run
# @param nodes [Array] the batch of nodes to target
# @return [ExecOutput] the result of #exit_status, or status -1 with the
#   error message in stderr when something raised
def run_command(command, nodes)
  replies = serf_query(command, nodes)
  callback = command.process
  callback.call(replies) if callback
  exit_status(replies, nodes)
rescue StandardError => error
  ExecOutput.new(-1, '', error.message)
end

#serf_query(command, host) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/blender/drivers/serf.rb', line 40

# Issues a Serf query and collects every event it yields.
#
# Connects with Serfx using the driver's config, sends the query built by
# #query_opts, and for each event yielded both records it and prints its
# inspected form to +stdout+.
#
# @param command [#query, #payload] the query to send
# @param host [Array] targets forwarded to #query_opts
# @return [Array] the events yielded by the query, in arrival order
def serf_query(command, host)
  # NOTE(review): @current_host is not assigned anywhere in the code visible
  # here, so this may log an empty target -- confirm it is set elsewhere.
  Log.debug("Invoking serf query '#{command.query}' with payload '#{command.payload}' against #{@current_host}")
  Log.debug("Serf RPC address #{config[:host]}:#{config[:port]}")

  collected = []
  Serfx.connect(config) do |conn|
    query_args = query_opts(command, host)
    conn.query(*query_args) do |event|
      collected.push(event)
      stdout.puts(event.inspect)
    end
  end
  collected
end