Class: TinyWorkService

Inherits:
Object
  • Object
show all
Defined in:
lib/tiny_work_service.rb

Overview

usage:

s = TinyWorkService.new(1234)
s.stop!

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(port, label = 'TinyWorkService') ⇒ TinyWorkService

Returns a new instance of TinyWorkService.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/tiny_work_service.rb', line 12

# Builds the work service: creates a TinyTCPService on +port+ with this object
# as its message handler, then starts two background threads -- one that
# prints a status line and one that refreshes the throughput figures.
#
# Both threads exit once @service.running? turns false, so a stopped service
# does not leave a stats thread looping forever.
#
# @param port [Object] passed to TinyTCPService.new and shown in the status
#   line (the usage example passes an Integer such as 1234)
# @param label [String] name shown at the start of the status line
def initialize(port, label='TinyWorkService')
  @service = TinyTCPService.new(port)
  @service.msg_handler = self
  @jobs = Queue.new
  @label = label

  @jobs_enqueued = 0
  @jobs_dequeued = 0
  @jobs_to_track = Queue.new     # dequeue timestamps, drained by the stats thread
  @jobs_dequeued_tracker = []    # only touched by the stats thread in this method

  @jobs_per_minute = 0.0
  @jobs_per_hour = 0.0

  # status printing thread: rewrites a single terminal line ("\r" ... "\e[K")
  # twice a second while the service is running
  Thread.new do
    while @service.running?
      print "\r#{@label}:#{port} jobs:#{@jobs.length} workers:#{@service.num_clients} jobs/m:#{@jobs_per_minute} jobs/h:#{@jobs_per_hour}\e[K"
      sleep 0.5
    end
  end

  # update stats thread: recomputes jobs/minute and jobs/hour every 2 seconds
  Thread.new do
    while @service.running?
      now = Time.now.to_i
      one_minute_ago = now - 60
      one_hour_ago = now - 3600

      # move jobs_to_track into jobs_dequeued_tracker; this thread is the only
      # consumer here, so the queue cannot empty between the check and the shift
      @jobs_dequeued_tracker << @jobs_to_track.shift until @jobs_to_track.empty?

      # drop timestamps older than one hour (entries are appended in arrival
      # order, so the oldest sit at the front)
      until @jobs_dequeued_tracker.empty? || @jobs_dequeued_tracker.first >= one_hour_ago
        @jobs_dequeued_tracker.shift
      end

      # count the trailing run of timestamps that fall within the last minute
      @jobs_per_minute = @jobs_dequeued_tracker.reverse_each.take_while { |stamp| stamp >= one_minute_ago }.size
      @jobs_per_hour = @jobs_dequeued_tracker.size

      sleep 2
    end
  end
end

Instance Attribute Details

#jobs_dequeued ⇒ Object (readonly)

Returns the value of attribute jobs_dequeued.



7
8
9
# File 'lib/tiny_work_service.rb', line 7

# Running count of jobs handed out: starts at 0 in #initialize and is
# incremented by #shift each time it returns a job.
#
# @return [Integer] number of jobs dequeued so far
def jobs_dequeued
  @jobs_dequeued
end

#jobs_enqueued ⇒ Object (readonly)

Returns the value of attribute jobs_enqueued.



7
8
9
# File 'lib/tiny_work_service.rb', line 7

# Running count of jobs accepted: starts at 0 in #initialize and is
# incremented by #<< for every job added.
#
# @return [Integer] number of jobs enqueued so far
def jobs_enqueued
  @jobs_enqueued
end

#jobs_per_hour ⇒ Object (readonly)

Returns the value of attribute jobs_per_hour.



7
8
9
# File 'lib/tiny_work_service.rb', line 7

# Number of dequeues recorded within roughly the last hour, as last computed
# by the stats thread started in #initialize (refreshed about every 2 seconds).
# Holds 0.0 until the first refresh, and an Integer count afterwards.
#
# @return [Numeric] most recently computed jobs-per-hour figure
def jobs_per_hour
  @jobs_per_hour
end

#jobs_per_minute ⇒ Object (readonly)

Returns the value of attribute jobs_per_minute.



7
8
9
# File 'lib/tiny_work_service.rb', line 7

# Number of dequeues recorded within roughly the last 60 seconds, as last
# computed by the stats thread started in #initialize (refreshed about every
# 2 seconds). Holds 0.0 until the first refresh, and an Integer count afterwards.
#
# @return [Numeric] most recently computed jobs-per-minute figure
def jobs_per_minute
  @jobs_per_minute
end

Instance Method Details

#<<(j) ⇒ Object

enqueue a job



90
91
92
93
94
95
# File 'lib/tiny_work_service.rb', line 90

# Enqueue a job: bumps the enqueued counter and appends +j+ to the work queue.
#
# @param j [Object] the job to add (#call passes the message text after the
#   leading '+')
# @return [nil] always nil, so the queue itself is not handed back to the caller
def <<(j)
  @jobs_enqueued += 1
  @jobs.push(j)
  nil
end

#call(m) ⇒ Object

interface for TinyTCPService

Raises:

  • (TinyTCPService::BadClient)


70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/tiny_work_service.rb', line 70

# Message handler interface for TinyTCPService; dispatches on the first
# character of the message:
#
#   '+<job>'  enqueue everything after the '+' and answer 'ok'
#   '-'       answer with the next job, or '' when the queue is empty
#
# @param m [String, nil] the raw message
# @return [Object] 'ok' after an enqueue; the dequeued job or '' for a dequeue
# @raise [TinyTCPService::BadClient] if +m+ is nil or starts with anything
#   other than '+' or '-' (the error text quotes at most the first 51
#   characters of the message)
def call(m)
  raise TinyTCPService::BadClient, "nil message" if m.nil?

  case m[0]
  when '+'
    # add a job to the queue, then acknowledge receipt
    self << m[1..]
    'ok'
  when '-'
    # take a job from the queue; an empty queue is reported as ''
    shift || ''
  else
    raise TinyTCPService::BadClient, "Client sent invalid message: `#{m[..50]}'"
  end
end

#join ⇒ Object

join the service Thread, if you want to wait until it’s done



85
86
87
# File 'lib/tiny_work_service.rb', line 85

# Join the service thread, for callers that want to wait until it is done.
# Pure delegation to @service.join; blocking behavior and return value are
# whatever TinyTCPService#join provides.
#
# @return [Object] the result of @service.join
def join
  @service.join
end

#shift ⇒ Object

Return the first job in the work queue if one is present; otherwise, return nil.



99
100
101
102
103
104
105
# File 'lib/tiny_work_service.rb', line 99

# Return the first job in the work queue, or nil when the queue is empty.
#
# The job is taken with a non-blocking pop instead of an `empty?` check
# followed by a blocking `shift`. With the two-step form another thread could
# take the last job between the check and the pop, leaving this caller blocked
# on an empty Queue.
#
# Every job handed out increments @jobs_dequeued and pushes the current time
# (integer seconds) onto @jobs_to_track.
#
# @return [Object, nil] the next job, or nil if nothing is queued
def shift
  begin
    job = @jobs.shift(true) # non_block: raises ThreadError rather than waiting
  rescue ThreadError
    return nil
  end

  @jobs_dequeued += 1
  @jobs_to_track << Time.now.to_i

  job
end

#stop! ⇒ Object



107
108
109
# File 'lib/tiny_work_service.rb', line 107

# Stop the underlying TinyTCPService by delegating to @service.stop!.
# The status-printing thread started in #initialize exits on its next pass
# once @service.running? reports false.
#
# @return [Object] the result of @service.stop!
def stop!
  @service.stop!
end