Class: Orchestrator::Device::Processor
- Inherits:
-
Object
- Object
- Orchestrator::Device::Processor
- Includes:
- Transcoder
- Defined in:
- lib/orchestrator/device/processor.rb
Constant Summary collapse
- SEND_DEFAULTS =
{
  wait: true,              # wait for a response before continuing with sends
  delay: 0,                # make sure sends are separated by at least this (in milliseconds)
  delay_on_receive: 0,     # delay the next send by this (milliseconds) after a receive
  max_waits: 3,            # number of times we will ignore valid tokens before retry
  retries: 2,              # Retry attempts before we give up on the command
  hex_string: false,       # Does the input need conversion
  timeout: 5000,           # Time we will wait for a response
  priority: 50,            # Priority of a send
  force_disconnect: false  # Mainly for use with make and break

  # Other options include:
  # * emit callback to occur once command complete (may be discarded if a named command)
  # * on_receive (alternative to received function)
}
- CONFIG_DEFAULTS =
{
  tokenize: false,                   # If replaced with a callback can define custom tokenizers
  size_limit: 524288,                # 512kb buffer max
  clear_queue_on_disconnect: false,
  flush_buffer_on_disconnect: false,
  priority_bonus: 20,                # give commands bonus priority under certain conditions
  update_status: true,               # auto update connected status?
  thrashing_threshold: 1500          # min milliseconds between connection retries

  # Other options include:
  # * inactivity_timeout (used with make and break)
  # * delimiter (string or regex to match message end)
  # * indicator (string or regex to match message start)
  # * verbose (throw errors or silently recover)
  # * wait_ready (wait for some signal before signaling connected)
  # * encoding (BINARY) (force encoding on incoming data)
}
- SUCCESS =
Set.new([true, :success, :abort, nil, :ignore])
- FAILURE =
Set.new([false, :retry, :failed, :fail])
- DUMMY_RESOLVER =
proc {}
- TERMINATE_MSG =
Error::CommandCanceled.new 'command canceled due to module shutdown'
- UNNAMED =
'unnamed'
Instance Attribute Summary collapse
-
#config ⇒ Object
Returns the value of attribute config.
-
#last_receive_at ⇒ Object
readonly
For statistics only.
-
#last_sent_at ⇒ Object
readonly
For statistics only.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
-
#timeout ⇒ Object
readonly
For statistics only.
-
#transport ⇒ Object
Returns the value of attribute transport.
Instance Method Summary collapse
- #buffer(data) ⇒ Object
- #check_next ⇒ Object
-
#connected ⇒ Object
Callbacks: marks the processor as connected and notifies the manager.
- #disconnected ⇒ Object
-
#initialize(man) ⇒ Processor
constructor
Call order is init -> mod.load -> post_init, so config can be set in on_load if desired.
-
#queue_command(options) ⇒ Object
Public interface.
-
#send_options(options) ⇒ Object
Helper functions: merges the given options into the default send options.
- #terminate ⇒ Object
Methods included from Transcoder
array_to_str, byte_to_hex, hex_to_byte, str_to_array
Constructor Details
#initialize(man) ⇒ Processor
Call order is init -> mod.load -> post_init, so config can be set in on_load if desired.
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/orchestrator/device/processor.rb', line 74

# Builds a processor bound to the given manager.
#
# Call order is init -> mod.load -> post_init, so config can be set in
# on_load if desired.
#
# @param man [Object] the manager; must respond to #thread and #logger
def initialize(man)
  @man = man

  @thread = @man.thread
  @logger = @man.logger

  # Work on copies so per-instance changes never touch the shared constants
  @defaults = SEND_DEFAULTS.dup
  @config = CONFIG_DEFAULTS.dup

  @queue = CommandQueue.new(@thread, method(:send_next))
  @responses = []
  @wait = false
  @connected = false
  @checking = Mutex.new
  @bonus = 0

  # For statistics only
  @last_sent_at = 0
  @last_receive_at = 0

  # Used to indicate when we can start the next response processing
  @head = ::Libuv::Q::ResolvedPromise.new(@thread, true)
  @tail = ::Libuv::Q::ResolvedPromise.new(@thread, true)

  # Method variables
  # Each proc hands its argument to the matching instance method via @thread
  @resolver = proc { |resp| @thread.schedule { resolve_callback(resp) } }

  @resp_success = proc { |result| @thread.next_tick { resp_success(result) } }
  @resp_failure = proc { |reason| @thread.next_tick { resp_failure(reason) } }
end
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
65 66 67 |
# File 'lib/orchestrator/device/processor.rb', line 65

# Returns the value of attribute config.
#
# @return [Object] the current value of @config
def config
  @config
end
#last_receive_at ⇒ Object (readonly)
For statistics only
69 70 71 |
# File 'lib/orchestrator/device/processor.rb', line 69

# For statistics only.
#
# @return [Object] the current value of @last_receive_at
def last_receive_at
  @last_receive_at
end
#last_sent_at ⇒ Object (readonly)
For statistics only
69 70 71 |
# File 'lib/orchestrator/device/processor.rb', line 69

# For statistics only.
#
# @return [Object] the current value of @last_sent_at
def last_sent_at
  @last_sent_at
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
65 66 67 |
# File 'lib/orchestrator/device/processor.rb', line 65

# Returns the value of attribute queue.
#
# @return [Object] the current value of @queue
def queue
  @queue
