Class: PortAuthority::Agent

Inherits:
Object
Defined in:
lib/port-authority/agent.rb

Overview

Scaffolding class for agents

Direct Known Subclasses

PortAuthority::Agents::LBaaS

Instance Method Summary

Constructor Details

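#initialize ⇒ Agent

Common agent process initialization. Sets up the runtime variables (exit flag, semaphores, threads), installs the signal handlers (INT exits immediately, TERM raises the exit flag), and loads the configuration, exiting with status 1 if the load fails. It then hands control to the child class through the run method and logs any uncaught StandardError before exiting with status 1.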



# File 'lib/port-authority/agent.rb', line 17

def initialize
  Thread.current[:name] = 'main'                        # name main thread
  @@_exit = false                                       # prepare exit flag
  @@_semaphores = { log: Mutex.new }                    # init semaphores
  @@_threads = {}                                       # init threads
  Signal.trap('INT') { exit!(1) }                       # end immediately
  Signal.trap('TERM') { end! }                          # end gracefully
  Config.load! || exit!(1)                              # load config or die
  begin                                                 # all-wrapping exception ;)
    run                                                 # hook to child class
  rescue StandardError => e
    Logger.alert "UNCAUGHT EXCEPTION IN THREAD main! Dying!  X.X"
    Logger.alert [' ', "#{e.class}:", e.message].join(' ')
    e.backtrace.each {|line| Logger.debug "  #{line}"}
    exit! 1
  end
end
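
The constructor follows a template-method pattern: the base class owns startup and error handling, and the child class only supplies run. The following is a minimal sketch of that pattern using stand-in classes. SketchAgent, EchoAgent and BrokenAgent are hypothetical names, not part of port-authority, and the sketch records the failure instead of logging it and calling exit!(1).

```ruby
# Stand-in for the scaffolding class; NOT the real PortAuthority::Agent.
class SketchAgent
  attr_reader :failure

  def initialize
    @failure = nil
    begin
      run                                    # hook implemented by the child class
    rescue StandardError => e
      # The real class logs the exception and exits; here we only record it.
      @failure = "#{e.class}: #{e.message}"
    end
  end
end

# Hypothetical child agent whose run hook completes normally.
class EchoAgent < SketchAgent
  attr_reader :ran

  def run
    @ran = true
  end
end

# Hypothetical child agent whose run hook raises.
class BrokenAgent < SketchAgent
  def run
    raise ArgumentError, 'boom'
  end
end

echo = EchoAgent.new
broken = BrokenAgent.new
```

Because run is called from inside the constructor, a child class following this pattern does its work as a side effect of new.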

Instance Method Details

#end! ⇒ Object

Raise the exit flag



# File 'lib/port-authority/agent.rb', line 82

def end!
  @@_exit = true
end

#exit? ⇒ Boolean

Has the exit flag been raised?

Returns:

  • (Boolean)


# File 'lib/port-authority/agent.rb', line 77

def exit?
  @@_exit
end
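
The end! and exit? pair implements cooperative shutdown: one piece of code raises the flag, and loops elsewhere poll it and stop on their own. A minimal sketch, assuming a stand-in class (FlagDemo is hypothetical) that stores the flag in a class variable as the source does:

```ruby
# Stand-in class; the flag lives in a class variable, so it is shared
# by every instance and every thread.
class FlagDemo
  @@_exit = false

  def end!
    @@_exit = true
  end

  def exit?
    @@_exit
  end
end

demo = FlagDemo.new

# Worker polls the flag and leaves its loop once the flag is raised.
worker = Thread.new do
  sleep 0.01 until demo.exit?
end

sleep 0.05   # let the worker spin for a moment
demo.end!    # raise the flag, as the TERM handler does
worker.join  # returns once the worker has noticed the flag
```

The flag is only checked between iterations, so shutdown latency is bounded by how long one iteration (plus its sleep) takes.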

#hostname ⇒ Object

Return hostname.



# File 'lib/port-authority/agent.rb', line 159

def hostname
  @hostname ||= Socket.gethostname
end

#sem_create(name) ⇒ Object

Create a named Mutex semaphore



# File 'lib/port-authority/agent.rb', line 87

def sem_create(name)
  @@_semaphores.merge!(Hash[name.to_sym, Mutex.new])    # register a Mutex under the given name
end
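
A minimal sketch of registering a named Mutex in a hash, using a local variable in place of the class-level registry. The :etcd name is a hypothetical example.

```ruby
# Local stand-in for the class-level semaphore registry.
registry = { log: Mutex.new }

# Hash[key, value] builds a one-pair hash; merge! adds it in place
# and returns the receiver.
result = registry.merge!(Hash[:etcd, Mutex.new])
```

An equivalent and arguably plainer form is registry[:etcd] = Mutex.new, which returns the new Mutex rather than the whole registry.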

#setup(args = {}) ⇒ Object

Set up the agent process: initialize logging, set the process title, optionally require root, set the nice level, and optionally daemonize.

There are 4 optional parameters:

  • :name (String): Agent name. Defaults to the downcased last component of the child agent's class name.

  • :root (Boolean): Require the process to run as root. Defaults to false.

  • :daemonize (Boolean): Daemonize the process. Defaults to false.

  • :nice (Integer): Nice level of the process. Defaults to 0.



# File 'lib/port-authority/agent.rb', line 44

def setup(args = {})
  name = args[:name] || self.class.to_s.downcase.split('::').last
  args[:root] ||= false
  args[:daemonize] ||= false
  args[:nice] ||= 0
  Logger.init! @@_semaphores[:log]
  Logger.info 'Starting main thread'
  Logger.debug 'Setting process name'
  if RUBY_VERSION >= '2.1'
    Process.setproctitle("pa-#{name}-agent")
  else
    $0 = "pa-#{name}-agent"
  end
  if args[:root] && Process.uid != 0
    Logger.alert 'Must run under root user!'
    exit! 1
  end
  Logger.debug 'Setting CPU nice level'
  Process.setpriority(Process::PRIO_PROCESS, 0, args[:nice])
  if args[:daemonize]
    Logger.info 'Daemonizing process'
    if RUBY_VERSION < '1.9'
      exit if fork
      Process.setsid
      exit if fork
      Dir.chdir('/')
    else
      Process.daemon
    end
  end
end
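
How the optional parameters resolve can be sketched in isolation. The resolve_options helper and the Demo namespace below are hypothetical and exist only for illustration. The defaulting expressions are copied from the source above.

```ruby
# Hypothetical namespace mimicking a nested agent class.
module Demo
  module Agents
    class LBaaS; end
  end
end

# Hypothetical helper that applies the same defaults as setup.
def resolve_options(klass, args = {})
  {
    name: args[:name] || klass.to_s.downcase.split('::').last,
    root: args[:root] || false,
    daemonize: args[:daemonize] || false,
    nice: args[:nice] || 0
  }
end

defaults = resolve_options(Demo::Agents::LBaaS)
custom = resolve_options(Demo::Agents::LBaaS, name: 'lb', nice: 10)

# The process title is then built as "pa-<name>-agent".
title = "pa-#{defaults[:name]}-agent"
```

With the defaults, a class named Demo::Agents::LBaaS yields the name lbaas and the title pa-lbaas-agent.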

#shellcmd(*args) ⇒ Object

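Run a command through Kernel#system, logging the command and its return value at debug level. The arguments are joined with spaces into a single command string.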



# File 'lib/port-authority/agent.rb', line 150

def shellcmd(*args)
  cmd = args.join(' ').to_s
  cksum = Digest::SHA256.hexdigest(args.join.to_s)[0..15]
  Logger.debug "Executing shellcommand #{cksum} - #{cmd}"
  ret = system cmd
  Logger.debug "Shellcommand #{cksum} returned #{ret.to_s}"
end
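
A minimal sketch of the same idea, assuming a POSIX system where the true and false commands exist. The run_logged helper is hypothetical. Unlike shellcmd as listed, it returns the command status to the caller instead of only logging it.

```ruby
require 'digest'

# Hypothetical helper: joins the arguments, tags the call with a short
# checksum, runs it, and returns both the tag and the status.
def run_logged(*args)
  cmd = args.join(' ')
  cksum = Digest::SHA256.hexdigest(args.join)[0..15]  # 16 hex chars as a call id
  ok = system(cmd)  # true on exit status 0, false on non-zero, nil if it could not run
  { cksum: cksum, ok: ok }
end

success = run_logged('true')
failure = run_logged('false')
```

Since the arguments are concatenated into one string, any value that comes from outside the program should be escaped (for example with Shellwords.escape) before it is passed in.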

#thr_create(name, interval, &block) ⇒ Object

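Create a named Thread together with its Mutex semaphore. The given block runs repeatedly inside the thread, with a pause of interval seconds between iterations, until the exit flag is raised. An uncaught StandardError in the block is logged and raises the exit flag.

The method requires 3 parameters:

  • name (Symbol): Thread/Mutex name.

  • interval (Integer): Thread loop interval in seconds.

  • &block (Proc): Block of code to run.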



# File 'lib/port-authority/agent.rb', line 99

def thr_create(name, interval, &block)
  @@_semaphores.merge!(Hash[name.to_sym, Mutex.new])
  @@_threads.merge!(Hash[name.to_sym, Thread.new do
      Thread.current[:name] = name.to_s
      Logger.info "Starting thread #{Thread.current[:name]}"
      begin
        until exit?
          yield block
          sleep interval
        end
        Logger.info "Ending thread #{Thread.current[:name]}"
      rescue StandardError => e
        Logger.alert "UNCAUGHT EXCEPTION IN THREAD #{Thread.current[:name]}"
        Logger.alert [' ', "#{e.class}:", e.message].join(' ')
        e.backtrace.each {|line| Logger.debug "  #{line}"}
        end!
      end
    end
  ])
end
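
The loop that thr_create builds can be sketched without the class: a named thread that repeats a unit of work, sleeps, and stops once a shared flag is set. The exit_flag and ticks variables and the thread name ticker are hypothetical stand-ins.

```ruby
exit_flag = false   # stand-in for the class-level exit flag
ticks = Queue.new   # thread-safe record of loop iterations

worker = Thread.new do
  Thread.current[:name] = 'ticker'   # thread-local name, as in the source
  begin
    until exit_flag
      ticks << Time.now              # the periodic unit of work
      sleep 0.01                     # loop interval
    end
  rescue StandardError
    exit_flag = true                 # a failing worker asks everything to stop
  end
end

ticks.pop          # block until the first iteration has happened
exit_flag = true   # request shutdown
worker.join        # wait for the loop to notice and finish
```

Setting the flag from the rescue clause mirrors the source: one failing thread brings down the whole agent instead of leaving it half alive.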

#thr_safe(name = Thread.current[:name].to_sym, &block) ⇒ Object

Run a block of code while holding a named Mutex. The name parameter can be omitted when the method is called from within a thread's block. In that case the Mutex registered under the current thread's :name is used.

The method accepts the following parameters:

  • name (Symbol): Mutex name.

  • &block (Proc): Block of code to run.



# File 'lib/port-authority/agent.rb', line 128

def thr_safe(name=Thread.current[:name].to_sym, &block)
  @@_semaphores[name.to_sym].synchronize do
    yield block
  end
end
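
A minimal sketch of what the named Mutex buys: several threads updating one counter through the same lock. The semaphores hash and the :counter name are hypothetical stand-ins for the class-level registry.

```ruby
# Local stand-in for the semaphore registry.
semaphores = { counter: Mutex.new }

counter = 0
threads = 4.times.map do
  Thread.new do
    1000.times do
      # Only one thread at a time may run this block.
      semaphores[:counter].synchronize { counter += 1 }
    end
  end
end

threads.each(&:join)  # wait for all workers, as thr_wait does
```

Looking the Mutex up by name lets unrelated threads share a lock without passing the Mutex object around.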

#thr_start(name = nil) ⇒ Object

Start named thread. If the name is omitted, applies to all spawned threads ;)



# File 'lib/port-authority/agent.rb', line 136

def thr_start(name=nil)
  return @@_threads[name].run if name
  @@_threads.each_value(&:run)
end

#thr_wait(name = nil) ⇒ Object

Wait for named thread to finish. If the name is omitted, applies to all spawned threads ;)



# File 'lib/port-authority/agent.rb', line 144

def thr_wait(name=nil)
  return @@_threads[name].join if name
  @@_threads.each_value(&:join)
end