Class: Lens::Worker
- Inherits:
-
Object
- Object
- Lens::Worker
- Defined in:
- lib/lens/worker.rb
Defined Under Namespace
Classes: Queue
Constant Summary collapse
- SHUTDOWN =
:__lens_worker_shutdown!
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
-
#pid ⇒ Object
readonly
Returns the value of attribute pid.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
Class Method Summary collapse
Instance Method Summary collapse
- #handle_response(response) ⇒ Object
-
#initialize(config) ⇒ Worker
constructor
A new instance of Worker.
- #notify_backend(payload) ⇒ Object
- #process(msg) ⇒ Object
- #push(obj) ⇒ Object
- #run ⇒ Object
-
#shutdown ⇒ Object
Shut down the worker after sending remaining data.
-
#shutdown! ⇒ Object
Immediate shutdown.
- #start ⇒ Object
Constructor Details
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
43 44 45 |
# File 'lib/lens/worker.rb', line 43 def config @config end |
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
43 44 45 |
# File 'lib/lens/worker.rb', line 43 def mutex @mutex end |
#pid ⇒ Object (readonly)
Returns the value of attribute pid.
43 44 45 |
# File 'lib/lens/worker.rb', line 43 def pid @pid end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
43 44 45 |
# File 'lib/lens/worker.rb', line 43 def queue @queue end |
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
43 44 45 |
# File 'lib/lens/worker.rb', line 43 def thread @thread end |
Class Method Details
.instance ⇒ Object
21 22 23 |
# File 'lib/lens/worker.rb', line 21 def instance @instance end |
.running? ⇒ Boolean
25 26 27 |
# File 'lib/lens/worker.rb', line 25 def running? !instance.nil? end |
.start(config) ⇒ Object
29 30 31 32 |
# File 'lib/lens/worker.rb', line 29 def start(config) return instance if running? @instance = new(config) end |
.stop(options = {}) ⇒ Object
34 35 36 37 38 |
# File 'lib/lens/worker.rb', line 34 def stop(options = {}) @instance.public_send(options[:force] ? :shutdown! : :shutdown) if running? ensure @instance = nil end |
Instance Method Details
#handle_response(response) ⇒ Object
95 96 97 98 |
# File 'lib/lens/worker.rb', line 95 def handle_response(response) # TODO: send message back to queue if response status != 200 log "handle_response #{response.code}" end |
#notify_backend(payload) ⇒ Object
91 92 93 |
# File 'lib/lens/worker.rb', line 91 def notify_backend(payload) Lens.sender.send_to_lens(payload) end |
#process(msg) ⇒ Object
85 86 87 88 89 |
# File 'lib/lens/worker.rb', line 85 def process(msg) handle_response(notify_backend(msg)) rescue StandardError => e sleep(1) end |
#push(obj) ⇒ Object
53 54 55 56 57 |
# File 'lib/lens/worker.rb', line 53 def push(obj) if start queue.push(obj) end end |
#run ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/lens/worker.rb', line 71 def run begin loop do case msg = queue.pop when SHUTDOWN break else process(msg) end end end rescue Exception => e end |
#shutdown ⇒ Object
Shut down the worker after sending remaining data. Returns true.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/lens/worker.rb', line 102 def shutdown mutex.synchronize do @shutdown = true @pid = nil queue.push(SHUTDOWN) end return true unless thread r = true unless Thread.current.eql?(thread) begin r = !!thread.join ensure shutdown! unless r end end r end |
#shutdown! ⇒ Object
Immediate shutdown.
124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/lens/worker.rb', line 124 def shutdown! mutex.synchronize do @shutdown = true @pid = nil end if thread Thread.kill(thread) thread.join # Allow ensure blocks to execute. end true end |
#start ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/lens/worker.rb', line 59 def start mutex.synchronize do return false if @shutdown return true if thread && thread.alive? @pid = Process.pid @thread = Thread.new { run } end true end |