end
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
65 66 67 |
# File 'lib/orchestrator/device/processor.rb', line 65

# Returns the value of attribute thread.
#
# @return [Object] the current value of @thread
def thread
  @thread
end
#timeout ⇒ Object (readonly)
For statistics only
69 70 71 |
# File 'lib/orchestrator/device/processor.rb', line 69

# For statistics only.
#
# @return [Object] the current value of @timeout
def timeout
  @timeout
end
#transport ⇒ Object
Returns the value of attribute transport.
66 67 68 |
# File 'lib/orchestrator/device/processor.rb', line 66

# Returns the value of attribute transport.
#
# @return [Object] the current value of @transport
def transport
  @transport
end
Instance Method Details
#buffer(data) ⇒ Object
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/orchestrator/device/processor.rb', line 172

# Accepts a chunk of received data and queues it for response processing.
#
# Records the receive time, then either passes the data through the
# tokenizing buffer (@buffer) or appends it as-is to @responses. Processing
# of queued responses starts immediately unless @wait is set.
#
# @param data [String] the received data; its encoding is changed in place
#   when no tokenizing buffer exists and config[:encoding] is set
def buffer(data)
  @last_receive_at = @thread.now

  if @buffer
    @responses.concat @buffer.extract(data)
  else
    # tokenizing buffer above will enforce encoding
    data.force_encoding(@config[:encoding]) if @config[:encoding]
    @responses << data
  end

  # if we are waiting we don't want to process this data just yet
  check_next unless @wait
end
#check_next ⇒ Object
195 196 197 198 199 200 201 202 203 |
# File 'lib/orchestrator/device/processor.rb', line 195

# Drains queued responses, passing each one to check_data in arrival order.
#
# Does nothing when a drain is already in progress or nothing is queued.
# Stops early as soon as @wait becomes truthy, leaving the remaining
# responses queued.
def check_next
  # Ruby's Mutex is not re-entrant: without the locked? guard, a call made
  # while the lock is held by this thread would raise instead of returning.
  return if @checking.locked? || @responses.empty?

  @checking.synchronize do
    loop do
      check_data(@responses.shift)
      break if @wait || @responses.empty?
    end
  end
end
#connected ⇒ Object
Callbacks: marks the processor as connected and notifies the manager.
147 148 149 150 151 152 153 154 |
# File 'lib/orchestrator/device/processor.rb', line 147

# Callback: flags the processor as connected, builds a fresh buffer via
# new_buffer and notifies the manager.
#
# The manager's :connected status is only updated when
# config[:update_status] is truthy.
def connected
  @connected = true
  new_buffer
  @man.notify_connected
  @man.trak(:connected, true) if @config[:update_status]
end
#disconnected ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/orchestrator/device/processor.rb', line 156

# Callback: flags the processor as disconnected and notifies the manager.
#
# When config[:flush_buffer_on_disconnect] is set, whatever the buffer
# still holds is flushed into check_data before the buffer is discarded.
# If the queue reports a command in flight (@queue.waiting), that command
# is failed with :disconnected.
def disconnected
  @connected = false
  @man.notify_disconnected
  @man.trak(:connected, false) if @config[:update_status]

  check_data(@buffer.flush) if @buffer && @config[:flush_buffer_on_disconnect]
  @buffer = nil

  resp_failure(:disconnected) if @queue.waiting
end
#queue_command(options) ⇒ Object
Public interface
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/orchestrator/device/processor.rb', line 118

# Public interface: normalises a command and pushes it onto the send queue.
#
# The caller's hash is modified in place before the defaults are merged in:
# :data is converted to a string, :retries is zeroed for fire-and-forget
# sends and a String :name becomes a Symbol.
#
# Any error raised while queuing is rescued: it is passed to
# options[:defer].reject (when a deferred was supplied) and logged.
#
# @param options [Hash] command options; keys read here are :data,
#   :hex_string, :wait, :name, :priority and :defer
# @return [Object] the result of @queue.push, or of the logger call when
#   queuing failed
def queue_command(options)
  # Make sure we are sending appropriately formatted data
  raw = options[:data]

  if raw.is_a?(Array)
    options[:data] = array_to_str(raw)
  elsif options[:hex_string] == true
    options[:data] = hex_to_byte(raw)
  end

  # Nothing to retry against when we are not waiting for a response
  options[:retries] = 0 if options[:wait] == false

  options[:name] = options[:name].to_sym if options[:name].is_a?(String)

  # merge in the defaults
  options = @defaults.merge(options)

  @queue.push(options, options[:priority] + @bonus)
rescue => e
  # A missing deferred must not mask the original error or skip the log entry
  defer = options[:defer]
  defer.reject(e) if defer
  @logger.print_error(e, 'error queuing command')
end
#send_options(options) ⇒ Object
Helper functions: merges the given options into the default send options.
106 107 108 |
# File 'lib/orchestrator/device/processor.rb', line 106

# Helper: overrides the default send options used for queued commands.
#
# The given options are merged into @defaults in place, so they apply to
# every command queued afterwards.
#
# @param options [Hash] send options to override
# @return [Hash] the updated defaults
def send_options(options)
  @defaults.merge!(options)
end
#terminate ⇒ Object
191 192 193 |
# File 'lib/orchestrator/device/processor.rb', line 191

# Requests shutdown of the processor.
#
# The actual work is done by do_terminate, which is handed to @thread via
# schedule rather than being called directly here.
def terminate
  @thread.schedule(method(:do_terminate))
end