Class: PortAuthority::Agent
- Inherits:
-
Object
- Object
- PortAuthority::Agent
- Defined in:
- lib/port-authority/agent.rb
Overview
Scaffolding class for agents
Direct Known Subclasses
Instance Method Summary collapse
-
#end! ⇒ Object
Raise the exit flag.
-
#exit? ⇒ Boolean
Has the exit flag been raised?.
-
#hostname ⇒ Object
Return hostname.
-
#initialize ⇒ Agent
constructor
Common agent process init.
-
#sem_create(name) ⇒ Object
Create a named Mutex semaphore.
-
#setup(args = {}) ⇒ Object
Setup the agent process.
-
#thr_create(name, interval, &block) ⇒ Object
Create a named Thread with its Mutex semaphore.
-
#thr_safe(name = Thread.current[:name].to_sym, &block) ⇒ Object
Run thread-safe code.
-
#thr_start(name = nil) ⇒ Object
Start named thread.
-
#thr_wait(name = nil) ⇒ Object
Wait for named thread to finish.
Constructor Details
#initialize ⇒ Agent
Common agent process init. Contains configuration load, common signal responses and runtime variables init. Implements execution of actual agents via run method. Also handles any uncaught exceptions.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/port-authority/agent.rb', line 16 def initialize Thread.current[:name] = 'main' # name main thread @@_exit = false # prepare exit flag @@_semaphores = { log: Mutex.new } # init semaphores @@_threads = {} # init threads Signal.trap('INT') { exit!(1) } # end immediatelly Signal.trap('TERM') { end! } # end gracefully Config.load! || exit!(1) # load config or die begin # all-wrapping exception ;) run # hook to child class rescue StandardError => e Logger.alert "UNCAUGHT EXCEPTION IN THREAD main! Dying! X.X" Logger.alert [' ', "#{e.class}:", e.].join(' ') e.backtrace.each {|line| Logger.debug " #{line}"} exit! 1 end end |
Instance Method Details
#end! ⇒ Object
Raise the exit flag
81 82 83 |
# File 'lib/port-authority/agent.rb', line 81 def end! @@_exit = true end |
#exit? ⇒ Boolean
Has the exit flag been raised?
76 77 78 |
# File 'lib/port-authority/agent.rb', line 76 def exit? @@_exit end |
#hostname ⇒ Object
Return hostname.
149 150 151 |
# File 'lib/port-authority/agent.rb', line 149 def hostname @hostname ||= Socket.gethostname end |
#sem_create(name) ⇒ Object
Create a named Mutex semaphore
86 87 88 |
# File 'lib/port-authority/agent.rb', line 86 def sem_create(name) @@_semaphores.merge!(Hash[name.to_sym], Mutex.new) end |
#setup(args = {}) ⇒ Object
Setup the agent process. Initializes logging, system process parameters, daemonizing.
There are 4 optional parameters:
:name-
String Agent name. Defaults to self.class.downcase of the child agent
:root-
Bool Require to be ran as root. Defaults to false.
:daemonize-
Bool Daemonize the process. Defaults to false.
:nice-
Int nice of the process. Defaults to 0
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/port-authority/agent.rb', line 43 def setup(args = {}) name = args[:name] || self.class.to_s.downcase.split('::').last args[:root] ||= false args[:daemonize] ||= false args[:nice] ||= 0 Logger.init! @@_semaphores[:log] Logger.info 'Starting main thread' Logger.debug 'Setting process name' if RUBY_VERSION >= '2.1' Process.setproctitle("pa-#{name}-agent") else $0 = "pa-#{name}-agent" end if args[:root] && Process.uid != 0 Logger.alert 'Must run under root user!' exit! 1 end Logger.debug 'Setting CPU nice level' Process.setpriority(Process::PRIO_PROCESS, 0, args[:nice]) if args[:daemonize] Logger.info 'Daemonizing process' if RUBY_VERSION < '1.9' exit if fork Process.setsid exit if fork Dir.chdir('/') else Process.daemon end end end |
#thr_create(name, interval, &block) ⇒ Object
Create a named Thread with its Mutex semaphore. The definition includes &block of code that should run within the thread.
The method requires 3 parameters:
name-
Symbol Thread/Mutex name.
interval-
Integer Thread loop interval.
- &block
-
Proc Block of code to run.
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/port-authority/agent.rb', line 98 def thr_create(name, interval, &block) @@_semaphores.merge!(Hash[name.to_sym, Mutex.new]) @@_threads.merge!(Hash[name.to_sym, Thread.new do Thread.current[:name] = name.to_s Logger.info "Starting thread #{Thread.current[:name]}" begin until exit? yield block sleep interval end Logger.info "Ending thread #{Thread.current[:name]}" rescue StandardError => e Logger.alert "UNCAUGHT EXCEPTION IN THREAD #{Thread.current[:name]}" Logger.alert [' ', "#{e.class}:", e.].join(' ') e.backtrace.each {|line| Logger.debug " #{line}"} end! end end ]) end |
#thr_safe(name = Thread.current[:name].to_sym, &block) ⇒ Object
Run thread-safe code. The name parameter can be omitted when used from within a block of thread code. In this case the Mutex with the same :name will be used.
The method accepts following parameters:
name-
Symbol Mutex name.
- &block
-
Proc Block of code to run.
127 128 129 130 131 |
# File 'lib/port-authority/agent.rb', line 127 def thr_safe(name=Thread.current[:name].to_sym, &block) @@_semaphores[name.to_sym].synchronize do yield block end end |
#thr_start(name = nil) ⇒ Object
Start named thread. If the name is omitted, applies to all spawned threads ;)
135 136 137 138 |
# File 'lib/port-authority/agent.rb', line 135 def thr_start(name=nil) return @@_threads[name].run if name @@_threads.each_value(&:run) end |
#thr_wait(name = nil) ⇒ Object
Wait for named thread to finish. If the name is omitted, applies to all spawned threads ;)
143 144 145 146 |
# File 'lib/port-authority/agent.rb', line 143 def thr_wait(name=nil) return @@_threads[name].join if name @@_threads.each_value(&:join) end |