Class: Streamworker::Workers::Worker

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/streamworker/workers/worker.rb

Direct Known Subclasses

ShopifyWorker

Constant Summary collapse

QUERIES_PER_BLOCK =
500
TIME_PER_BLOCK =
300
TIMEOUT_MARGIN =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(view_context, opts = {}) ⇒ Worker

Returns a new instance of Worker.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/streamworker/workers/worker.rb', line 16

# Builds a worker bound to a view context and normalises its options.
#
# @param view_context [Object] stored as-is and exposed through #view_context
# @param opts [Hash] worker options; read through an indifferent-access
#   wrapper, so keys may be strings or symbols. Keys read here:
#   :repeats, :unicorn_timeout and :num_records.
def initialize(view_context, opts={})
  @opts = opts.with_indifferent_access
  @view_context = view_context

  @title = "Working..."
  # Read from @opts (not the raw argument) so a "repeats" string key is honoured.
  @repeats = (@opts[:repeats] || 1).to_i
  @fragment = false
  @started_at = Time.now

  # Timeout precedence: explicit option, AppConfig (when defined), ENV, then 30.
  @opts[:unicorn_timeout] ||= AppConfig.unicorn_timeout if defined?(AppConfig)
  @opts[:unicorn_timeout] ||= ENV['UNICORN_TIMEOUT']
  @opts[:unicorn_timeout] ||= 30
  @opts[:unicorn_timeout] = @opts[:unicorn_timeout].to_i

  # Apply the default before converting: nil.to_i is 0 (never nil), so a
  # trailing "|| 1" after to_i could never take effect.
  @num_records = (@opts[:num_records] || 1).to_i
  @num_success = 0
  @num_errors = 0
end

Instance Attribute Details

#line_num ⇒ Object

Subclasses are responsible for setting this as appropriate.



12
13
14
# File 'lib/streamworker/workers/worker.rb', line 12

# Returns the value of @line_num. The value is interpolated into the
# line-number badges built by #error_line_num and #success_line_num.
def line_num
  @line_num
end

#num_errors ⇒ Object

Returns the value of attribute num_errors.



14
15
16
# File 'lib/streamworker/workers/worker.rb', line 14

# Returns @num_errors, which #initialize sets to 0. It is summed with
# #num_success in #num_queries and reported to the page by #scroll.
def num_errors
  @num_errors
end

#num_records ⇒ Object

Returns the value of attribute num_records.



14
15
16
# File 'lib/streamworker/workers/worker.rb', line 14

# Returns @num_records, which #initialize derives from the :num_records
# option via to_i. Used by #projected_queries and reported by #scroll.
def num_records
  @num_records
end

#num_success ⇒ Object

Returns the value of attribute num_success.



14
15
16
# File 'lib/streamworker/workers/worker.rb', line 14

# Returns @num_success, which #initialize sets to 0. It is summed with
# #num_errors in #num_queries and reported to the page by #scroll.
def num_success
  @num_success
end

#opts ⇒ Object

Returns the value of attribute opts.



10
11
12
# File 'lib/streamworker/workers/worker.rb', line 10

# Returns @opts: the result of calling with_indifferent_access on the options
# passed to #initialize, with :unicorn_timeout filled in and converted by to_i.
def opts
  @opts
end

#repeats ⇒ Object

Returns the value of attribute repeats.



11
12
13
# File 'lib/streamworker/workers/worker.rb', line 11

# Returns @repeats, the :repeats option converted with to_i (1 when the
# option is absent), as set in #initialize.
def repeats
  @repeats
end

#title ⇒ Object

Returns the value of attribute title.



13
14
15
# File 'lib/streamworker/workers/worker.rb', line 13

# Returns @title, which #initialize sets to "Working...". #head places it
# inside the page's <title> element.
def title
  @title
end

#view_context ⇒ Object

Returns the value of attribute view_context.



10
11
12
# File 'lib/streamworker/workers/worker.rb', line 10

# Returns the view context given to #initialize. #head calls
# stylesheet_link_tag and javascript_include_tag on it.
def view_context
  @view_context
end

Instance Method Details

#calculate_times ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/streamworker/workers/worker.rb', line 51

