Class: RorVsWild::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/rorvswild/queue.rb

Constant Summary collapse

SLEEP_TIME =
10
FLUSH_TRESHOLD =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ Queue

Returns a new instance of Queue.



11
12
13
14
15
16
17
18
19
20
# File 'lib/rorvswild/queue.rb', line 11

# Builds an empty queue bound to +client+.
#
# Stores the client, creates the mutex and the two empty buffers, copies the
# :request_sampling_rate and :job_sampling_rate entries out of client.config,
# and registers an at_exit hook that calls #flush.
#
# @param client [Object] must respond to #config, which is indexed by symbol
def initialize(client)
  @client = client
  @mutex = Mutex.new
  @jobs, @requests = [], []
  # Metrics collection is optional: it is only set up when a Metrics constant
  # is visible from here; otherwise @metrics stays unset.
  if defined?(Metrics)
    @metrics = RorVsWild::Metrics.new
  end
  @request_sampling_rate = client.config[:request_sampling_rate]
  @job_sampling_rate = client.config[:job_sampling_rate]
  Kernel.at_exit { flush }
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



8
9
10
# File 'lib/rorvswild/queue.rb', line 8

# Reader for @client, the object assigned in #initialize.
def client
  @client
end

#jobs ⇒ Object (readonly)

Returns the value of attribute jobs.



9
10
11
# File 'lib/rorvswild/queue.rb', line 9

# Reader for @jobs, the array of job payloads waiting to be flushed.
# #pull_jobs swaps it for a fresh array, so do not cache the returned object.
def jobs
  @jobs
end

#mutex ⇒ Object (readonly)

Returns the value of attribute mutex.



8
9
10
# File 'lib/rorvswild/queue.rb', line 8

# Reader for @mutex, the Mutex created in #initialize and used by #push_to,
# #pull_jobs and #pull_requests to guard the buffers.
def mutex
  @mutex
end

#requests ⇒ Object (readonly)

Returns the value of attribute requests.



9
10
11
# File 'lib/rorvswild/queue.rb', line 9

# Reader for @requests, the array of request payloads waiting to be flushed.
# #pull_requests swaps it for a fresh array, so do not cache the returned object.
def requests
  @requests
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



8
9
10
# File 'lib/rorvswild/queue.rb', line 8

# Reader for @thread, the background Thread assigned by #start_thread.
# It is nil until #start_thread has been called.
def thread
  @thread
end

Instance Method Details

#flush ⇒ Object



74
75
76
77
78
# File 'lib/rorvswild/queue.rb', line 74

# Sends everything currently buffered through the client.
#
# Each of the three pulls is posted only when it returns a truthy value:
# jobs go to "/jobs", requests to "/requests", server metrics to "/metrics".
#
# @return [Object] the result of the metrics post, or the falsy value
#   returned by #pull_server_metrics when there was nothing to send
def flush
  if (pending_jobs = pull_jobs)
    client.post("/jobs", jobs: pending_jobs)
  end

  if (pending_requests = pull_requests)
    client.post("/requests", requests: pending_requests)
  end

  # Kept as a boolean chain so the method's return value is exactly the
  # falsy pull result or the post result.
  (server_metrics = pull_server_metrics) && client.post("/metrics", metrics: server_metrics)
end

#flush_indefinetely ⇒ Object



66
67
68
69
70
71
72
# File 'lib/rorvswild/queue.rb', line 66

# Body of the background thread: announces the deployment once, then
# alternates sleeping for SLEEP_TIME seconds and calling #flush, forever.
#
# Never returns normally. Any exception (including non-StandardError ones) is
# logged through RorVsWild.logger and then re-raised unchanged.
def flush_indefinetely
  client.post("/deployments", deployment: Deployment.to_h)
  # A plain `while true` is used on purpose: Kernel#loop would silently stop
  # on StopIteration instead of letting it reach the rescue below.
  while true
    flush if sleep(SLEEP_TIME)
  end
rescue Exception => error
  RorVsWild.logger.error(error)
  raise
end

#pull_jobs ⇒ Object



40
41
42
43
44
45
46
47
48
49
# File 'lib/rorvswild/queue.rb', line 40

# Atomically takes all buffered jobs out of the queue.
#
# Under the mutex, the current @jobs array is handed to the caller and
# replaced by a fresh empty array.
#
# @return [Array, nil] the buffered jobs, or nil when the buffer is empty
def pull_jobs
  mutex.synchronize do
    next if jobs.empty?

    drained, @jobs = jobs, []
    drained
  end
