Class: Patriot::Worker::Base Abstract

Inherits:
Object
  • Object
show all
Includes:
JobStore::Factory, Util::Logger, Util::Retry
Defined in:
lib/patriot/worker/base.rb

Overview

This class is abstract.

base class for worker implementations

Direct Known Subclasses

JobStoreServer, MultiNodeWorker

Constant Summary

Constants included from Util::Config

Util::Config::ADMIN_USER_KEY, Util::Config::DEFAULT_CONFIG, Util::Config::DEFAULT_PLUGIN_DIR, Util::Config::INFO_SERVER_PORT_KEY, Util::Config::PASSWORD_KEY, Util::Config::PLUGIN_DIR_KEY, Util::Config::PLUGIN_INIT_SCRIPT, Util::Config::PLUGIN_KEY, Util::Config::PLUGIN_LIB_DIR, Util::Config::USERNAME_KEY, Util::Config::WORKER_HOST_KEY, Util::Config::WORKER_USER_KEY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from JobStore::Factory

create_jobstore

Methods included from Util::Retry

execute_with_retry

Methods included from Util::Logger

#create_logger

Methods included from Util::Config

#load_config, #load_plugins

Constructor Details

#initialize(config) ⇒ Base



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/patriot/worker/base.rb', line 40

def initialize(config)
  raise "configuration is nil" if config.nil?
  @logger      = create_logger(config)
  @config      = config
  @job_store   = create_jobstore(Patriot::JobStore::ROOT_STORE_ID, @config)
  @host        = `hostname`.chomp
  @cycle       = config.get('fetch_cycle', Patriot::Worker::DEFAULT_FETCH_CYCLE).to_i
  @fetch_limit = config.get('fetch_limit', Patriot::Worker::DEFAULT_FETCH_LIMIT).to_i
  @worker_name = config.get('worker_name', Patriot::Worker::DEFAULT_WORKER_NAME)
  @info_server = Patriot::Worker::InfoServer.new(self,@config)
end

Instance Attribute Details

#configObject

Returns the value of attribute config.



36
37
38
# File 'lib/patriot/worker/base.rb', line 36

def config
  @config
end

#cycleObject

Returns the value of attribute cycle.



36
37
38
# File 'lib/patriot/worker/base.rb', line 36

def cycle
  @cycle
end

#hostObject

Returns the value of attribute host.



36
37
38
# File 'lib/patriot/worker/base.rb', line 36

def host
  @host
end

#job_storeObject

Returns the value of attribute job_store.



36
37
38
# File 'lib/patriot/worker/base.rb', line 36

def job_store
  @job_store
end

#started_atObject (readonly)

Returns the value of attribute started_at.



37
38
39
# File 'lib/patriot/worker/base.rb', line 37

def started_at
  @started_at
end

#statusObject

Returns the value of attribute status.



36
37
38
# File 'lib/patriot/worker/base.rb', line 36

def status
  @status
end

Instance Method Details

#execute_job(job_ticket) ⇒ Patriot::Command::ExitCode

execute a job



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/patriot/worker/base.rb', line 55

def execute_job(job_ticket)
  job_ticket.exec_host   = @host
  job_ticket.exec_node   = Thread.current[:name]
  begin
    response = @job_store.offer_to_execute(job_ticket)
  rescue Exception => e
    @logger.error e
    return Patriot::Command::ExitCode::FAILED
  end

  # already executed by other node
  return Patriot::Command::ExitCode::SKIPPED if response.nil?

  @logger.info " executing job: #{job_ticket.job_id}"
  command                 = response[:command]
  job_ticket.execution_id = response[:execution_id]
  job_ticket.exit_code    = Patriot::Command::ExitCode::FAILED
  begin
    command.execute
    job_ticket.exit_code  = Patriot::Command::ExitCode::SUCCEEDED
  rescue Exception => e
    @logger.warn " job : #{job_ticket.job_id} failed"
    @logger.warn e
    job_ticket.description = e.to_s
  else
    job_ticket.description = command.description
  ensure
    begin
      execute_with_retry{ @job_store.report_completion_status(job_ticket) }
    rescue Exception => job_store_error
      @logger.error job_store_error
    end
    unless command.post_processors.nil?
      continue_post_processing = true
      command.post_processors.each do |pp|
        begin
          if continue_post_processing
            @logger.info "executing post process by #{pp}"
            continue_post_processing = continue_post_processing && pp.process(command, self, job_ticket)
          else
            @logger.info "skipping post process by #{pp}"
          end
        rescue Exception => post_process_error
          @logger.error "post process by #{pp} failed"
          @logger.error post_process_error
        end
      end
    end
  end
  return job_ticket.exit_code
end

#get_pidInteger



108
109
110
# File 'lib/patriot/worker/base.rb', line 108

def get_pid
  return Patriot::Worker.get_pid(@config)
end

#init_workerObject

should be overrided in sub class This method is for implementation-specific configuration

Raises:

  • (NotImplementedError)


147
148
149
# File 'lib/patriot/worker/base.rb', line 147

def init_worker
  raise NotImplementedError
end

#request_shutdownBoolean

send a request graceful shutdown to a running worker



114
115
116
117
118
119
120
121
122
# File 'lib/patriot/worker/base.rb', line 114

def request_shutdown
  pid = get_pid
  if pid.nil?
    @logger.info("worker #{@worker_name} does not exist")
    return false
  end
  Process.kill(SIGNAL_FOR_GRACEFUL_SHUTDOWN[0], pid.to_i)
  return true
end

#run_workerObject

should be overrided in sub class Main loop in which the worker fetches and executes jobs should be implemented here

Raises:

  • (NotImplementedError)


153
154
155
# File 'lib/patriot/worker/base.rb', line 153

def run_worker
  raise NotImplementedError
end

#start_workerObject

main entry point of worker processing



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/patriot/worker/base.rb', line 125

def start_worker
  return unless get_pid.nil?
  @logger.info "starting worker #{@node}@#{@host}"
  pid_file = Patriot::Worker.get_pid_file(@config)
  File.open(pid_file, 'w') {|f| f.write($$)} # save pid for shutdown
  set_traps
  @info_server.start_server
  @started_at = Time.now
  @logger.info "initiating worker #{@node}@#{@host}"
  init_worker
  @status = Patriot::Worker::Status::ACTIVE
  @logger.info "start worker #{@node}@#{@host}"
  run_worker
  @logger.info "shutting down worker #{@node}@#{@host}"
  stop_worker
  # should be last since worker_admin judge availability from the info_server
  @info_server.shutdown_server
end

#stop_workerObject

should be overrided in sub class Tasks for tearing down the worker should be implemented here

Raises:

  • (NotImplementedError)


159
160
161
# File 'lib/patriot/worker/base.rb', line 159

def stop_worker
  raise NotImplementedError
end