Class: Pazuzu::Control::Protocol

Inherits:
Object
  • Object
show all
Defined in:
lib/pazuzu/control/protocol.rb

Constant Summary collapse

COMMAND_MAPPINGS =
[
  {
    :pattern => /^help$/,
    :command => :help,
    :usage => 'help',
    :description => 'Show commands.'
  },
  {
    :pattern => /^q(?:uit)?$/,
    :command => :quit,
    :usage => 'q, quit',
    :description => 'Quits session.'
  },
  {
    :pattern => /^list(?:\s+(?<spec>[^\s]+))?$/,
    :command => :list,
    :usage => 'list [SPEC]',
    :description => 'Lists applications whose name, worker or instance matches SPEC (eg., "myapp", "myapp.web*").'
  },
  {
    :pattern => /^log(?:\s+(?<spec>[^\s]+)(?:\s+(?<limit>.*))?)?$/,
    :command => :log,
    :usage => 'log [SPEC] [LIMIT]',
    :description => 'Lists log entries for application, workers or instances matching SPEC, at most LIMIT lines.'
  },
  {
    :pattern => /^start(?:\s+(?<spec>[^\s]+))?$/,
    :command => :start,
    :usage => 'start SPEC',
    :description => 'Starts an application, worker or instance by SPEC.'
  },
  {
    :pattern => /^restart(?:\s+(?<spec>[^\s]+))?$/,
    :command => :restart,
    :usage => 'restart SPEC',
    :description => 'Restarts an application, worker or instance by SPEC. Stopped matches will be started.'
  },
  {
    :pattern => /^stop(?:\s+(?<spec>[^\s]+))?$/,
    :command => :stop,
    :usage => 'stop SPEC',
    :description => 'Stops an application, worker or instance by SPEC.'
  },
  {
    :pattern => /^scale\s+(?<spec>[^\s]+)(?:\s+(?<value>(?:\+|-)?[^\s]+))?$/,
    :command => :scale,
    :usage => 'scale WORKER-SPEC VALUE',
    :description => 'Set the number of instances for a worker (or workers). Value may be an absolute number, or a relative (+1, -1), or the string "default".'
  }
].freeze

Instance Method Summary collapse

Constructor Details

#initialize(io, supervisor) ⇒ Protocol

Returns a new instance of Protocol.



57
58
59
60
61
62
63
# File 'lib/pazuzu/control/protocol.rb', line 57

def initialize(io, supervisor)
  @io = io
  @supervisor = supervisor
  @active = true
  @logger = Utility::AnnotatedLogger.new(
    @supervisor.logger, 'Control socket')
end

Instance Method Details

#collapse_hierarchy(objects) ⇒ Object



261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/pazuzu/control/protocol.rb', line 261

def collapse_hierarchy(objects)
  return objects.dup.delete_if { |object|
    case object
      when Worker
        objects.include?(object.application)
      when Instance
        objects.include?(object.worker) || objects.include?(object.worker.application)
      else
        false
    end
  }
end

#error!(message) ⇒ Object



234
235
236
# File 'lib/pazuzu/control/protocol.rb', line 234

def error!(message)
  respond!(:error => message)
end

#handle_command(line) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/pazuzu/control/protocol.rb', line 76

def handle_command(line)
  @logger.info "Received: #{line}"
  matched = false
  COMMAND_MAPPINGS.each do |command|
    match = command[:pattern].match(line)
    if match
      self.send("handle_#{command[:command]}_command", match)
      matched = true
      break
    end
  end
  unless matched
    error! "Don't understand '#{line}'"
  end
end

#handle_help_command(options) ⇒ Object



92
93
94
95
96
97
98
99
# File 'lib/pazuzu/control/protocol.rb', line 92

def handle_help_command(options)
  results! COMMAND_MAPPINGS.map { |command|
    {
      :syntax => command[:usage],
      :description => command[:description]
    }
  }
end

#handle_list_command(options) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/pazuzu/control/protocol.rb', line 101

def handle_list_command(options)
  spec = options[:spec]
  results = Set.new
  match_objects(spec) do |object|
    case object
      when Application
        results.add(object)
      when Worker
        results.add(object.application)
      when Instance
        results.add(object.worker.application)
    end
  end
  results! results.map { |object| serialize_object(object) }
end

#handle_log_command(options) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/pazuzu/control/protocol.rb', line 117

def handle_log_command(options)
  spec = options[:spec]
  limit = options[:limit].try(:to_i)
  matches = collapse_hierarchy(match_objects(spec))
  entries = matches.map { |object| object.log_entries }
  entries.flatten!(1)
  entries.sort_by! { |(source, time, message)| time }
  entries.slice!(0, entries.length - limit) if limit
  results! entries.map { |(source, time, message)|
    {:source => source, :time => time, :message => message}
  }
end

#handle_quit_command(options) ⇒ Object



218
219
220
221
# File 'lib/pazuzu/control/protocol.rb', line 218