end

#pull_requests ⇒ Object



51
52
53
54
55
56
57
58
59
60
# File 'lib/rorvswild/queue.rb', line 51

# Atomically takes all buffered requests out of the queue.
#
# Under the mutex, the current @requests array is handed to the caller and
# replaced by a fresh empty array.
#
# @return [Array, nil] the buffered requests, or nil when the buffer is empty
def pull_requests
  mutex.synchronize do
    next if requests.empty?

    drained, @requests = requests, []
    drained
  end
end

#pull_server_metrics ⇒ Object



62
63
64
# File 'lib/rorvswild/queue.rb', line 62

# Returns the metrics payload to send, if any.
#
# Without a metrics collector (@metrics unset) this returns nil. Otherwise it
# calls update_every_minute on the collector and, only when that returns a
# truthy value, returns the collector's to_h.
#
# @return [Object, nil, false] collector.to_h, or the falsy value that
#   stopped the chain
def pull_server_metrics
  collector = @metrics
  return collector unless collector

  refreshed = collector.update_every_minute
  refreshed && collector.to_h
end

#push_error(data) ⇒ Object



30
31
32
# File 'lib/rorvswild/queue.rb', line 30

# Hands an error payload straight to the client instead of buffering it:
# calls client.post_async on "/errors" with the payload under the :error key.
#
# @param data [Object] the error payload, passed through unchanged
# @return [Object] whatever client.post_async returns
def push_error(data)
  client.post_async("/errors", error: data)
end

#push_job(data) ⇒ Object



22
23
24
# File 'lib/rorvswild/queue.rb', line 22

# Buffers a job payload, subject to the job sampling rate.
#
# When @job_sampling_rate is unset every job is kept. Otherwise a job is kept
# only when a fresh rand value is less than or equal to the rate.
#
# @param data [Object] the job payload
# @return [Object, nil] the result of #push_to, or nil when the job was dropped
def push_job(data)
  rate = @job_sampling_rate
  # rand is only consulted when a rate is configured.
  keep = rate ? rand <= rate : true
  push_to(jobs, data) if keep
end

#push_request(data) ⇒ Object



26
27
28
# File 'lib/rorvswild/queue.rb', line 26

# Buffers a request payload, subject to the request sampling rate.
#
# When @request_sampling_rate is unset every request is kept. Otherwise a
# request is kept only when a fresh rand value is less than or equal to the rate.
#
# @param data [Object] the request payload
# @return [Object, nil] the result of #push_to, or nil when the request was dropped
def push_request(data)
  rate = @request_sampling_rate
  # rand is only consulted when a rate is configured.
  keep = rate ? rand <= rate : true
  push_to(requests, data) if keep
end

#push_to(array, data) ⇒ Object



34
35
36
37
38
# File 'lib/rorvswild/queue.rb', line 34

# Appends +data+ to +array+ under the mutex and nudges the background thread
# when needed.
#
# #wakeup_thread is called when the array has reached FLUSH_TRESHOLD entries
# or when no thread has been started yet.
#
# @param array [Array] the buffer to append to
# @param data [Object] the payload to buffer
# @return [Object, nil] the result of #wakeup_thread, or nil when it was not called
def push_to(array, data)
  mutex.synchronize do
    array << data
    buffer_full = array.size >= FLUSH_TRESHOLD
    # `thread` is only consulted when the threshold has not been reached.
    wakeup_thread if buffer_full || !thread
  end
end

#start_thread ⇒ Object



80
81
82
83
# File 'lib/rorvswild/queue.rb', line 80

# Logs a debug line and spawns the background thread that runs
# #flush_indefinetely, remembering it in @thread.
#
# @return [Thread] the newly created thread
def start_thread
  RorVsWild.logger.debug("RorVsWild::Queue#start_thread".freeze)
  worker = Thread.new do
    flush_indefinetely
  end
  @thread = worker
end

#wakeup_thread ⇒ Object



85
86
87
# File 'lib/rorvswild/queue.rb', line 85

# Makes sure the background thread is working: wakes it up when it exists and
# is alive, otherwise starts a new one via #start_thread.
#
# @return [Object] the result of Thread#wakeup or of #start_thread
def wakeup_thread
  worker = thread
  if worker && worker.alive?
    worker.wakeup
  else
    start_thread
  end
end