Class: Streamworker::Workers::Worker

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/streamworker/workers/worker.rb

Direct Known Subclasses

ShopifyWorker

Constant Summary collapse

QUERIES_PER_BLOCK =
500
TIME_PER_BLOCK =
300
TIMEOUT_MARGIN =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(view_context, opts = {}) ⇒ Worker

Returns a new instance of Worker.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/streamworker/workers/worker.rb', line 16

def initialize(view_context, opts={})
  @opts = opts.with_indifferent_access
  @view_context = view_context

  @title = "Working..."
  @repeats = opts[:repeats] || 1
  @repeats = @repeats.to_i
  @fragment = false
  @started_at = Time.now
  @opts[:unicorn_timeout] ||= ENV['UNICORN_TIMEOUT']
  @opts[:unicorn_timeout] ||= 30
  @opts[:unicorn_timeout] = @opts[:unicorn_timeout].to_i
  @num_records = opts[:num_records].to_i || 1
  @num_success = 0
  @num_errors = 0
end

Instance Attribute Details

#line_numObject

subclasses responsible for setting this as appropriate



12
13
14
# File 'lib/streamworker/workers/worker.rb', line 12

def line_num
  @line_num
end

#num_errorsObject

Returns the value of attribute num_errors.



14
15
16
# File 'lib/streamworker/workers/worker.rb', line 14

def num_errors
  @num_errors
end

#num_recordsObject

Returns the value of attribute num_records.



14
15
16
# File 'lib/streamworker/workers/worker.rb', line 14

def num_records
  @num_records
end

#num_successObject

Returns the value of attribute num_success.



14
15
16
# File 'lib/streamworker/workers/worker.rb', line 14

def num_success
  @num_success
end

#optsObject

Returns the value of attribute opts.



10
11
12
# File 'lib/streamworker/workers/worker.rb', line 10

def opts
  @opts
end

#repeatsObject

Returns the value of attribute repeats.



11
12
13
# File 'lib/streamworker/workers/worker.rb', line 11

def repeats
  @repeats
end

#titleObject

Returns the value of attribute title.



13
14
15
# File 'lib/streamworker/workers/worker.rb', line 13

def title
  @title
end

#view_contextObject

Returns the value of attribute view_context.



10
11
12
# File 'lib/streamworker/workers/worker.rb', line 10

def view_context
  @view_context
end

Instance Method Details

#calculate_timesObject



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/streamworker/workers/worker.rb', line 46

def calculate_times
  actual_time_used = Time.now - @started_at
  work_time_remaining = opts[:unicorn_timeout] - actual_time_used
  theoretical_total_time = (projected_queries / QUERIES_PER_BLOCK) * TIME_PER_BLOCK
  theoretical_time_used = (num_queries / QUERIES_PER_BLOCK) * TIME_PER_BLOCK
  factor = actual_time_used.to_f / theoretical_time_used
  factor = [factor, 1].max if projected_queries > QUERIES_PER_BLOCK
  total_time = theoretical_total_time * factor

  # puts "--------- calculate_times ---------"
  # puts "Time.now: #{Time.now.inspect}"
  # puts "@started_at: #{@started_at.inspect}"
  # puts "QUERIES_PER_BLOCK: #{QUERIES_PER_BLOCK.inspect}"
  # puts "TIME_PER_BLOCK: #{TIME_PER_BLOCK.inspect}"
  # puts "(self.num_records * self.queries_per_record): #{(self.num_records * self.queries_per_record).inspect}"
  # puts "opts[:unicorn_timeout] : #{opts[:unicorn_timeout] .inspect}"
  # puts "actual_time_used: #{actual_time_used.inspect}"
  # puts "work_time_remaining: #{work_time_remaining.inspect}"
  # puts "theoretical_total_time: #{theoretical_total_time.inspect}"
  # puts "theoretical_time_used: #{theoretical_time_used.inspect}"
  # puts "factor: #{factor.inspect}"
  # puts "total_time: #{total_time.inspect}"
  # puts "(total_time - actual_time_used): #{(total_time - actual_time_used).inspect}"
  # puts
  {
    work_time: opts[:unicorn_timeout] .to_i,
    work_time_remaining: work_time_remaining,
    time_used: actual_time_used,
    time_remaining: (total_time - actual_time_used),
    total_time: total_time
  }
end

#close_report_lineObject



182
183
184
# File 'lib/streamworker/workers/worker.rb', line 182

def close_report_line
  fragment? ? report_line("", close: true) : ""
end

#eachObject



227
228
229
# File 'lib/streamworker/workers/worker.rb', line 227

def each
  raise "Worker subclasses must implement each to yield their output"
end

#error_line_numObject



223
224
225
# File 'lib/streamworker/workers/worker.rb', line 223

