Class: WatchtowerJob::Application

Inherits:
Object
  • Object
show all
Defined in:
lib/watchtower_job/application.rb

Constant Summary collapse

CHECK_IN_FREQUENCY =

5 minutes

5 * 60

Instance Method Summary collapse

Constructor Details

#initialize(skip_errors: false, sleep_time: 30, last_run_file: nil, datafeed_state_file: nil) ⇒ Application

Returns a new instance of Application.



8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/watchtower_job/application.rb', line 8

# Builds the job: restores any saved data feed state, then opens the Kinesis
# stream described by the configuration stored in S3.
#
# @param skip_errors [Boolean] when truthy, #run only counts workflows and
#   skips all data feed handling
# @param sleep_time [Integer] argument passed to `sleep` by #run when a read
#   yields nothing
# @param last_run_file [String, nil] path used to save/restore the stream's
#   last sequence number
# @param datafeed_state_file [String, nil] path used to save/restore the list
#   of data feed ids
def initialize(skip_errors: false, sleep_time: 30, last_run_file: nil, datafeed_state_file: nil)
  @skip_errors = skip_errors
  @sleep_time = sleep_time
  @last_run_file = last_run_file
  @datafeed_state_file = datafeed_state_file

  # @data_feeds must exist before get_datafeed_state runs: it is filled
  # through get_feed while the saved state is loaded.
  @data_feeds = {}
  get_datafeed_state

  aws_options = Einstein::AwsKeys::AwsKeys.get_keys.merge(region: 'us-east-1')
  s3 = Aws::S3::Resource.new(aws_options)
  config_object = s3.bucket('cttools-watchtower').object("configs/kinesis_keys.yaml")
  # NOTE(review): YAML.load on a remote document -- confirm the bucket is
  # trusted, or consider YAML.safe_load.
  config = YAML.load(config_object.get.body.string)

  # Resume from the saved sequence number when one is available.
  @stream = KinesisReader.new(config[WatchtowerJob.nt2_env], last_sequence_number)
  @workflow_counter = WorkflowCounts.new
end

Instance Method Details

#check_in ⇒ Object



119
120
121
122
123
124
125
126
# File 'lib/watchtower_job/application.rb', line 119

# Posts a check-in event and sends the workflow counts once more than
# CHECK_IN_FREQUENCY whole seconds have elapsed since the previous check-in.
#
# @return [Time, nil] the new check-in time when a check-in was posted,
#   otherwise nil
def check_in
  now = Time.now
  elapsed_seconds = (now - @last_check_in).to_i
  return unless elapsed_seconds > CHECK_IN_FREQUENCY

  post_check_in(now)
  @workflow_counter.send_counts
  @last_check_in = now
end

#get_datafeed_state(file = @datafeed_state_file) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/watchtower_job/application.rb', line 91

# Restores data feed state from a YAML file containing a list of data feed
# ids. Each id is loaded through get_feed(id, false), with the list split
# across three threads.
#
# @param file [String, nil] path of the state file; defaults to the path
#   given at construction
# @return [nil] when +file+ is nil; otherwise the result of the final log call
def get_datafeed_state(file = @datafeed_state_file)
  return nil if file.nil?

  if File.exist?(file)
    # An existing but empty file makes YAML.load return a falsy value rather
    # than a list; treat that as "no saved feeds" instead of crashing on
    # `.count` during startup.
    # NOTE(review): YAML.load is used on a locally written state file --
    # confirm nothing untrusted can write to this path.
    datafeed_list = YAML.load(File.read(file)) || []
    WatchtowerJob.log.info "found #{datafeed_list.count} datafeeds, beginning load..."

    threads = in_groups(datafeed_list, 3).each_with_index.map do |list, t_index|
      Thread.new do
        list.each_with_index do |id, index|
          # NOTE(review): the number in "Fetching(...)" is thread number times
          # index within the group, not a running total.
          WatchtowerJob.log.info "thread:#{(t_index + 1)} Fetching(#{(t_index + 1) * index}): #{id} ..."
          get_feed(id, false)
        end
      end
    end
    # Wait for every loader thread before reporting the restored state.
    threads.each(&:join)
    WatchtowerJob.log.info "datafeed state is: (#{@data_feeds.keys})"
  else
    WatchtowerJob.log.info "#{file} does not exist -- will start with empty state"
  end
end

#get_feed(id, never_been_cleared = true) ⇒ Object



112
113
114
115
116
117
# File 'lib/watchtower_job/application.rb', line 112

# Returns the cached DataFeed for +id+, building and caching one on first use.
#
# @param id [Object] key under which the feed is cached in @data_feeds
# @param never_been_cleared [Boolean] forwarded to DataFeed.new when a new
#   feed has to be built
# @return [DataFeed, nil] the feed, or nil when DataFeed.new raises
#   DataFeedNotCompleted
def get_feed(id, never_been_cleared = true)
  cached_feed = @data_feeds[id]
  return cached_feed if cached_feed

  @data_feeds[id] = DataFeed.new(id, never_been_cleared)
rescue DataFeedNotCompleted
  WatchtowerJob.log.info "data feed not in completed state, ignoring"
  nil
end

#in_groups(array, number) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/watchtower_job/application.rb', line 140

# Splits +array+ into +number+ consecutive groups whose sizes differ by at
# most one; the first (array.size % number) groups receive the extra element.
#
# @param array [Array] elements to distribute
# @param number [Integer] how many groups to produce
# @return [Array<Array>] the groups, in original order
def in_groups(array, number)
  base_size, extras = array.size.divmod(number)
  cursor = 0
  number.times.map do |position|
    span = position < extras ? base_size + 1 : base_size
    chunk = array.slice(cursor, span)
    cursor += span
    chunk
  end
