Class: RorVsWild::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/rorvswild/queue.rb

Constant Summary collapse

SLEEP_TIME =
10
FLUSH_TRESHOLD =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ Queue

Returns a new instance of Queue.


9
10
11
12
13
14
15
# File 'lib/rorvswild/queue.rb', line 9

# Builds an empty queue bound to +client+ and registers an at_exit hook
# so that whatever is still queued gets flushed when the process exits.
#
# @param client [Object] object later used by #flush; it must respond to
#   +post(path, payload)+
def initialize(client)
  @client = client
  @mutex = Mutex.new
  @jobs = []
  @requests = []
  # Last-chance flush of anything still pending at process exit.
  Kernel.at_exit { flush }
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client


6
7
8
# File 'lib/rorvswild/queue.rb', line 6

# Returns the client object given to the constructor; #flush calls
# +post+ on it to send the queued data.
def client
  @client
end

#jobs ⇒ Object (readonly)

Returns the value of attribute jobs


7
8
9
# File 'lib/rorvswild/queue.rb', line 7

# Returns the array of job payloads pushed via #push_job that have not
# been taken by #pull_jobs yet. #pull_jobs replaces this array with a
# fresh one each time it hands out a non-empty batch.
def jobs
  @jobs
end

#mutex ⇒ Object (readonly)

Returns the value of attribute mutex


6
7
8
# File 'lib/rorvswild/queue.rb', line 6

# Returns the Mutex created in the constructor. #push_to, #pull_jobs and
# #pull_requests wrap their array manipulation in it.
def mutex
  @mutex
end

#requests ⇒ Object (readonly)

Returns the value of attribute requests


7
8
9
# File 'lib/rorvswild/queue.rb', line 7

# Returns the array of request payloads pushed via #push_request that have
# not been taken by #pull_requests yet. #pull_requests replaces this array
# with a fresh one each time it hands out a non-empty batch.
def requests
  @requests
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread


6
7
8
# File 'lib/rorvswild/queue.rb', line 6

# Returns the thread created by #start_thread, which runs
# #flush_indefinetely. It is nil until #start_thread has been called,
# because the constructor does not assign @thread.
def thread
  @thread
end

Instance Method Details

#flush ⇒ Object


60
61
62
63
# File 'lib/rorvswild/queue.rb', line 60

# Sends everything that is currently queued: pending jobs are posted to
# "/jobs" first, then pending requests to "/requests". A category with
# nothing pending is skipped without contacting the client.
#
# @return [Object, nil] whatever the "/requests" post returned, or nil
#   when no requests were pending
def flush
  pending_jobs = pull_jobs
  client.post("/jobs", jobs: pending_jobs) if pending_jobs

  pending_requests = pull_requests
  pending_requests && client.post("/requests", requests: pending_requests)
end

#flush_indefinetely ⇒ Object


53
54
55
56
57
58
# File 'lib/rorvswild/queue.rb', line 53

# Body of the background thread: forever sleeps for up to SLEEP_TIME
# seconds and then calls #flush. The sleep can end early when
# #wakeup_thread calls Thread#wakeup on the thread running this method.
#
# Never returns normally. Any exception escaping the loop is logged
# through RorVsWild.logger and then re-raised.
def flush_indefinetely
  while true
    # Kernel#sleep returns the (Integer) number of seconds slept, which is
    # always truthy, so a flush follows every sleep.
    sleep(SLEEP_TIME)
    flush
  end
rescue Exception => error
  RorVsWild.logger.error(error)
  raise
end

#pull_jobs ⇒ Object


31
32
33
34
35
36
37
38
39
40
# File 'lib/rorvswild/queue.rb', line 31

# Takes ownership of all queued jobs. While holding the mutex, the current
# jobs array is swapped for a fresh empty one and handed to the caller.
#
# @return [Array, nil] the previously queued jobs, or nil when none were
#   queued (the array is left untouched in that case)
def pull_jobs
  mutex.synchronize do
    next if jobs.empty?

    batch = jobs
    @jobs = []
    batch
  end
end

#pull_requests ⇒ Object


42
43
44
45
46
47
48
49
50
51
# File 'lib/rorvswild/queue.rb', line 42

# Takes ownership of all queued requests. While holding the mutex, the
# current requests array is swapped for a fresh empty one and handed to
# the caller.
#
# @return [Array, nil] the previously queued requests, or nil when none
#   were queued (the array is left untouched in that case)
def pull_requests
  mutex.synchronize do
    next if requests.empty?

    batch = requests
    @requests = []
    batch
  end
end

#push_job(data) ⇒ Object


17
18
19
# File 'lib/rorvswild/queue.rb', line 17

# Queues one job payload by delegating to #push_to with the jobs array.
#
# @param data [Object] the payload appended to the jobs array
# @return [Object, nil] whatever #push_to returns
#
# NOTE(review): `jobs` is evaluated here, before #push_to takes the mutex.
# A concurrent #pull_jobs could swap @jobs in between, in which case the
# payload would land in the array that was already pulled — confirm
# whether that window matters in practice.
def push_job(data)
  push_to(jobs, data)
end

#push_request(data) ⇒ Object


21
22
23
# File 'lib/rorvswild/queue.rb', line 21

# Queues one request payload by delegating to #push_to with the requests
# array.
#
# @param data [Object] the payload appended to the requests array
# @return [Object, nil] whatever #push_to returns
#
# NOTE(review): `requests` is evaluated here, before #push_to takes the
# mutex. A concurrent #pull_requests could swap @requests in between, in
# which case the payload would land in the array that was already pulled —
# confirm whether that window matters in practice.
def push_request(data)
  push_to(requests, data)
end

#push_to(array, data) ⇒ Object


25
26
27
28
29
# File 'lib/rorvswild/queue.rb', line 25

# Appends +data+ to +array+ while holding the mutex, then wakes up (or
# starts) the flushing thread when the array has reached FLUSH_TRESHOLD
# items or when no thread has been created yet.
#
# @param array [Array] the queue to append to
# @param data [Object] the payload to append
# @return [Object, nil] the result of #wakeup_thread when it was called,
#   nil otherwise
def push_to(array, data)
  mutex.synchronize do
    array.push(data)
    threshold_reached = array.size >= FLUSH_TRESHOLD
    wakeup_thread if threshold_reached || !thread
  end
end

#start_thread ⇒ Object


65
66
67
68
# File 'lib/rorvswild/queue.rb', line 65

# Logs a debug line and spawns a new thread running #flush_indefinetely,
# storing it in @thread.
#
# @return [Thread] the newly created thread
def start_thread
  message = "RorVsWild::Queue#start_thread".freeze
  RorVsWild.logger.debug(message)
  @thread = Thread.new do
    flush_indefinetely
  end
end

#wakeup_thread ⇒ Object


70
71
72
# File 'lib/rorvswild/queue.rb', line 70

# Wakes the flushing thread when one exists and is still alive; otherwise
# (no thread yet, or the thread has died) starts a new one through
# #start_thread.
#
# @return [Object] the result of Thread#wakeup or of #start_thread
def wakeup_thread
  if thread && thread.alive?
    thread.wakeup
  else
    start_thread
  end
end