Class: Bricolage::StreamingLoad::TaskHandler

Inherits:
Bricolage::SQSDataSource::MessageHandler show all
Defined in:
lib/bricolage/streamingload/taskhandler.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Bricolage::SQSDataSource::MessageHandler

#after_message_batch, #call_handler_method, #handle, #handleable?, #handler_method

Constructor Details

#initialize(context:, ctl_ds:, data_ds:, log_table:, task_queue:, working_dir:, logger:, job_class: Job) ⇒ TaskHandler

Returns a new instance of TaskHandler.



100
101
102
103
104
105
106
107
108
109
# File 'lib/bricolage/streamingload/taskhandler.rb', line 100

# Builds a task handler from its collaborators.
#
# Each keyword argument is stored unchanged in an instance variable; no
# other work is done at construction time.
#
# @param context     stored as @ctx
# @param ctl_ds      stored as @ctl_ds
# @param data_ds     stored as @data_ds
# @param log_table   stored as @log_table
# @param task_queue  stored as @task_queue
# @param working_dir stored as @working_dir
# @param logger      stored as @logger
# @param job_class   stored as @job_class; defaults to Job
def initialize(
  context:,
  ctl_ds:,
  data_ds:,
  log_table:,
  task_queue:,
  working_dir:,
  logger:,
  job_class: Job
)
  # Where work arrives from and where it runs.
  @task_queue = task_queue
  @working_dir = working_dir
  @logger = logger

  # Values forwarded to every job that gets built.
  @ctx = context
  @ctl_ds = ctl_ds
  @data_ds = data_ds
  @log_table = log_table
  @job_class = job_class
end

Instance Attribute Details

#job_class ⇒ Object



144
145
146
# File 'lib/bricolage/streamingload/taskhandler.rb', line 144

# Returns the class used to build jobs.
#
# When @job_class is nil or false, Job is assigned to @job_class first and
# then returned, so later calls return the same value.
def job_class
  @job_class ||= Job
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



111
112
113
# File 'lib/bricolage/streamingload/taskhandler.rb', line 111

# Reader for @logger, the logger object given to #initialize via +logger:+.
def logger
  @logger
end

Class Method Details

.create_pid_file(path) ⇒ Object



92
93
94
95
96
97
98
# File 'lib/bricolage/streamingload/taskhandler.rb', line 92

# Writes the current process ID, followed by a newline, to the file at
# +path+, truncating or creating the file.
#
# This is best effort: any StandardError raised while opening or writing
# the file is discarded. Returns nil in both cases.
#
# @param path [String] location of the pid file
# @return [nil]
def TaskHandler.create_pid_file(path)
  File.open(path, 'w') do |pid_file|
    pid_file.puts Process.pid
  end
rescue StandardError
  # Deliberately ignored (best effort).
  nil
end

.main ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/bricolage/streamingload/taskhandler.rb', line 19

# Command-line entry point.
#
# Parses ARGV, loads the YAML configuration, builds the data sources and a
# handler instance, then either executes a single task (when a task ID
# option is given) or enters the message event loop.
#
# At most one positional argument (the configuration file path) is
# accepted; otherwise the usage text is printed to stderr and the process
# exits with status 1. Without a positional argument the configuration is
# read from <working_dir>/config/<environment>/streamingload.yml.
#
# In server mode, any exception escaping the event loop is logged and then
# re-raised.
def TaskHandler.main
  # Sends a "loader start" message through Raven before anything else runs.
  Raven.capture_message("loader start")

  opts = TaskHandlerOptions.new(ARGV)
  opts.parse
  # More than one positional argument is a usage error.
  unless opts.rest_arguments.size <= 1
    $stderr.puts opts.usage
    exit 1
  end
  config_path = opts.rest_arguments[0] || "#{opts.working_dir}/config/#{opts.environment}/streamingload.yml"
  config = YAML.load(File.read(config_path))

  # A file logger is built only when a log file path option was given;
  # otherwise nil is handed to Context.for_application.
  logger = opts.log_file_path ? new_logger(opts.log_file_path, config) : nil
  ctx = Context.for_application(opts.working_dir, environment: opts.environment, logger: logger)

  # Data source names and the log table name come from the configuration,
  # each with a built-in default.
  ctl_ds = ctx.get_data_source('sql', config.fetch('ctl-postgres-ds', 'db_ctl'))
  data_ds = ctx.get_data_source('sql', config.fetch('redshift-ds', 'db_data'))
  task_queue = ctx.get_data_source('sqs', config.fetch('task-queue-ds', 'sqs_task'))
  log_table = config.fetch('log-table', 'strload_load_logs')
  # When 'alert-level' is configured, log through an alerting logger built
  # from ctx.logger and an SNS data source; otherwise use ctx.logger directly.
  service_logger =
    if config.key?('alert-level')
      new_alerting_logger(ctx, config)
    else
      ctx.logger
    end

  # The noop option swaps the job class for NoopJob.
  task_handler = new(
    context: ctx,
    ctl_ds: ctl_ds,
    data_ds: data_ds,
    log_table: log_table,
    task_queue: task_queue,
    working_dir: opts.working_dir,
    logger: service_logger,
    job_class: opts.noop? ? NoopJob : Job
  )

  if opts.task_id
    # Single task mode: run exactly the requested task and return.
    task_handler.execute_task_by_id opts.task_id, force: opts.force?
  else
    # Server mode
    # The argument true is Process.daemon's nochdir flag, so daemonizing does
    # not change directory itself; the explicit chdir below does, and it also
    # runs when not daemonizing.
    Process.daemon(true) if opts.daemon?
    Dir.chdir '/'
    # Written after the chdir above, so a relative pid file path is resolved
    # from '/'.
    create_pid_file opts.pid_file_path if opts.pid_file_path
    begin
      service_logger.info "*** bricolage-streaming-loader started: pid=#{$$}"
      task_handler.event_loop
      service_logger.info "*** bricolage-streaming-loader shutdown gracefully: pid=#{$$}"
    # Catches every exception class, not only StandardError, so that an
    # abort is always logged; the exception is re-raised afterwards.
    rescue Exception => ex
      service_logger.exception(ex)
      service_logger.error "*** bricolage-streaming-loader abort: pid=#{$$}"
      raise
    end
  end
