Class: RorVsWild::Queue
- Inherits:
-
Object
- Object
- RorVsWild::Queue
- Defined in:
- lib/rorvswild/queue.rb
Constant Summary collapse
- SLEEP_TIME =
10
- FLUSH_TRESHOLD =
10
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#jobs ⇒ Object
readonly
Returns the value of attribute jobs.
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
-
#requests ⇒ Object
readonly
Returns the value of attribute requests.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
Instance Method Summary collapse
- #flush ⇒ Object
- #flush_indefinetely ⇒ Object
-
#initialize(client) ⇒ Queue
constructor
A new instance of Queue.
- #pull_jobs ⇒ Object
- #pull_requests ⇒ Object
- #pull_server_metrics ⇒ Object
- #push_error(data) ⇒ Object
- #push_job(data) ⇒ Object
- #push_request(data) ⇒ Object
- #push_to(array, data) ⇒ Object
- #start_thread ⇒ Object
- #wakeup_thread ⇒ Object
Constructor Details
#initialize(client) ⇒ Queue
Returns a new instance of Queue.
11 12 13 14 15 16 17 18 19 20 |
# File 'lib/rorvswild/queue.rb', line 11
#
# Creates an empty queue bound to +client+: two empty buffers (jobs and
# requests), the mutex that guards them, and the sampling rates read from
# the client's configuration. A flush is registered with Kernel.at_exit so
# anything still buffered is sent when the process terminates.
#
# @param client [Object] must respond to #config, which is indexed with
#   :request_sampling_rate and :job_sampling_rate
# @return [Queue] a new instance of Queue
def initialize(client)
  @client = client
  @mutex = Mutex.new
  @jobs = []
  @requests = []

  # Server metrics are optional: only instantiated when the Metrics
  # constant has been loaded. Otherwise @metrics stays unset (nil).
  if defined?(Metrics)
    @metrics = RorVsWild::Metrics.new
  end

  # A nil rate means "no sampling" (see push_job / push_request).
  @job_sampling_rate = client.config[:job_sampling_rate]
  @request_sampling_rate = client.config[:request_sampling_rate]

  Kernel.at_exit { flush }
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
8 9 10 |
# File 'lib/rorvswild/queue.rb', line 8 def client @client end |
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
9 10 11 |
# File 'lib/rorvswild/queue.rb', line 9 def jobs @jobs end |
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
8 9 10 |
# File 'lib/rorvswild/queue.rb', line 8 def mutex @mutex end |
#requests ⇒ Object (readonly)
Returns the value of attribute requests.
9 10 11 |
# File 'lib/rorvswild/queue.rb', line 9 def requests @requests end |
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
8 9 10 |
# File 'lib/rorvswild/queue.rb', line 8 def thread @thread end |
Instance Method Details
#flush ⇒ Object
74 75 76 77 78 |
# File 'lib/rorvswild/queue.rb', line 74
#
# Drains every buffer and posts whatever was found: jobs to "/jobs",
# requests to "/requests" and server metrics to "/metrics". A category with
# nothing to send is skipped, so no empty payload is posted.
#
# @return [Object] the result of the metrics post, or the falsy value
#   returned by pull_server_metrics when there was nothing to send
def flush
  pending_jobs = pull_jobs
  client.post("/jobs", jobs: pending_jobs) if pending_jobs

  pending_requests = pull_requests
  client.post("/requests", requests: pending_requests) if pending_requests

  server_metrics = pull_server_metrics
  server_metrics && client.post("/metrics", metrics: server_metrics)
end
#flush_indefinetely ⇒ Object
66 67 68 69 70 71 72 |
# File 'lib/rorvswild/queue.rb', line 66
#
# Body of the background thread. Posts the deployment description once, then
# loops forever: sleep for SLEEP_TIME seconds (or less, when wakeup_thread
# interrupts the sleep), then flush.
#
# Any exception that escapes the loop is logged through RorVsWild.logger and
# re-raised, so this method never returns normally.
#
# @raise [Exception] whatever was raised by the post, the sleep or the flush
def flush_indefinetely
  begin
    client.post("/deployments", deployment: Deployment.to_h)
    while true
      # Kept conditional on the sleep's return value, as in the original
      # `sleep(...) and flush` form.
      flush if sleep(SLEEP_TIME)
    end
  rescue Exception => error
    # Exception (not only StandardError) is rescued on purpose: the error is
    # only logged here and then propagated unchanged.
    RorVsWild.logger.error(error)
    raise
  end
end
#pull_jobs ⇒ Object
40 41 42 43 44 45 46 47 48 49 |
# File 'lib/rorvswild/queue.rb', line 40
#
# Atomically takes ownership of the buffered jobs. Under the mutex, the
# current buffer is handed to the caller and @jobs is replaced by a fresh
# empty array.
#
# @return [Array, nil] the drained jobs, or nil when the buffer was empty
def pull_jobs
  mutex.synchronize do
    # Nothing buffered: the block (and therefore the method) yields nil.
    next if jobs.empty?

    drained = jobs
    @jobs = []
    drained
  end