end

#keep_datafeed_state ⇒ Object



82
83
84
85
86
87
88
89
# File 'lib/watchtower_job/application.rb', line 82

# Writes to the state file, as a YAML list, the ids of the data feeds that
# have zero errors and whose never_been_cleared flag is falsy. Does nothing
# when no state file was configured.
#
# @return [Object, nil] nil when no state file is configured; otherwise the
#   result of the log call
def keep_datafeed_state
  state_path = @datafeed_state_file
  return unless state_path

  cleared_ids = @data_feeds.each_with_object([]) do |(id, feed), ids|
    ids << id if feed.errors == 0 && !feed.never_been_cleared
  end
  File.write(state_path, cleared_ids.to_yaml)
  WatchtowerJob.log.info "Saved datafeed state: (#{cleared_ids})"
end

#keep_last_sequence_number ⇒ Object



65
66
67
68
69
# File 'lib/watchtower_job/application.rb', line 65

# Writes the stream's last sequence number to the last-run file so a later
# start can read it back. Does nothing when no last-run file was configured.
#
# @return [Object, nil] nil when no last-run file is configured; otherwise
#   the result of the log call
def keep_last_sequence_number
  run_file = @last_run_file
  return unless run_file

  File.write(run_file, @stream.last_sequence_number.to_s)
  WatchtowerJob.log.info "Saved last sequence number. (#{@stream.last_sequence_number})"
end

#last_sequence_number(file = @last_run_file) ⇒ Object



71
72
73
74
75
76
77
78
79
80
# File 'lib/watchtower_job/application.rb', line 71

# Reads the previously saved stream sequence number, if any.
#
# An empty or whitespace-only file is treated as "no saved sequence": an
# empty string is truthy in Ruby, so returning it would hand the stream
# reader a blank sequence number instead of nil.
#
# @param file [String, nil] path of the last-run file; defaults to the path
#   given at construction
# @return [String, nil] the saved sequence number with surrounding whitespace
#   removed, or nil when there is no file, no path, or no content
def last_sequence_number(file = @last_run_file)
  return nil if file.nil?

  unless File.exist?(file)
    WatchtowerJob.log.info "#{file} does not exist -- will not use a last_sequence"
    return nil
  end

  last_sequence = File.read(file).strip
  if last_sequence.empty?
    WatchtowerJob.log.info "#{file} is empty -- will not use a last_sequence"
    return nil
  end

  WatchtowerJob.log.info "Stream's last sequence number: (#{last_sequence})"
  last_sequence
end

#post_check_in(check_in_time) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
# File 'lib/watchtower_job/application.rb', line 128

# Builds a "<ENV>.check_in" event for the Watchtower Job component and posts
# it through the Einstein event collector.
#
# @param check_in_time [Time] used as the event's start_time
# @return [Object] whatever the collector's post_event returns
def post_check_in(check_in_time)
  attributes = {
    config_item_type: 'Einstein Component',
    config_item_name: 'Watchtower Job',
    event_type: "#{WatchtowerJob.nt2_env.upcase}.check_in",
    value: 'set',
    start_time: check_in_time
  }
  payload = WatchtowerJob.einstein_event_collector.event_body(**attributes)
  WatchtowerJob.einstein_event_collector.post_event(payload)
end

#run ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/watchtower_job/application.rb', line 21

# Main loop: posts a startup check-in, then consumes the stream, counting
# successful workflows and passing each interface execution to its data feed.
#
# On a signal or any StandardError the failure is logged, the last sequence
# number and the data feed state are saved, and the method returns (the
# exception is not re-raised).
#
# @return [Object] not meaningful; callers should not rely on it
def run
  startup_time = Time.now
  post_check_in(@last_check_in = startup_time)
  @stream.read do |message|
    check_in
    if message.nil?
      WatchtowerJob.log.debug "Read nothing, sleep for #{@sleep_time}"
      sleep @sleep_time
      next
    end
    message_data = JSON.parse(message['data'])

    # Skip messages that lack any key an InterfaceExecution requires.
    unless (InterfaceExecution.required_values - message_data.keys).empty?
      WatchtowerJob.log.info "Ignoring non-InterfaceExecution message: #{message}"
      next
    end

    interface_execution = InterfaceExecution.new(message_data)
    # Only count executions that ended after startup, to prevent sending
    # large counts at startup.
    if interface_execution.success? && interface_execution.end_time >= startup_time
      @workflow_counter.count(interface_execution.workflow)
    end

    next if @skip_errors

    data_feed = get_feed(interface_execution.data_feed_id)
    if data_feed.nil?
      # Not implemented, do nothing with this execution.
      WatchtowerJob.log.debug "Data Feed #{interface_execution.data_feed_id} implementation status is not \"Completed\""
      next
    end
    WatchtowerJob.log.debug "Read Interface Execution for #{interface_execution.data_feed_id}"
    data_feed.handle_latest_execution(interface_execution)
  end
rescue SignalException => e
  WatchtowerJob.log.fatal "Received #{e.message} Signal.... Shutting down..."
  keep_last_sequence_number
  keep_datafeed_state
rescue => e
  WatchtowerJob.log.fatal "Crashed! - #{e.class}: #{e.message}"
  # The separator must be double-quoted: '\n' in single quotes is a literal
  # backslash followed by "n", which put the whole backtrace on one line.
  # Array() guards against an exception that carries no backtrace.
  WatchtowerJob.log.debug "Backtrace:\n#{Array(e.backtrace).join("\n")}"
  keep_last_sequence_number
  keep_datafeed_state
end