Class: TinyWorkService

Inherits:
Object
  • Object
show all
Defined in:
lib/tiny_work_service.rb

Overview

usage:

s = TinyWorkService.new(1234, 'TinyWorkService', 1) # port, label, refresh interval in seconds
s.stop!

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(port, label, refresh_interval_in_seconds) ⇒ TinyWorkService



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/tiny_work_service.rb', line 14

# Creates the work service: starts a TinyTCPService on +port+ with this
# object registered as its message handler, resets all counters, and spawns
# two background threads (a terminal status printer and a throughput
# statistics updater). Both threads exit once @service.running? is false.
#
# @param port passed to TinyTCPService.new and shown in the status header
# @param label name shown in the status header (interpolated as "label:port")
# @param refresh_interval_in_seconds passed to sleep between status redraws
def initialize(port, label, refresh_interval_in_seconds)
  @service = TinyTCPService.new(port)
  @service.msg_handler = self
  @jobs = Queue.new
  @label = label
  @label_and_port = "#{@label}:#{port}"

  @jobs_enqueued = 0
  @jobs_dequeued = 0
  @jobs_to_track = Queue.new     # dequeue timestamps handed over from #shift
  @jobs_dequeued_tracker = []    # only touched by the stats thread below

  @jobs_per_minute = 0
  @jobs_per_hour = 0
  @errors_total = 0
  @last_insert_time = Time.now
  # used as the ETA denominator below, so never let it start at zero
  @last_insert_queue_count = [@jobs.length, 1].max

  # status printing thread
  Thread.new do
    start_time = Time.now
    print "\e[?25l" # hide cursor
    begin
      loop do
        break unless @service.running?

        sec_runtime = Time.now - start_time
        human_runtime = "%02d:%02d:%02d" % [sec_runtime / 3600, sec_runtime / 60 % 60, sec_runtime % 60]
        elapsed_time_since_last_insert = Time.now - @last_insert_time
        jobs_completed_since_last_insert = @last_insert_queue_count - @jobs.length
        percent_completed_since_last_insert = jobs_completed_since_last_insert / @last_insert_queue_count.to_f

        print "\e[1;1H" # move the cursor to the top-left so the block redraws in place
        puts "label/port: #{@label_and_port.rjust(28)}\e[K"
        puts "time      : #{DateTime.now.iso8601.rjust(28)}\e[K"
        puts "runtime   : #{human_runtime.rjust(28)}\e[K"
        puts "workers   : #{@service.num_clients.to_s.rjust(28)}\e[K"
        puts "queue     : #{@jobs.length.to_s.rjust(28)}\e[K"
        puts "jobs/m    : #{@jobs_per_minute.to_s.rjust(28)}\e[K"
        puts "jobs/h    : #{@jobs_per_hour.to_s.rjust(28)}\e[K"
        puts "errors    : #{@errors_total.to_s.rjust(28)}\e[K"
        print "eta       : #{TinyEta.eta(elapsed_time_since_last_insert, percent_completed_since_last_insert ).rjust(28)}\e[K"

        sleep refresh_interval_in_seconds
      end
    ensure
      # restore the cursor even when rendering raised, so the terminal is
      # never left with a hidden cursor
      print "\e[?25h" # show cursor
    end
  end

  # update stats thread
  Thread.new do
    loop do
      # stop together with the service (same condition as the status thread)
      # so this thread does not keep running after the service has stopped
      break unless @service.running?

      # sample the clock once so both windows share the same reference point
      now = Time.now.to_i
      one_minute_ago = now - 60
      one_hour_ago = now - 3600

      # move jobs_to_track into jobs_dequeued_tracker, threadsafe
      @jobs_dequeued_tracker << @jobs_to_track.shift until @jobs_to_track.empty?

      # remove job tracking times from older than one_hour_ago
      while (oldest = @jobs_dequeued_tracker.first) && oldest < one_hour_ago
        @jobs_dequeued_tracker.shift
      end

      # count entries from the newest backwards until one falls outside the
      # last minute
      @jobs_per_minute = @jobs_dequeued_tracker.reverse_each.take_while { |t| t >= one_minute_ago }.size
      @jobs_per_hour = @jobs_dequeued_tracker.count

      sleep 2
    end
  end
end

Instance Attribute Details

#jobs_dequeued ⇒ Object (readonly)

Returns the value of attribute jobs_dequeued.