end
#pull_requests ⇒ Object
51 52 53 54 55 56 57 58 59 60 |
# File 'lib/rorvswild/queue.rb', line 51
#
# Atomically takes ownership of the buffered requests. Under the mutex, the
# current buffer is handed to the caller and @requests is replaced by a fresh
# empty array.
#
# @return [Array, nil] the drained requests, or nil when the buffer was empty
def pull_requests
  mutex.synchronize do
    # Nothing buffered: the block (and therefore the method) yields nil.
    next if requests.empty?

    drained = requests
    @requests = []
    drained
  end
end
#pull_server_metrics ⇒ Object
62 63 64 |
# File 'lib/rorvswild/queue.rb', line 62
#
# Returns the server metrics to send, if any. Metrics are only returned when
# a metrics object exists (see #initialize) and its #update_every_minute
# call returns a truthy value.
#
# @return [Object] @metrics.to_h, or a falsy value (nil/false) when there is
#   no metrics object or update_every_minute returned a falsy value
def pull_server_metrics
  # No metrics collector: hand back the falsy value itself.
  return @metrics unless @metrics

  @metrics.update_every_minute && @metrics.to_h
end
#push_error(data) ⇒ Object
30 31 32 |
# File 'lib/rorvswild/queue.rb', line 30 def push_error(data) client.post_async("/errors", error: data) end |
#push_job(data) ⇒ Object
22 23 24 |
# File 'lib/rorvswild/queue.rb', line 22
#
# Buffers a job payload, subject to the configured job sampling rate.
#
# With no sampling rate (nil) every job is kept. Otherwise the job is kept
# with probability equal to the rate. Kernel#rand returns a float in
# [0, 1), so the comparison has to be strict: `rand < rate` is true with
# probability exactly `rate`, never for a rate of 0 and always for 1.0.
# A non-strict `<=` would still let a job through at rate 0 whenever rand
# returns exactly 0.0.
#
# @param data [Object] job payload, appended to the jobs buffer
# @return [Object, nil] the result of push_to, or nil when sampled out
def push_job(data)
  rate = @job_sampling_rate
  return if rate && rand >= rate

  push_to(jobs, data)
end
#push_request(data) ⇒ Object
26 27 28 |
# File 'lib/rorvswild/queue.rb', line 26
#
# Buffers a request payload, subject to the configured request sampling rate.
#
# With no sampling rate (nil) every request is kept. Otherwise the request
# is kept with probability equal to the rate. Kernel#rand returns a float in
# [0, 1), so the comparison has to be strict: `rand < rate` is true with
# probability exactly `rate`, never for a rate of 0 and always for 1.0.
# A non-strict `<=` would still let a request through at rate 0 whenever
# rand returns exactly 0.0.
#
# @param data [Object] request payload, appended to the requests buffer
# @return [Object, nil] the result of push_to, or nil when sampled out
def push_request(data)
  rate = @request_sampling_rate
  return if rate && rand >= rate

  push_to(requests, data)
end
#push_to(array, data) ⇒ Object
34 35 36 37 38 |
# File 'lib/rorvswild/queue.rb', line 34
#
# Appends +data+ to +array+ under the mutex, then wakes (or starts) the
# flushing thread when either the buffer has reached FLUSH_TRESHOLD entries
# or no thread has been started yet.
#
# @param array [Array] buffer to append to (the jobs or requests buffer)
# @param data [Object] payload to append
# @return [Object, nil] the result of wakeup_thread, or nil when the thread
#   was left alone
def push_to(array, data)
  mutex.synchronize do
    array << data
    threshold_reached = array.size >= FLUSH_TRESHOLD
    wakeup_thread if threshold_reached || !thread
  end
end
#start_thread ⇒ Object
80 81 82 83 |
# File 'lib/rorvswild/queue.rb', line 80
#
# Logs a debug line and spawns the background thread that runs
# flush_indefinetely, storing it in @thread.
#
# @return [Thread] the newly created thread
def start_thread
  message = "RorVsWild::Queue#start_thread".freeze
  RorVsWild.logger.debug(message)
  @thread = Thread.new do
    flush_indefinetely
  end
end
#wakeup_thread ⇒ Object
85 86 87 |
# File 'lib/rorvswild/queue.rb', line 85
#
# Makes sure a flushing thread is running and prompts it to act: a live
# thread is woken from its sleep, while a missing or dead thread is replaced
# by a new one via start_thread.
#
# @return [Thread] the thread that was woken up or started
def wakeup_thread
  if thread && thread.alive?
    thread.wakeup
  else
    start_thread
  end
end