Class: PortAuthority::Agent

Inherits:
Object
  • Object
Defined in:
lib/port-authority/agent.rb

Overview

Scaffolding class for agents

Direct Known Subclasses

PortAuthority::Agents::LBaaS

Instance Method Summary

Constructor Details

#initialize ⇒ Agent

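Common agent process initialization. Loads the configuration, installs the common signal handlers (INT exits immediately, TERM raises the exit flag for a graceful shutdown), and initializes the runtime variables. It then executes the actual agent through the run method, which child classes implement. Any StandardError that escapes run is logged and the process exits with status 1.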



# File 'lib/port-authority/agent.rb', line 16

def initialize
  Thread.current[:name] = 'main'                        # name main thread
  @@_exit = false                                       # prepare exit flag
  @@_semaphores = { log: Mutex.new }                    # init semaphores
  @@_threads = {}                                       # init threads
  Signal.trap('INT') { exit!(1) }                       # end immediately
  Signal.trap('TERM') { end! }                          # end gracefully
  Config.load! || exit!(1)                              # load config or die
  begin                                                 # all-wrapping exception ;)
    run                                                 # hook to child class
  rescue StandardError => e
    Logger.alert "UNCAUGHT EXCEPTION IN THREAD main! Dying!  X.X"
    Logger.alert [' ', "#{e.class}:", e.message].join(' ')
    e.backtrace.each {|line| Logger.debug "  #{line}"}
    exit! 1
  end
end
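
The constructor's control flow (name the main thread, lower the exit flag, call the run hook, rescue whatever escapes) can be sketched without the library. SketchAgent, HelloAgent and BrokenAgent are hypothetical stand-ins, not part of PortAuthority. The sketch records a failure instead of logging it and calling exit!(1), and it omits the config load and signal traps.

```ruby
# Hypothetical stand-in for the constructor's control flow only.
class SketchAgent
  attr_reader :failure

  def initialize
    Thread.current[:name] = 'main'   # name the main thread
    @exit = false                    # exit flag starts lowered
    begin
      run                            # hook implemented by the child class
    rescue StandardError => e
      @failure = e                   # assumption: record instead of exit!(1)
    end
  end

  def end!
    @exit = true
  end

  def exit?
    @exit
  end
end

# A child class only has to provide run.
class HelloAgent < SketchAgent
  attr_reader :ticks

  def run
    @ticks = 0
    until exit?
      @ticks += 1
      end! if @ticks == 3            # raise the exit flag after three passes
    end
  end
end

# A child class whose run hook fails.
class BrokenAgent < SketchAgent
  def run
    raise ArgumentError, 'boom'
  end
end

hello = HelloAgent.new
broken = BrokenAgent.new
```

Because run is called from inside initialize, calling new on a child class blocks until the agent's main loop ends.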

Instance Method Details

#end! ⇒ Object

Raise the exit flag. Thread loops check this flag and finish once it is set.



# File 'lib/port-authority/agent.rb', line 81

def end!
  @@_exit = true
end

#exit? ⇒ Boolean

Has the exit flag been raised?

Returns:

  • (Boolean)


# File 'lib/port-authority/agent.rb', line 76

def exit?
  @@_exit
end

#hostname ⇒ Object

Return the local hostname. The value is looked up once and then cached.



# File 'lib/port-authority/agent.rb', line 149

def hostname
  @hostname ||= Socket.gethostname
end

#sem_create(name) ⇒ Object

Create a named Mutex semaphore and register it under the symbolized name.



# File 'lib/port-authority/agent.rb', line 86

def sem_create(name)
  @@_semaphores.merge!(Hash[name.to_sym, Mutex.new])   # register Mutex under the symbolized name
end
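
A minimal sketch of the named-semaphore registry, assuming a plain instance-level hash instead of the class variable. SemaphoreRegistry and its methods are hypothetical names used only for illustration.

```ruby
# Hypothetical registry mirroring the semaphore hash; not the library's class.
class SemaphoreRegistry
  def initialize
    @semaphores = { log: Mutex.new }    # same initial entry as the constructor
  end

  # Register a fresh Mutex under the symbolized name (replaces an existing one).
  def create(name)
    @semaphores[name.to_sym] = Mutex.new
  end

  # Look a Mutex up by String or Symbol name.
  def [](name)
    @semaphores.fetch(name.to_sym)
  end

  def names
    @semaphores.keys
  end
end

registry = SemaphoreRegistry.new
registry.create('etcd')
registry.create(:lb)
```

Symbolizing the name on both write and read means callers can pass either a String or a Symbol and still reach the same Mutex.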

#setup(args = {}) ⇒ Object

Set up the agent process: initialize logging, set the process title (pa-<name>-agent) and the CPU nice level, optionally require root, and optionally daemonize.

There are 4 optional parameters:

:name

String Agent name. Defaults to the downcased last segment of the child agent's class name.

:root

Boolean Require the process to run as root. Defaults to false.

:daemonize

Boolean Daemonize the process. Defaults to false.

:nice

Integer Nice level of the process. Defaults to 0.



# File 'lib/port-authority/agent.rb', line 43

def setup(args = {})
  name = args[:name] || self.class.to_s.downcase.split('::').last
  args[:root] ||= false
  args[:daemonize] ||= false
  args[:nice] ||= 0
  Logger.init! @@_semaphores[:log]
  Logger.info 'Starting main thread'
  Logger.debug 'Setting process name'
  if RUBY_VERSION >= '2.1'
    Process.setproctitle("pa-#{name}-agent")
  else
    $0 = "pa-#{name}-agent"
  end
  if args[:root] && Process.uid != 0
    Logger.alert 'Must run under root user!'
    exit! 1
  end
  Logger.debug 'Setting CPU nice level'
  Process.setpriority(Process::PRIO_PROCESS, 0, args[:nice])
  if args[:daemonize]
    Logger.info 'Daemonizing process'
    if RUBY_VERSION < '1.9'
      exit if fork
      Process.setsid
      exit if fork
      Dir.chdir('/')
    else
      Process.daemon
    end
  end
end
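
The option defaults and the derived process title can be illustrated in isolation. The Demo namespace and the options helper below are hypothetical; only the defaulting expressions and the title format are taken from the method body above.

```ruby
# Hypothetical class reproducing only the option handling of setup.
module Demo
  module Agents
    class LBaaS
      def options(args = {})
        {
          # last segment of the class name, downcased
          name: args[:name] || self.class.to_s.downcase.split('::').last,
          root: args[:root] || false,
          daemonize: args[:daemonize] || false,
          nice: args[:nice] || 0
        }
      end
    end
  end
end

agent = Demo::Agents::LBaaS.new
defaults = agent.options
custom = agent.options(name: 'edge', nice: 10)
proc_title = "pa-#{defaults[:name]}-agent"
```

With no arguments, a class named Demo::Agents::LBaaS yields the name lbaas and therefore the process title pa-lbaas-agent.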

#thr_create(name, interval, &block) ⇒ Object

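Create a named Thread together with a Mutex semaphore of the same name. The given block runs inside the thread repeatedly, with a pause of interval between runs, until the exit flag is raised. A StandardError escaping the block is logged and raises the exit flag.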

The method requires 3 parameters:

name

Symbol Thread/Mutex name.

interval

Integer Thread loop interval, in seconds.

&block

Proc Block of code to run.



# File 'lib/port-authority/agent.rb', line 98

def thr_create(name, interval, &block)
  @@_semaphores.merge!(Hash[name.to_sym, Mutex.new])
  @@_threads.merge!(Hash[name.to_sym, Thread.new do
      Thread.current[:name] = name.to_s
      Logger.info "Starting thread #{Thread.current[:name]}"
      begin
        until exit?
          block.call
          sleep interval
        end
        Logger.info "Ending thread #{Thread.current[:name]}"
      rescue StandardError => e
        Logger.alert "UNCAUGHT EXCEPTION IN THREAD #{Thread.current[:name]}"
        Logger.alert [' ', "#{e.class}:", e.message].join(' ')
        e.backtrace.each {|line| Logger.debug "  #{line}"}
        end!
      end
    end
  ])
end
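
The interval loop can be sketched as follows, assuming instance variables in place of the class variables and leaving out logging and the per-thread Mutex. LoopRunner is a hypothetical name, not a library class.

```ruby
# Hypothetical stand-in for the thread loop; no logging, no Mutex.
class LoopRunner
  def initialize
    @exit = false
    @threads = {}
  end

  def end!
    @exit = true
  end

  def exit?
    @exit
  end

  # Run the block every `interval` seconds until the exit flag is raised.
  def thr_create(name, interval, &block)
    @threads[name.to_sym] = Thread.new do
      Thread.current[:name] = name.to_s
      begin
        until exit?
          block.call
          sleep interval
        end
      rescue StandardError
        end!                         # a failing worker raises the exit flag
      end
    end
  end

  def thr_wait
    @threads.each_value(&:join)
  end
end

runner = LoopRunner.new
count = 0
runner.thr_create(:counter, 0.01) do
  count += 1
  runner.end! if count >= 3          # stop after the third pass
end
runner.thr_wait

failing = LoopRunner.new
failing.thr_create(:bad, 0.01) { raise 'boom' }
failing.thr_wait
```

Because the flag is only checked between passes, a thread notices a shutdown request at most one block run plus one interval after the flag is raised.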

#thr_safe(name = Thread.current[:name].to_sym, &block) ⇒ Object

Run a block of code thread-safely, under a named Mutex. The name parameter can be omitted when called from within a thread's block; in that case the Mutex named after the current thread is used.

The method accepts the following parameters:

name

Symbol Mutex name.

&block

Proc Block of code to run.



# File 'lib/port-authority/agent.rb', line 127

def thr_safe(name=Thread.current[:name].to_sym, &block)
  @@_semaphores[name.to_sym].synchronize do
    yield
  end
end
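
A sketch of the default-name lookup, assuming each thread stores its name in Thread.current[:name] as the methods above do. Guard is a hypothetical class. Unlike the original, it uses fetch so that an unknown name fails with a KeyError.

```ruby
# Hypothetical holder of named Mutexes; not the library's class.
class Guard
  def initialize(*names)
    @semaphores = names.map { |n| [n.to_sym, Mutex.new] }.to_h
  end

  # Without an argument, the Mutex named after the current thread is used.
  def thr_safe(name = Thread.current[:name].to_sym)
    @semaphores.fetch(name.to_sym).synchronize { yield }
  end
end

guard = Guard.new(:main, :worker)
Thread.current[:name] = 'main'
total = 0

workers = 4.times.map do
  Thread.new do
    Thread.current[:name] = 'worker'
    100.times { guard.thr_safe { total += 1 } }   # name defaults to :worker
  end
end
workers.each(&:join)

from_main = guard.thr_safe { :ran_under_main_mutex } # name defaults to :main
```

All four workers share the name worker here, so they contend for one Mutex and the increments never interleave.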

#thr_start(name = nil) ⇒ Object

Start the named thread. If name is omitted, all spawned threads are started.



# File 'lib/port-authority/agent.rb', line 135

def thr_start(name=nil)
  return @@_threads[name].run if name
  @@_threads.each_value(&:run)
end

#thr_wait(name = nil) ⇒ Object

Wait for the named thread to finish. If name is omitted, waits for all spawned threads.



# File 'lib/port-authority/agent.rb', line 143

def thr_wait(name=nil)
  return @@_threads[name].join if name
  @@_threads.each_value(&:join)
end
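
The one-or-all dispatch used by thr_start and thr_wait can be sketched with a plain hash of threads. The wait_for helper and the thread names are hypothetical.

```ruby
# Two short-lived threads keyed by name, as in the thread registry.
threads = {
  fast: Thread.new { :fast_done },
  slow: Thread.new { sleep 0.05; :slow_done }
}

# Join one thread when a name is given, otherwise join all of them.
def wait_for(threads, name = nil)
  return threads.fetch(name).join if name
  threads.each_value(&:join)
end

one = wait_for(threads, :fast)   # Thread#join returns the joined thread
wait_for(threads)                # blocks until every thread has finished
```

Joining a thread that has already finished returns immediately, so calling the all-threads form after waiting on a single thread is safe.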