# Estimates elapsed and remaining time for the job.
#
# The theoretical cost is TIME_PER_BLOCK for every QUERIES_PER_BLOCK queries.
# That estimate is scaled by the ratio of the time actually used to the time
# the processed queries should theoretically have taken.
#
# @return [Hash] with keys :work_time (opts[:unicorn_timeout] as an Integer),
#   :work_time_remaining, :time_used, :time_remaining and :total_time
def calculate_times
  actual_time_used = Time.now - @started_at
  work_time_remaining = opts[:unicorn_timeout] - actual_time_used
  theoretical_total_time = (projected_queries / QUERIES_PER_BLOCK) * TIME_PER_BLOCK
  theoretical_time_used = (num_queries / QUERIES_PER_BLOCK) * TIME_PER_BLOCK

  # Before any record has been processed there is nothing to measure against;
  # dividing by zero would produce Infinity/NaN, so fall back to the
  # unscaled theoretical estimate.
  factor = theoretical_time_used.zero? ? 1.0 : actual_time_used.to_f / theoretical_time_used
  # For jobs larger than one block, never scale the estimate below the theoretical time.
  factor = [factor, 1].max if projected_queries > QUERIES_PER_BLOCK
  total_time = theoretical_total_time * factor

  {
    work_time: opts[:unicorn_timeout].to_i,
    work_time_remaining: work_time_remaining,
    time_used: actual_time_used,
    time_remaining: (total_time - actual_time_used),
    total_time: total_time
  }
end

#close_report_line ⇒ Object



187
188
189
# File 'lib/streamworker/workers/worker.rb', line 187

# Ends the report line currently left open, if there is one.
#
# @return [String] the closing markup from #report_line when a fragment is
#   open, otherwise an empty string
def close_report_line
  return "" unless fragment?

  report_line("", close: true)
end

#each ⇒ Object



232
233
234
# File 'lib/streamworker/workers/worker.rb', line 232

# Placeholder for the Enumerable contract: this base implementation always
# raises, telling the caller that a subclass has to supply #each.
#
# @raise [RuntimeError] always
def each
  raise RuntimeError, "Worker subclasses must implement each to yield their output"
end

#error_line_num ⇒ Object



228
229
230
# File 'lib/streamworker/workers/worker.rb', line 228

