Class: Computation
- Inherits:
-
Object
- Object
- Computation
- Defined in:
- lib/signalfx/signalflow/computation.rb
Overview
Represents a SignalFlow computation/job. A computation can have a channel associated with it, but not necessarily (it could have been detached while the computation is still running or never attached in the first place). New instances should only be created by the client and a Computation MUST have a handle.
Instance Attribute Summary collapse
-
#channel ⇒ Object
Returns the value of attribute channel.
-
#handle ⇒ Object
Returns the value of attribute handle.
-
#input_timeseries_count ⇒ Object
Returns the value of attribute input_timeseries_count.
-
#last_timestamp_seen ⇒ Object
Returns the value of attribute last_timestamp_seen.
-
#metadata ⇒ Object
Returns the value of attribute metadata.
-
#resolution ⇒ Object
Returns the value of attribute resolution.
-
#state ⇒ Object
Returns the value of attribute state.
Instance Method Summary collapse
-
#attach(**options) ⇒ Computation
Attach to an already running computation.
- #attached? ⇒ Boolean
-
#detach ⇒ Object
Detach from this computation and remove reference to the channel to free up memory.
-
#each_message {|msg, comp| ... } ⇒ Object
Call the given block with each message in the channel as they arrive.
-
#each_message_async(&block) ⇒ Object
Iterates over the messages asynchronously for this computation.
-
#initialize(handle, attach_func, stop_func) ⇒ Computation
constructor
A new instance of Computation.
-
#next_message(timeout_seconds = nil) ⇒ Object
Get the next message in this computation.
-
#stop(reason = nil) ⇒ Object
Stop a computation.
Constructor Details
#initialize(handle, attach_func, stop_func) ⇒ Computation
Returns a new instance of Computation.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/signalfx/signalflow/computation.rb', line 23 def initialize(handle, attach_func, stop_func) @handle = handle @channel = nil @attach_func = attach_func @stop_func = stop_func @metadata = {} # We can't have a handle until the job is started so we must be at least # at this state @state = STARTED_STATE @pending_messages = Queue.new @batch_size_known = false @expected_batch_size = 0 @current_batch_data = nil @current_batch_size = nil @last_timestamp_seen = nil @resolution = nil @input_timeseries_count = nil end |
Instance Attribute Details
#channel ⇒ Object
Returns the value of attribute channel.
16 17 18 |
# File 'lib/signalfx/signalflow/computation.rb', line 16 def channel @channel end |
#handle ⇒ Object
Returns the value of attribute handle.
15 16 17 |
# File 'lib/signalfx/signalflow/computation.rb', line 15 def handle @handle end |
#input_timeseries_count ⇒ Object
Returns the value of attribute input_timeseries_count.
20 21 22 |
# File 'lib/signalfx/signalflow/computation.rb', line 20 def input_timeseries_count @input_timeseries_count end |
#last_timestamp_seen ⇒ Object
Returns the value of attribute last_timestamp_seen.
21 22 23 |
# File 'lib/signalfx/signalflow/computation.rb', line 21 def @last_timestamp_seen end |
#metadata ⇒ Object
Returns the value of attribute metadata.
18 19 20 |
# File 'lib/signalfx/signalflow/computation.rb', line 18 def @metadata end |
#resolution ⇒ Object
Returns the value of attribute resolution.
19 20 21 |
# File 'lib/signalfx/signalflow/computation.rb', line 19 def resolution @resolution end |
#state ⇒ Object
Returns the value of attribute state.
17 18 19 |
# File 'lib/signalfx/signalflow/computation.rb', line 17 def state @state end |
Instance Method Details
#attach(**options) ⇒ Computation
Attach to an already running computation.
*Not currently implemented on backend!*
channel attached to it.
218 219 220 221 222 223 |
# File 'lib/signalfx/signalflow/computation.rb', line 218 def attach(**) raise "Computation #{@handle} is already attached!" if @channel @channel = @attach_func.call(@handle, **) self end |
#attached? ⇒ Boolean
49 50 51 |
# File 'lib/signalfx/signalflow/computation.rb', line 49 def attached? !@channel.nil? && !@channel.detached end |
#detach ⇒ Object
Detach from this computation and remove reference to the channel to free up memory.
227 228 229 230 |
# File 'lib/signalfx/signalflow/computation.rb', line 227 def detach @channel.detach @channel = nil end |
#each_message {|msg, comp| ... } ⇒ Object
Call the given block with each message in the channel as they arrive. This method will not return until the channel is detached from (either manually or due to the computation ending).
Messages are queued in the channel so that none will be lost if this method is not called immediately.
96 97 98 99 100 101 102 103 104 105 |
# File 'lib/signalfx/signalflow/computation.rb', line 96 def (&block) raise "Computation #{@handle} is not attached to a channel" unless @channel while @channel msg = block.call(msg, self) end return end |
#each_message_async(&block) ⇒ Object
Iterates over the messages asynchronously for this computation. A convenience function if you want to fire off multiple computations simultaneously, though not terribly efficient since it starts a new thread that spends a lot of time waiting. However, since we don’t have a way of “select”ing on computations, this is probably good enough for basic use.
See #each_message.
78 79 80 81 82 83 |
# File 'lib/signalfx/signalflow/computation.rb', line 78 def (&block) raise "Computation #{@handle} is not attached to a channel" unless @channel Thread.new{ (&block) } return end |
#next_message(timeout_seconds = nil) ⇒ Object
Get the next message in this computation.
60 61 62 63 64 65 66 67 68 69 |
# File 'lib/signalfx/signalflow/computation.rb', line 60 def (timeout_seconds=nil) raise "Computation #{@handle} is not attached to a channel" unless @channel msg = nil while msg.nil? && !@channel.nil? # process_message might return no messages if it is building up a batch msg = (@channel.pop(timeout_seconds)) end return msg end |
#stop(reason = nil) ⇒ Object
Stop a computation
See developers.signalfx.com/v2/reference#section-stop-a-computation
237 238 239 |
# File 'lib/signalfx/signalflow/computation.rb', line 237 def stop(reason=nil) @stop_func.call(@handle, reason) end |