Class: TinyWorkService

Inherits:
Object
  • Object
show all
Defined in:
lib/tiny_work_service.rb

Overview

usage:

s = TinyWorkService.new(1234, 'TinyWorkService')
s.stop!

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(port, label) ⇒ TinyWorkService

Returns a new instance of TinyWorkService.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/tiny_work_service.rb', line 13

# Builds the work service: creates a TinyTCPService on +port+, registers this
# object as its message handler, and starts two background threads.
#
# * The status thread redraws a small dashboard (timestamp, label/port,
#   worker count, queue length, jobs per minute/hour) every 0.5 seconds for as
#   long as <tt>@service.running?</tt> is true.
# * The stats thread recomputes @jobs_per_minute and @jobs_per_hour every
#   2 seconds from the dequeue timestamps that #shift pushes onto
#   @jobs_to_track.
#
# @param port  passed to TinyTCPService.new and shown in the dashboard header
# @param label name printed in the dashboard header
def initialize(port, label)
  @service = TinyTCPService.new(port)
  @service.msg_handler = self
  @jobs = Queue.new
  @label = label

  @jobs_enqueued = 0
  @jobs_dequeued = 0
  @jobs_to_track = Queue.new      # dequeue timestamps handed over by #shift
  @jobs_dequeued_tracker = []     # touched only by the stats thread below

  @jobs_per_minute = 0
  @jobs_per_hour = 0

  # status printing thread
  Thread.new do
    print "\e[?25l" # hide cursor
    begin
      while @service.running?
        print "\e[1;1H" # move to the top-left corner; "\e[K" clears each line's tail
        puts "#{DateTime.now.iso8601}\e[K"
        puts "#{@label}:#{port}\e[K"
        puts "workers :#{@service.num_clients.to_s.rjust(10)}\e[K"
        puts "queue   :#{@jobs.length.to_s.rjust(10)}\e[K"
        puts "jobs/m  :#{@jobs_per_minute.to_s.rjust(10)}\e[K"
        print "jobs/h  :#{@jobs_per_hour.to_s.rjust(10)}\e[K"
        sleep 0.5
      end
    ensure
      # Restore the cursor even when the loop is left by an exception, so a
      # failure while drawing does not leave the terminal with a hidden cursor.
      print "\e[?25h" # show cursor
    end
  end

  # update stats thread
  # NOTE(review): this loop has no exit condition, so the thread keeps running
  # after the service has stopped.
  Thread.new do
    loop do
      # Take one timestamp so both windows are measured from the same instant.
      now = Time.now.to_i
      one_minute_ago = now - 60
      one_hour_ago = now - 3600

      # move jobs_to_track into jobs_dequeued_tracker, threadsafe
      @jobs_dequeued_tracker << @jobs_to_track.shift until @jobs_to_track.empty?

      # drop tracked timestamps older than one_hour_ago (oldest are at the front)
      until @jobs_dequeued_tracker.empty? || @jobs_dequeued_tracker.first >= one_hour_ago
        @jobs_dequeued_tracker.shift
      end

      # Count the trailing run of timestamps that fall inside the last minute;
      # the scan stops at the first older entry when walking back from the end.
      @jobs_per_minute =
        @jobs_dequeued_tracker.reverse_each.take_while { |stamp| stamp >= one_minute_ago }.size
      @jobs_per_hour = @jobs_dequeued_tracker.size

      sleep 2
    end
  end
end

Instance Attribute Details

#jobs_dequeued ⇒ Object (readonly)

Returns the value of attribute jobs_dequeued.



8
9
10
# File 'lib/tiny_work_service.rb', line 8

# Reader for @jobs_dequeued, the running count of jobs handed out.
# The constructor starts it at 0 and #shift increments it each time it takes
# a job off the queue.
def jobs_dequeued
  @jobs_dequeued
end

#jobs_enqueued ⇒ Object (readonly)

Returns the value of attribute jobs_enqueued.



8
9
10
# File 'lib/tiny_work_service.rb', line 8