# Renders the current #line_num inside a badge span carrying the
# "badge-important" class.
#
# @return [String] HTML span markup
def error_line_num
  number = line_num
  %(<span class="badge badge-important badge-line-num">#{number}</span>)
end

#foot ⇒ Object



160
161
162
163
164
165
# File 'lib/streamworker/workers/worker.rb', line 160

# Markup that terminates the HTML document: the closing body and html tags,
# each on its own line.
#
# @return [String]
def foot
  ["", "</body>", "    </html>", ""].join("\n")
end


150
151
152
153
154
155
156
157
158
# File 'lib/streamworker/workers/worker.rb', line 150

# Builds the closing markup for a finished run: a closing div, the message
# as an <h3>, the #scroll script, a final closing div and the document #foot.
#
# NOTE(review): reconstructed from a listing whose heredoc had been collapsed
# into an escaped string; the "<<-EOHTML" opener is inferred from the
# indented "EOHTML" terminator — confirm against the original source file.
#
# @param msg [#to_s] text shown in the <h3>
# @return [String] HTML
def footer(msg)
  <<-EOHTML
    </div>
    <h3>#{msg}</h3>
    #{scroll}

  </div>#{self.foot}
  EOHTML
end

#fragment? ⇒ Boolean

Returns:

  • (Boolean)


179
180
181
# File 'lib/streamworker/workers/worker.rb', line 179

# Returns @fragment: false after #initialize, and set by #report_line to the
# negation of its :close option (so it is truthy while a report line is open).
def fragment?
  @fragment
end

#head(scroll = true) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/streamworker/workers/worker.rb', line 135

# Opening markup of the results page: doctype, <html>, a <head> with the
# application stylesheet, the application and scroller scripts and the
# title, followed by the opening <body> tag.
#
# @param scroll [Boolean] when falsy, the <html> element gets an inline
#   style of "overflow: hidden;"
# @return [String] HTML
def head(scroll=true)
  html_attrs = ""
  html_attrs = %Q{ style="overflow: hidden;"} unless scroll
  %Q{
    <!DOCTYPE html>
    <html class="white"#{html_attrs}>
<head>
  #{view_context.stylesheet_link_tag('application')}
  #{view_context.javascript_include_tag('application')}
  #{view_context.javascript_include_tag('scroller')}
  <title>#{self.title}</title>
</head>
<body class="stream-worker-results">      
  }
end

#header ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/streamworker/workers/worker.rb', line 120

# Builds the opening markup of the results page: #head, a container div, an
# optional "Repeating N times" note (only when @repeats > 1) and the opening
# import-results div. The markup is logged at debug level and right-padded
# with spaces to at least 1025 characters.
#
# NOTE(review): reconstructed from a listing whose heredoc had been collapsed
# into an escaped string; the "<<-EOHTML" opener is inferred from the
# indented "EOHTML" terminator — confirm against the original source file.
#
# @return [String] HTML, at least 1025 characters long
def header
  repeat_note = ""
  repeat_note = %Q{<p class="muted">Repeating #{@repeats} times</p>} if @repeats > 1

  html = <<-EOHTML
  #{self.head}<div class="container">
      #{repeat_note}
      <div class="import-results">
  EOHTML
  # Safari waits until it gets the first 1024 bytes to start displaying
  Rails.logger.debug html

  html + (" " * [0, (1025 - html.length)].max)
end

#imminent_timeout? ⇒ Boolean

Returns:

  • (Boolean)


84
85
86
87
88
89
# File 'lib/streamworker/workers/worker.rb', line 84

# Tells whether the remaining work time reported by #calculate_times has
# dropped below TIMEOUT_MARGIN.
#
# @return [Boolean]
def imminent_timeout?
  remaining = calculate_times[:work_time_remaining]
  remaining < TIMEOUT_MARGIN
end

#num_queries ⇒ Object



47
48
49
# File 'lib/streamworker/workers/worker.rb', line 47

# Queries accounted for so far: the count of handled records (successes plus
# errors) as a Float, multiplied by #queries_per_record.
#
# @return [Float]
def num_queries
  handled = self.num_success + self.num_errors
  handled.to_f * self.queries_per_record
end

#open_report_line(str) ⇒ Object



175
176
177
# File 'lib/streamworker/workers/worker.rb', line 175

# Emits +str+ through #report_line with closing disabled, leaving the line
# open.
#
# @param str [#to_s] text to emit
# @return [String] whatever #report_line returns
def open_report_line(str)
  keep_open = { close: false }
  report_line(str, keep_open)
end

#projected_queries ⇒ Object



43
44
45
# File 'lib/streamworker/workers/worker.rb', line 43

# Total queries expected for the whole job: #num_records multiplied by
# #queries_per_record, converted to a Float.
#
# @return [Float]
def projected_queries
  expected = self.num_records * self.queries_per_record
  expected.to_f
end

#queries_per_record ⇒ Object



39
40
41
# File 'lib/streamworker/workers/worker.rb', line 39

# Multiplier used by #num_queries and #projected_queries to turn record
# counts into query counts. This implementation returns the constant 1.
def queries_per_record
  1
end

#report_error(str, list = []) ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/streamworker/workers/worker.rb', line 210

# Builds an error alert: the #error_line_num badge, then an "alert-error"
# div holding +str+ and, when +list+ is not empty, a <ul> with one <li> per
# entry.
#
# @param str [#to_s] headline of the alert
# @param list [#empty?, #each] detail entries; omitted from the markup when empty
# @return [String] HTML
def report_error(str, list=[])
  html = %Q{
      #{error_line_num}
      <div class="alert alert-error">
          <p class="text-error"><i class="icon-warning-sign icon-large"></i>#{str}</p>
        }

  unless list.empty?
    html << %Q{<ul class="error-list">\n}
    list.each do |item|
      html << %Q{              <li>#{item}</li>\n}
    end
    html << %Q{              </ul>\n}
  end

  html << %Q{           </div>}
end

#report_fragment(str) ⇒ Object



183
184
185
# File 'lib/streamworker/workers/worker.rb', line 183

# Emits a piece of a report line. While a line is already open the text is
# returned untouched; otherwise a new line is opened via #open_report_line.
#
# @param str [String] text to emit
# @return [String]
def report_fragment(str)
  return str if fragment?

  open_report_line(str)
end

#report_line(str, opts = {}) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/streamworker/workers/worker.rb', line 191

# Renders one report line, or a piece of one.
#
# Unless a line is already open (#fragment?), the output starts with an
# opening <p> carrying the "report-line" class plus the optional :class.
# When :close is truthy the output ends with </p> and the #scroll script.
# Afterwards @fragment records whether the line was left open.
#
# @param str [#to_s] text of the line
# @param opts [Hash] :close (default true) and an optional :class
# @return [String] HTML
def report_line(str, opts={})
  settings = { close: true }.merge(opts)
  closing = settings[:close]
  css = ["report-line", settings[:class]].compact.join(" ")

  prefix =
    if fragment?
      ""
    else
      %Q{
      <p class="#{css}">}
    end

  # Must happen after the fragment? check above, which looks at the previous state.
  @fragment = !closing

  suffix = ""
  suffix = %Q{</p>
        #{scroll}
    } if closing

  "#{prefix}#{str}#{suffix}"
end


91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/streamworker/workers/worker.rb', line 91

# Builds the closing markup shown when the job is stopped because the
# backend time limit is about to be reached: a "Server Timeout!" alert,
# the #scroll script and the document #foot.
#
# The caller's hash is read but not modified.
#
# @param msg [Hash] optional text overrides:
#   :work_desc     - what was completed (default: "<num_success> records")
#   :how_to_finish - instruction inserted after "please" (default: resubmit
#                    the remaining records). The template appends the final
#                    period, so the text should not end with one.
# @return [String] HTML
def report_timeout_footer(msg={})
  work_desc = msg[:work_desc] || "#{num_success} records"
  # No trailing period here: the template below supplies it.
  how_to_finish = msg[:how_to_finish] || "resubmit the last #{num_records - num_success} records"
  times = calculate_times
  %Q{ 
    </div>
    <hr/>
    <div class="alert alert-error alert_block span8">
      
      <h4><i class="icon-time icon-large pull-left"></i>Server Timeout!</h4>
      <br/>
        Unfortunately, the backend processing time is limited to #{times[:work_time]} seconds, so we have to stop processing this job after #{work_desc}.
      
      <br/><br/>
        To finish processing, please #{how_to_finish}.
      
    </div>
    #{scroll}
  </div>#{self.foot}
  }
end

#scroll ⇒ Object



167
168
169
170
171
172
173
# File 'lib/streamworker/workers/worker.rb', line 167

# Inline script emitted after each closed report line: it calls
# scrollBottom() and passes the record, success and error counts to
# parent.update_stream_worker_progress.
#
# @return [String] a <script> element
def scroll
  progress_args = "#{num_records}, #{num_success}, #{num_errors}"
  %Q{<script type="text/javascript">
        scrollBottom();
        parent.update_stream_worker_progress(#{progress_args});
      </script>
  }
end

#set_headers(response) ⇒ Object



113
114
115
116
117
118
# File 'lib/streamworker/workers/worker.rb', line 113

# Prepares response headers for streaming: stamps Last-Modified with the
# current time, removes Content-Length, disables caching and declares
# chunked transfer encoding.
#
# @param response [#headers] object whose #headers behaves like a Hash
# @return [String] the Transfer-Encoding value assigned last ("chunked")
def set_headers(response)
  # HTTP-date must be expressed in GMT in the fixed "Sun, 06 Nov 1994 08:49:37 GMT"
  # form; Time#ctime yields local time in asctime layout instead.
  response.headers['Last-Modified'] = Time.now.utc.strftime('%a, %d %b %Y %H:%M:%S GMT')
  response.headers.delete('Content-Length')
  response.headers['Cache-Control'] = 'no-cache'
  response.headers['Transfer-Encoding'] = 'chunked'
end

#success_line_num ⇒ Object



224
225
226
# File 'lib/streamworker/workers/worker.rb', line 224

# Renders the current #line_num inside a badge span carrying the
# "badge-success" class.
#
# @return [String] HTML span markup
def success_line_num
  number = line_num
  %(<span class="badge badge-success badge-line-num">#{number}</span>)
end