Class: OpenC3::StoreQueued
- Defined in:
- lib/openc3/utilities/store_queued.rb
Direct Known Subclasses
Constant Summary
- @@instance_mutex = Mutex.new
Mutex used to ensure that only one instance is created.
Class Method Summary
- .instance(update_interval = 1) ⇒ Object
Gets the singleton instance. The update interval defaults to 1 second.
- .method_missing(message, *args, **kwargs) ⇒ Object
Delegates all unknown class methods to the singleton instance.
Instance Method Summary
- #graceful_kill ⇒ Object
- #initialize(update_interval) ⇒ StoreQueued (constructor)
Returns a new instance of StoreQueued.
- #method_missing(message, *args, **kwargs, &block) ⇒ Object
Records the message for pipelining by the update thread.
- #shutdown ⇒ Object
- #store_instance ⇒ Object
Returns the store we’re working with.
- #store_thread_body ⇒ Object
Constructor Details
#initialize(update_interval) ⇒ StoreQueued
Returns a new instance of StoreQueued.
# File 'lib/openc3/utilities/store_queued.rb', line 45

def initialize(update_interval)
  @update_interval = update_interval
  @store = store_instance()
  # Queue to hold the store requests
  @store_queue = Queue.new
  # Sleeper used to delay update thread
  @update_sleeper = Sleeper.new

  at_exit() do
    shutdown()
  end

  # Thread used to call methods on the store
  @update_thread = OpenC3.safe_thread(self.class.to_s) do
    store_thread_body()
  end
end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(message, *args, **kwargs, &block) ⇒ Object
Record the message for pipelining by the thread
# File 'lib/openc3/utilities/store_queued.rb', line 102

def method_missing(message, *args, **kwargs, &block)
  o = OpenStruct.new
  o.message = message
  o.args = args
  o.kwargs = kwargs
  o.block = block
  @store_queue.push(o)
end
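The record-then-replay idea behind this method can be tried in isolation. The following is a minimal sketch, not the OpenC3 implementation: `CallRecorder`, `FakeStore`, `QueuedCall`, and `replay` are hypothetical names, and a plain `Struct` stands in for the `OpenStruct` used above.

```ruby
# Hypothetical value object holding one recorded call (stand-in for the OpenStruct above).
QueuedCall = Struct.new(:message, :args, :kwargs, :block)

# Hypothetical target that only remembers the calls it actually received.
class FakeStore
  attr_reader :calls

  def initialize
    @calls = []
  end

  def set(key, value)
    @calls << [:set, key, value]
  end
end

# Hypothetical recorder: every unknown method is queued instead of executed.
class CallRecorder
  def initialize
    @queue = Queue.new
  end

  def method_missing(message, *args, **kwargs, &block)
    @queue.push(QueuedCall.new(message, args, kwargs, block))
  end

  def respond_to_missing?(_message, _include_private = false)
    true
  end

  # Forward every queued call to the target, in the order it was recorded.
  def replay(target)
    until @queue.empty?
      action = @queue.pop
      target.public_send(action.message, *action.args, **action.kwargs, &action.block)
    end
  end
end

store = FakeStore.new
recorder = CallRecorder.new
recorder.set('temperature', 21) # queued only; the store has not seen it yet
recorder.replay(store)          # now the store receives the call
```

In this sketch the recorded call returns as soon as it is queued, so its return value is not the target's reply. Callers that need the result of a store operation would have to call the store directly.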
Class Method Details
.instance(update_interval = 1) ⇒ Object
Gets the singleton instance. The update interval defaults to 1 second.
# File 'lib/openc3/utilities/store_queued.rb', line 31

def self.instance(update_interval = 1) # seconds
  return @instance if @instance

  @@instance_mutex.synchronize do
    @instance ||= self.new(update_interval)
    return @instance
  end
end
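The mutex-guarded lazy initialization above can be exercised with a small stand-alone class. This is a sketch under assumptions: `LazySingleton` is a hypothetical name, and it keeps only the interval so the behavior is observable.

```ruby
# Hypothetical class that mirrors only the singleton logic shown above.
class LazySingleton
  @@instance_mutex = Mutex.new

  attr_reader :update_interval

  def initialize(update_interval)
    @update_interval = update_interval
  end

  def self.instance(update_interval = 1) # seconds
    # Fast path: skip the lock once the instance exists.
    return @instance if @instance

    # Slow path: the lock plus ||= ensures only one thread constructs it.
    @@instance_mutex.synchronize do
      @instance ||= new(update_interval)
    end
    @instance
  end
end

first = LazySingleton.instance(5)
second = LazySingleton.instance(10) # returns the existing instance; 10 is ignored
```

One consequence worth noting: because the existing instance is returned before the argument is looked at, only the first call's `update_interval` takes effect.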
.method_missing(message, *args, **kwargs) ⇒ Object
Delegates all unknown class methods to the singleton instance
# File 'lib/openc3/utilities/store_queued.rb', line 41

def self.method_missing(message, *args, **kwargs, &)
  self.instance.public_send(message, *args, **kwargs, &)
end
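Class-level delegation of this kind lets callers write `SomeClass.some_call` without fetching the instance first. A minimal sketch, assuming a hypothetical `SharedCounter` class and using a named `&block` parameter so it also runs on Ruby versions older than 3.1:

```ruby
# Hypothetical class showing class-to-instance delegation via method_missing.
class SharedCounter
  def self.instance
    @instance ||= new
  end

  # Any class method that is not defined is forwarded to the singleton instance.
  def self.method_missing(message, *args, **kwargs, &block)
    instance.public_send(message, *args, **kwargs, &block)
  end

  # Keep respond_to? consistent with the delegation above.
  def self.respond_to_missing?(message, include_private = false)
    instance.respond_to?(message, include_private) || super
  end

  def initialize
    @count = 0
  end

  def increment(by = 1)
    @count += by
  end
end

SharedCounter.increment(2) # forwarded to SharedCounter.instance.increment(2)
total = SharedCounter.increment
```

Using `public_send` rather than `send` keeps private instance methods from being reachable through the class.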
Instance Method Details
#graceful_kill ⇒ Object
# File 'lib/openc3/utilities/store_queued.rb', line 116

def graceful_kill
  # Do nothing
end
#shutdown ⇒ Object
# File 'lib/openc3/utilities/store_queued.rb', line 85

def shutdown
  @update_sleeper.cancel if @update_sleeper
  OpenC3.kill_thread(self, @update_thread) if @update_thread
  @update_thread = nil
  # Drain the queue before shutdown
  unless @store_queue.empty?
    # Pipeline the requests to redis to improve performance
    @store.pipelined do
      while !@store_queue.empty?
        action = @store_queue.pop()
        @store.method_missing(action.message, *action.args, **action.kwargs, &action.block)
      end
    end
  end
end
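The drain step sends everything left in the queue inside a single `pipelined` block. The sketch below illustrates why that matters, using a hypothetical in-memory `BatchingStore` that counts flushes; it is not the OpenC3 `Store` or the Redis client API, and `drain` is an illustrative helper.

```ruby
# Hypothetical store: commands issued inside `pipelined` are buffered and
# applied together, which counts as one flush instead of one per command.
class BatchingStore
  attr_reader :data, :flushes

  def initialize
    @data = {}
    @flushes = 0
    @buffer = nil
  end

  def pipelined
    @buffer = []
    yield
    @buffer.each { |key, value| @data[key] = value }
    @flushes += 1
  ensure
    @buffer = nil
  end

  def set(key, value)
    if @buffer
      @buffer << [key, value]
    else
      @data[key] = value
      @flushes += 1
    end
  end
end

# Illustrative helper: empty the queue into the store as one pipelined batch.
def drain(queue, store)
  return if queue.empty?

  store.pipelined do
    until queue.empty?
      message, args = queue.pop
      store.public_send(message, *args)
    end
  end
end

queue = Queue.new
queue << [:set, ['a', 1]]
queue << [:set, ['b', 2]]
queue << [:set, ['c', 3]]

store = BatchingStore.new
drain(queue, store) # three queued commands, one flush
```

Skipping the batch entirely when the queue is empty, as `shutdown` does, avoids opening a pipeline that would carry no commands.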
#store_instance ⇒ Object
Returns the store we’re working with
# File 'lib/openc3/utilities/store_queued.rb', line 112

def store_instance
  Store.instance
end
#store_thread_body ⇒ Object
# File 'lib/openc3/utilities/store_queued.rb', line 63

def store_thread_body
  while true
    start_time = Time.now

    unless @store_queue.empty?
      # Pipeline the requests to redis to improve performance
      @store.pipelined do
        while !@store_queue.empty?
          action = @store_queue.pop()
          @store.method_missing(action.message, *action.args, **action.kwargs, &action.block)
        end
      end
    end

    # Only check whether to update at a set interval
    run_time = Time.now - start_time
    sleep_time = @update_interval - run_time
    sleep_time = 0 if sleep_time < 0
    break if @update_sleeper.sleep(sleep_time)
  end
end
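The loop above drains the queue, then sleeps for whatever remains of the interval, and exits when the sleeper reports that it was canceled. A minimal sketch of that timing pattern follows. `CancellableSleeper` and `run_batches` are hypothetical; the sleeper is not OpenC3's `Sleeper`, and its "returns true when canceled" contract is an assumption inferred from the `break if` line.

```ruby
# Hypothetical sleeper: sleep returns true if canceled, false if the time elapsed.
class CancellableSleeper
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @canceled = false
  end

  def sleep(seconds)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + seconds
    @mutex.synchronize do
      until @canceled
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      @canceled
    end
  end

  def cancel
    @mutex.synchronize do
      @canceled = true
      @cond.broadcast
    end
  end
end

# Illustrative loop: process the queue, then wait out the rest of the interval.
def run_batches(queue, sleeper, interval, processed)
  loop do
    start_time = Time.now
    processed << queue.pop until queue.empty?

    # Subtract the time spent processing so batches start roughly every interval.
    sleep_time = interval - (Time.now - start_time)
    sleep_time = 0 if sleep_time < 0
    break if sleeper.sleep(sleep_time)
  end
end

queue = Queue.new
queue << :a
queue << :b
sleeper = CancellableSleeper.new
processed = []

worker = Thread.new { run_batches(queue, sleeper, 0.05, processed) }
sleeper.cancel # the loop still drains once before it checks the sleeper
worker.join(2)
```

Because the drain runs before the sleeper is consulted, items queued ahead of a cancel are still processed in this sketch. Items queued after the final pass would be left behind, which is the case the drain in `shutdown` covers.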