9
10
11
# File 'lib/tiny_work_service.rb', line 9

# Reader for @jobs_dequeued, the running count that #shift increments each
# time it hands out a job.
def jobs_dequeued
  @jobs_dequeued
end

#jobs_enqueued ⇒ Object (readonly)

Returns the value of attribute jobs_enqueued.



9
10
11
# File 'lib/tiny_work_service.rb', line 9

# Reader for @jobs_enqueued, the running count that #<< increments each time
# a job is added to the queue.
def jobs_enqueued
  @jobs_enqueued
end

#jobs_per_hour ⇒ Object (readonly)

Returns the value of attribute jobs_per_hour.



9
10
11
# File 'lib/tiny_work_service.rb', line 9

# Reader for @jobs_per_hour: the number of dequeue timestamps from the last
# hour, as last computed by the stats thread started in #initialize (which
# recalculates it every 2 seconds).
def jobs_per_hour
  @jobs_per_hour
end

#jobs_per_minute ⇒ Object (readonly)

Returns the value of attribute jobs_per_minute.



9
10
11
# File 'lib/tiny_work_service.rb', line 9

# Reader for @jobs_per_minute: the number of dequeue timestamps from the last
# 60 seconds, as last computed by the stats thread started in #initialize
# (which recalculates it every 2 seconds).
def jobs_per_minute
  @jobs_per_minute
end

Instance Method Details

#<<(j) ⇒ Object

enqueue a job



120
121
122
123
124
125
# File 'lib/tiny_work_service.rb', line 120

# Adds job +j+ to the work queue and bumps the enqueued-jobs counter.
# Always returns nil, so the queue itself is never handed back to callers.
def <<(j)
  @jobs_enqueued = @jobs_enqueued + 1
  @jobs.push(j)
  nil
end

#call(m) ⇒ Object

interface for TinyTCPService

Raises:

  • (TinyTCPService::BadClient)


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/tiny_work_service.rb', line 94

# Message handler interface for TinyTCPService. Dispatches on the first
# character of the message +m+:
#
#   '+'  enqueue the rest of the message as a job; returns 'ok'
#   '-'  dequeue a job; returns the job, or '' when none is available
#   '!'  register a worker error; returns the new error total
#
# Raises TinyTCPService::BadClient for a nil message or any other prefix.
def call(m)
  raise TinyTCPService::BadClient, "nil message" if m.nil?

  case m[0]
  when '+'                # add a job to the queue
    self << m[1..]

    @last_insert_time = Time.now
    # This value is the denominator of the ETA percentage, so keep it at 1 or
    # more (as the constructor does): a worker may already have taken the job
    # by the time the length is read, which would otherwise store 0.
    @last_insert_queue_count = [@jobs.length, 1].max

    'ok'                  # ok, job received
  when '-'                # take a job from the queue
    shift || ''
  when '!'                # register an error
    @errors_total += 1
  else
    # only echo the start of the message so a huge payload does not flood the error
    raise TinyTCPService::BadClient, "Client sent invalid message: `#{m[..50]}'"
  end
end

#join ⇒ Object

join the service Thread, if you want to wait until it’s done



115
116
117
# File 'lib/tiny_work_service.rb', line 115

# Joins the underlying service by delegating to @service.join, for callers
# that want to wait until the service is done. Returns whatever
# @service.join returns.
def join
  @service.join
end

#shift ⇒ Object

Returns the first job in the work queue if one is present; otherwise returns nil.



129
130
131
132
133
134
135
# File 'lib/tiny_work_service.rb', line 129

# Returns the first job in the work queue, or nil when the queue is empty.
# For every job handed out, the dequeued counter is incremented and the
# current epoch second is pushed onto @jobs_to_track for the stats thread.
#
# Uses a non-blocking dequeue rather than checking empty? first: if another
# thread took the last job between the check and a blocking shift, this call
# would otherwise block indefinitely and would already have counted a job it
# never received.
def shift
  begin
    job = @jobs.shift(true) # non_block: raises ThreadError when empty
  rescue ThreadError
    return nil
  end

  @jobs_dequeued += 1
  @jobs_to_track << Time.now.to_i

  job
end

#stop! ⇒ Object



137
138
139
# File 'lib/tiny_work_service.rb', line 137

# Stops the underlying service by delegating to @service.stop!.
# Returns whatever @service.stop! returns.
def stop!
  @service.stop!
end