# Reader for @jobs_enqueued, the running count of jobs added.
# The constructor starts it at 0 and #<< increments it on every call.
def jobs_enqueued
  @jobs_enqueued
end

#jobs_per_hour ⇒ Object (readonly)

Returns the value of attribute jobs_per_hour.



8
9
10
# File 'lib/tiny_work_service.rb', line 8

# Reader for @jobs_per_hour.
# The stats thread started in the constructor sets it to the number of dequeue
# timestamps it still holds after discarding those older than one hour, and
# refreshes it on each pass of its loop (which sleeps 2 seconds per pass).
def jobs_per_hour
  @jobs_per_hour
end

#jobs_per_minute ⇒ Object (readonly)

Returns the value of attribute jobs_per_minute.



8
9
10
# File 'lib/tiny_work_service.rb', line 8

# Reader for @jobs_per_minute.
# The stats thread started in the constructor sets it to the number of most
# recent dequeue timestamps that are no older than 60 seconds, and refreshes
# it on each pass of its loop (which sleeps 2 seconds per pass).
def jobs_per_minute
  @jobs_per_minute
end

Instance Method Details

#<<(j) ⇒ Object

enqueue a job



99
100
101
102
103
104
# File 'lib/tiny_work_service.rb', line 99

# Appends job +j+ to the work queue and bumps the enqueued-jobs counter.
#
# @param j the job to enqueue
# @return [nil] always nil, so calls to this method cannot be chained
def <<(j)
  @jobs_enqueued += 1
  @jobs.push(j)
  nil
end

#call(m) ⇒ Object

interface for TinyTCPService

Raises:

  • (TinyTCPService::BadClient)


79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/tiny_work_service.rb', line 79

# Message handler invoked through the TinyTCPService interface.
#
# The first character of the message selects the operation:
# * <tt>+</tt> enqueues the rest of the message as a job and replies 'ok'
# * <tt>-</tt> replies with the next job, or an empty string when #shift
#   returns nil
#
# @param m the incoming message
# @return [String] 'ok' for an enqueue; the job (or '') for a dequeue
# @raise [TinyTCPService::BadClient] if +m+ is nil or starts with anything
#   other than '+' or '-'
def call(m)
  raise TinyTCPService::BadClient, "nil message" if m.nil?

  command = m[0]

  if command == '+'
    # add a job to the queue
    self << m[1..]
    return 'ok' # ok, job received
  end

  # take a job from the queue
  return shift || '' if command == '-'

  # only the first 51 characters of the offending message are echoed back
  raise TinyTCPService::BadClient, "Client sent invalid message: `#{m[..50]}'"
end

#join ⇒ Object

join the service Thread, if you want to wait until it’s done



94
95
96
# File 'lib/tiny_work_service.rb', line 94

# Delegates to @service.join and returns whatever that call returns.
# Intended for callers that want to wait until the service thread is done.
def join
  @service.join
end

#shift ⇒ Object

Returns the first job in the work queue if one is present; otherwise returns nil.



108
109
110
111
112
113
114
# File 'lib/tiny_work_service.rb', line 108

# Removes and returns the job at the head of the work queue, or nil when the
# queue is empty. Never blocks.
#
# The dequeue is a single non-blocking Queue operation instead of an empty?
# check followed by a blocking shift. With more than one caller, another
# caller could take the last job between the check and the shift, which would
# leave this caller blocked inside Queue#shift. The counter and the tracking
# timestamp are recorded only once a job has actually been obtained.
#
# @return the dequeued job, or nil if no job was available
def shift
  job =
    begin
      @jobs.shift(true) # non_block = true: raises ThreadError when empty
    rescue ThreadError
      return nil
    end

  @jobs_dequeued += 1
  @jobs_to_track << Time.now.to_i # picked up later by the stats thread

  job
end

#stop! ⇒ Object



116
117
118
# File 'lib/tiny_work_service.rb', line 116

# Stops the service by delegating to @service.stop! and returns its result.
# The status-display thread started in the constructor leaves its loop once
# @service.running? returns false — presumably that is the case after this
# call; confirm against TinyTCPService.
def stop!
  @service.stop!
end