end

.new_alerting_logger(ctx, config) ⇒ Object



84
85
86
87
88
89
90
# File 'lib/bricolage/streamingload/taskhandler.rb', line 84

# Builds an AlertingLogger on top of the context's logger.
#
# The SNS data source name is read from the 'sns-ds' config key (default
# 'sns') and the alert level from 'alert-level' (default 'warn').
#
# @param ctx    object providing #logger and #get_data_source
# @param config configuration object responding to #fetch(key, default)
# @return [AlertingLogger]
def TaskHandler.new_alerting_logger(ctx, config)
  base_logger = ctx.logger
  sns_name = config.fetch('sns-ds', 'sns')
  sns_ds = ctx.get_data_source('sns', sns_name)
  level = config.fetch('alert-level', 'warn')

  AlertingLogger.new(logger: base_logger, sns_datasource: sns_ds, alert_level: level)
end

.new_logger(path, config) ⇒ Object



76
77
78
79
80
81
82
# File 'lib/bricolage/streamingload/taskhandler.rb', line 76

# Builds a Logger that writes to +path+.
#
# The rotation period is read from the 'log-rotation-period' config key
# (default 'daily') and the rotation size from 'log-rotation-size'
# (default nil).
#
# NOTE(review): Logger is called with keyword arguments, so this is
# presumably the project's own Logger class rather than Ruby's stdlib
# Logger; confirm against the enclosing namespace.
#
# @param path   log device, passed as +device:+
# @param config configuration object responding to #fetch(key, default)
def TaskHandler.new_logger(path, config)
  period = config.fetch('log-rotation-period', 'daily')
  size = config.fetch('log-rotation-size', nil)

  Logger.new(device: path, rotation_period: period, rotation_size: size)
end

Instance Method Details

#event_loop ⇒ Object



118
119
120
# File 'lib/bricolage/streamingload/taskhandler.rb', line 118

# Starts message processing on the task queue.
#
# Registers this object as the handler and LoaderMessage as the message
# class, and returns whatever @task_queue.handle_messages returns.
def event_loop
  @task_queue.handle_messages(
    handler: self,
    message_class: LoaderMessage
  )
end

#execute_task_by_id(task_id, force: false) ⇒ Object



113
114
115
116
# File 'lib/bricolage/streamingload/taskhandler.rb', line 113

# Builds the job for +task_id+ and executes it with +fail_fast: true+.
#
# @param task_id ID of the task to run
# @param force   passed through to #new_job as its second argument
# @return whatever the job's #execute returns
def execute_task_by_id(task_id, force: false)
  new_job(task_id, force).execute(fail_fast: true)
end

#handle_streaming_load_v3(t) ⇒ Object

message handler



129
130
131
132
133
134
135
136
137
138
# File 'lib/bricolage/streamingload/taskhandler.rb', line 129

# Message handler for streaming load (v3) task messages.
#
# Changes into @working_dir for the duration of the call, builds a job for
# the task with #new_job and executes it. The message is deleted from the
# task queue only when the job's #execute returns a truthy value; otherwise
# it is not deleted here.
#
# Ordinary failures (StandardError) are logged and swallowed, so one failed
# task does not propagate to the caller. Any other exception (Interrupt,
# SystemExit, NoMemoryError, ...) is logged as well but re-raised, so a
# request to stop the process is not silently dropped.
#
# @param t task message; must respond to #task_id and #force?
def handle_streaming_load_v3(t)
  Dir.chdir(@working_dir) do
    job = new_job(t.task_id, t.force?)
    @task_queue.delete_message(t) if job.execute
  end
rescue StandardError => ex
  @logger.exception ex
rescue Exception => ex # rubocop:disable Lint/RescueException
  # Still log, but never swallow process-control or fatal exceptions.
  @logger.exception ex
  raise
end

#handle_unknown(t) ⇒ Object

message handler



123
124
125
126
# File 'lib/bricolage/streamingload/taskhandler.rb', line 123

# Message handler for messages of an unrecognized type.
#
# Logs a warning containing the message body, then deletes the message
# from the task queue.
#
# @param t message; must respond to #message_body
# @return whatever @task_queue.delete_message returns
def handle_unknown(t)
  body = t.message_body
  @logger.warn("unknown task: #{body}")
  @task_queue.delete_message(t)
end

#new_job(task_id, force) ⇒ Object



140
141
142
# File 'lib/bricolage/streamingload/taskhandler.rb', line 140

# Instantiates @job_class for one task.
#
# The handler's context, data sources, log table and logger are passed
# along with the task-specific +task_id+ and +force+ values.
#
# @param task_id ID of the task the job should process
# @param force   passed to the job as +force:+
# @return a new instance of @job_class
def new_job(task_id, force)
  @job_class.new(
    context: @ctx,
    ctl_ds: @ctl_ds,
    data_ds: @data_ds,
    log_table: @log_table,
    task_id: task_id,
    force: force,
    logger: @logger
  )
end