def error_line_num
  %Q{<span class="badge badge-important badge-line-num">#{line_num}</span>}
end

#footObject



155
156
157
158
159
160
# File 'lib/streamworker/workers/worker.rb', line 155

def foot
  %Q{
</body>
    </html>
}
end


145
146
147
148
149
150
151
152
153
# File 'lib/streamworker/workers/worker.rb', line 145

def footer(msg)
  "    </div>\n    <h3>\#{msg}</h3>\n    \#{scroll}\n\n  </div>\#{self.foot}\n  EOHTML\nend\n"

#fragment?Boolean

Returns:

  • (Boolean)


174
175
176
# File 'lib/streamworker/workers/worker.rb', line 174

def fragment?
  @fragment
end

#head(scroll = true) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/streamworker/workers/worker.rb', line 130

def head(scroll=true)
  scroll = scroll ? "" : %Q{ style="overflow: hidden;"}
  %Q{
    <!DOCTYPE html>
    <html class="white"#{scroll}>
<head>
  #{view_context.stylesheet_link_tag('application')}
  #{view_context.javascript_include_tag('application')}
  #{view_context.javascript_include_tag('scroller')}
  <title>#{self.title}</title>
</head>
<body class="stream-worker-results">      
  }
end

#headerObject



115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/streamworker/workers/worker.rb', line 115

def header
  repeats =  ""
  repeats = %Q{<p class="muted">Repeating #{@repeats} times</p>} if @repeats > 1 

  header = "  \#{self.head}<div class=\"container\">\n      \#{repeats}\n      <div class=\"import-results\">\n  EOHTML\n  # Safari waits until it gets the first 1024 bytes to start displaying\n  Rails.logger.debug header\n\n  header + (\" \" * [0, (1025 - header.length)].max) \nend\n"

#imminent_timeout?Boolean

Returns:

  • (Boolean)


79
80
81
82
83
84
# File 'lib/streamworker/workers/worker.rb', line 79

def imminent_timeout?
  # puts "--------- imminent_timeout ---------"
  # puts "work_time_remaining: #{calculate_times[:work_time_remaining].inspect}"
  # puts "TIMEOUT_MARGIN: #{TIMEOUT_MARGIN.inspect}"
  calculate_times[:work_time_remaining] < TIMEOUT_MARGIN
end

#num_queriesObject



42
43
44
# File 'lib/streamworker/workers/worker.rb', line 42

def num_queries
  (self.num_success + self.num_errors) .to_f * self.queries_per_record
end

#open_report_line(str) ⇒ Object



170
171
172
# File 'lib/streamworker/workers/worker.rb', line 170

def open_report_line(str)
  report_line(str, close: false)
end

#projected_queriesObject



38
39
40
# File 'lib/streamworker/workers/worker.rb', line 38

def projected_queries
  (self.num_records * self.queries_per_record).to_f
end

#queries_per_recordObject



34
35
36
# File 'lib/streamworker/workers/worker.rb', line 34

def queries_per_record
  1
end

#report_error(str, list = []) ⇒ Object



205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/streamworker/workers/worker.rb', line 205

def report_error(str, list=[])
  err = %Q{
      #{error_line_num}
      <div class="alert alert-error">
          <p class="text-error"><i class="icon-warning-sign icon-large"></i>#{str}</p>
        }
  err << %Q{<ul class="error-list">\n} unless list.empty?
  list.each { |e| err << %Q{              <li>#{e}</li>\n} }
  err << %Q{              </ul>\n} unless list.empty?
  err << %Q{           </div>}
  
  err
end

#report_fragment(str) ⇒ Object



178
179
180
# File 'lib/streamworker/workers/worker.rb', line 178

def report_fragment(str)
  fragment? ? str : open_report_line(str)
end

#report_line(str, opts = {}) ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/streamworker/workers/worker.rb', line 186

def report_line(str, opts={})
  # Rails.logger.info("report_line str: #{str.inspect} opts: #{opts.inspect} fragment?: #{fragment?.inspect}")
  opts = {close: true}.merge(opts)
  p_class = ["report-line", opts[:class]].compact.join(" ")
  start = fragment? ?  "" : %Q{
      <p class="#{p_class}">}
  @fragment = ! opts[:close]
  close = ""
  if opts[:close]
    close = %Q{</p>
        #{scroll}
    } 
  end
  out = %Q{#{start}#{str}#{close}}
  # Rails.logger.info("    out: #{out.inspect}")
  out
end


86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/streamworker/workers/worker.rb', line 86

def report_timeout_footer(msg={})
  msg[:work_desc] ||= "#{num_success} records"
  msg[:how_to_finish] ||= "resubmit the last #{num_records - num_success} records."
  times = calculate_times
  %Q{ 
    </div>
    <hr/>
    <div class="alert alert-error alert_block span8">
      
      <h4><i class="icon-time icon-large pull-left"></i>Server Timeout!</h4>
      <br/>
        Unfortunately, the backend processing time is limited to #{times[:work_time]} seconds, so we have to stop processing this job after #{msg[:work_desc]}.
      
      <br/><br/>
        To finish processing, please #{msg[:how_to_finish]}.
      
    </div>
    #{scroll}
  </div>#{self.foot}
  }
end

#scrollObject



162
163
164
165
166
167
168
# File 'lib/streamworker/workers/worker.rb', line 162

def scroll
  %Q{<script type="text/javascript">
        scrollBottom();
        parent.update_stream_worker_progress(#{num_records}, #{num_success}, #{num_errors});
      </script>
  }
end

#set_headers(response) ⇒ Object



108
109
110
111
112
113
# File 'lib/streamworker/workers/worker.rb', line 108

def set_headers(response)
  response.headers['Last-Modified'] = Time.now.ctime.to_s
  response.headers.delete('Content-Length')
  response.headers['Cache-Control'] = 'no-cache'
  response.headers['Transfer-Encoding'] = 'chunked'
end

#success_line_numObject



219
220
221
# File 'lib/streamworker/workers/worker.rb', line 219

def success_line_num
  %Q{<span class="badge badge-success badge-line-num">#{line_num}</span>}
end