def handle_quit_command(options)
  success! 'It was a pleasure.'
  @active = false
end

#handle_restart_command(options) ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/pazuzu/control/protocol.rb', line 144

def handle_restart_command(options)
  spec = options[:spec]
  matches = collapse_hierarchy(match_objects(spec))
  matches |= match_objects(spec).select { |object| object.run_state != :running }
  matches.each do |object|
    object.stop!
  end
  if matches.any?
    begin
      timeout(30) do
        loop do
          break if matches.all? { |object| object.run_state == :stopped }
          sleep(1)
        end
      end
      matches.each do |object|
        object.start!
      end
      timeout(30) do
        loop do
          break if matches.all? { |object| object.run_state == :running }
          sleep(1)
        end
      end
    rescue Timeout::Error
      error! "Timed out."
    else
      success! "Restarted #{matches.map(&:qname).join(', ')}."
    end
  else
    error! "No matches."
  end
end

#handle_scale_command(options) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/pazuzu/control/protocol.rb', line 191

def handle_scale_command(options)
  spec, value = options[:spec], options[:value]
  if value
    matches = []
    match_objects(spec) do |object|
      if object.is_a?(Worker)
        case value
          when /^(\+|-)/
            object.add_instance_count!(value.to_i)
          when /^default$/i
            object.revert_dynamic_instance_count!
          else
            object.set_instance_count!(value.to_i)
        end
        matches << object
      end
    end
    if matches.any?
      success! "Adjusted #{matches.map(&:qname).join(', ')}."
    else
      error! "No matches."
    end
  else
    error! "No value specified."
  end
end

#handle_start_command(options) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/pazuzu/control/protocol.rb', line 130

def handle_start_command(options)
  spec = options[:spec]
  matches = collapse_hierarchy(match_objects(spec))
  matches |= match_objects(spec).select { |object| object.run_state != :running }
  matches.each do |object|
    object.start!
  end
  if matches.any?
    success! "Started #{matches.map(&:qname).join(', ')}."
  else
    error! "No matches."
  end
end

#handle_stop_command(options) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/pazuzu/control/protocol.rb', line 178

def handle_stop_command(options)
  spec = options[:spec]
  matches = collapse_hierarchy(match_objects(spec))
  matches.each do |object|
    object.stop!
  end
  if matches.any?
    success! "Stopped #{matches.map(&:qname).join(', ')}."
  else
    error! "No matches."
  end
end

#match_name?(name, spec) ⇒ Boolean

Returns:

  • (Boolean)


297
298
299
# File 'lib/pazuzu/control/protocol.rb', line 297

def match_name?(name, spec)
  spec.blank? || name == spec || File.fnmatch(spec, name)
end

#match_objects(spec, &block) ⇒ Object



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/pazuzu/control/protocol.rb', line 274

def match_objects(spec, &block)
  results = Set.new
  @supervisor.applications.each do |application|
    application.workers.each do |worker|
      worker.instances.each do |instance|
        if match_name?(instance.qname, spec)
          results << instance
          yield instance if block
        end
      end
      if match_name?(worker.qname, spec)
        results << worker
        yield worker if block
      end
    end
    if match_name?(application.qname, spec)
      results << application
      yield application if block
    end
  end
  results.to_a
end

#respond!(data) ⇒ Object



223
224
225
226
227
228
# File 'lib/pazuzu/control/protocol.rb', line 223

def respond!(data)
  output = JSON.dump(data, :pretty => true)
  @logger.debug "Sending: #{output}"
  @io.puts('%010x' % output.length)
  @io.puts(output)
end

#results!(results) ⇒ Object



230
231
232
# File 'lib/pazuzu/control/protocol.rb', line 230

def results!(results)
  respond!(:results => results)
end

#run!Object



65
66
67
68
69
70
71
72
73
74
# File 'lib/pazuzu/control/protocol.rb', line 65

def run!
  while @active
    line = @io.gets
    break unless line
    line.strip!
    unless line.blank?
      handle_command(line)
    end
  end
end

#serialize_object(object) ⇒ Object



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/pazuzu/control/protocol.rb', line 242

def serialize_object(object)
  data = {}
  data[:name] = object.qname
  data[:command] = object.command_line if object.respond_to?(:command_line)
  data[:state] = object.run_state
  data[:started_at] = object.started_at.iso8601 if object.started_at
  data[:stopped_at] = object.stopped_at.iso8601 if object.stopped_at
  data[:uptime] = Time.now - object.started_at if object.started_at
  case object
    when Application
      data[:workers] = object.workers.map { |worker| serialize_object(worker) }
    when Worker
      data[:instances] = object.instances.map { |instance| serialize_object(instance) }
    when Instance
      data[:recovery_count] = object.recovery_count
  end
  data
end

#success!(message) ⇒ Object



238
239
240
# File 'lib/pazuzu/control/protocol.rb', line 238

def success!(message)
  respond!(:success => message)
end