Class: Computation

Inherits:
Object
  • Object
show all
Defined in:
lib/signalfx/signalflow/computation.rb

Overview

Represents a SignalFlow computation/job. A computation can have a channel associated with it, but not necessarily (it could have been detached while the computation is still running or never attached in the first place). New instances should only be created by the client and a Computation MUST have a handle.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(handle, attach_func, stop_func) ⇒ Computation

Returns a new instance of Computation.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/signalfx/signalflow/computation.rb', line 23

def initialize(handle, attach_func, stop_func)
  @handle = handle
  @channel = nil
  @attach_func = attach_func
  @stop_func = stop_func
  @metadata = {}
  # We can't have a handle until the job is started so we must be at least
  # at this state
  @state = STARTED_STATE

  @pending_messages = Queue.new

  @batch_size_known = false
  @expected_batch_size = 0
  @current_batch_data = nil
  @current_batch_size = nil

  @last_timestamp_seen = nil
  @resolution = nil
  @input_timeseries_count = nil
end

Instance Attribute Details

#channelObject

Returns the value of attribute channel.



16
17
18
# File 'lib/signalfx/signalflow/computation.rb', line 16

def channel
  @channel
end

#handleObject

Returns the value of attribute handle.



15
16
17
# File 'lib/signalfx/signalflow/computation.rb', line 15

def handle
  @handle
end

#input_timeseries_countObject

Returns the value of attribute input_timeseries_count.



20
21
22
# File 'lib/signalfx/signalflow/computation.rb', line 20

def input_timeseries_count
  @input_timeseries_count
end

#last_timestamp_seenObject

Returns the value of attribute last_timestamp_seen.



21
22
23
# File 'lib/signalfx/signalflow/computation.rb', line 21

def last_timestamp_seen
  @last_timestamp_seen
end

#metadataObject

Returns the value of attribute metadata.



18
19
20
# File 'lib/signalfx/signalflow/computation.rb', line 18

def 
  @metadata
end

#resolutionObject

Returns the value of attribute resolution.



19
20
21
# File 'lib/signalfx/signalflow/computation.rb', line 19

def resolution
  @resolution
end

#stateObject

Returns the value of attribute state.



17
18
19
# File 'lib/signalfx/signalflow/computation.rb', line 17

def state
  @state
end

Instance Method Details

#attach(**options) ⇒ Computation

Attach to an already running computation.

*Not currently implemented on backend!*

channel attached to it.

Returns:

  • (Computation)

    This same computation instance with a now active



218
219
220
221
222
223
# File 'lib/signalfx/signalflow/computation.rb', line 218

def attach(**options)
  raise "Computation #{@handle} is already attached!" if @channel

  @channel = @attach_func.call(@handle, **options)
  self
end

#attached?Boolean

Returns:

  • (Boolean)


49
50
51
# File 'lib/signalfx/signalflow/computation.rb', line 49

def attached?
  !@channel.nil? && !@channel.detached
end

#detachObject

Detach from this computation and remove reference to the channel to free up memory.



227
228
229
230
# File 'lib/signalfx/signalflow/computation.rb', line 227

def detach
  @channel.detach
  @channel = nil
end

#each_message {|msg, comp| ... } ⇒ Object

Call the given block with each message in the channel as they arrive. This method will not return until the channel is detached from (either manually or due to the computation ending).

Messages are queued in the channel so that none will be lost if this method is not called immediately.

Yields:

  • (msg, comp)

    Called when a message arrives that is relevant to the channel’s computation. The ‘comp` param will be set to this computation instance for easy referencing of computation metadata and state. `comp` may be omitted if this reference to the computation is not needed.



96
97
98
99
100
101
102
103
104
105
# File 'lib/signalfx/signalflow/computation.rb', line 96

def each_message(&block)
  raise "Computation #{@handle} is not attached to a channel" unless @channel

  while @channel
    msg = next_message
    block.call(msg, self)
  end

  return
end

#each_message_async(&block) ⇒ Object

Iterates over the messages asynchronously for this computation. A convenience function if you want to fire off multiple computations simultaneously, though not terribly efficient since it starts a new thread that spends a lot of time waiting. However, since we don’t have a way of “select”ing on computations, this is probably good enough for basic use.

See #each_message.



78
79
80
81
82
83
# File 'lib/signalfx/signalflow/computation.rb', line 78

def each_message_async(&block)
  raise "Computation #{@handle} is not attached to a channel" unless @channel

  Thread.new{ each_message(&block) }
  return
end

#next_message(timeout_seconds = nil) ⇒ Object

Get the next message in this computation.

Parameters:

  • timeout_seconds (Float) (defaults to: nil)

    If a new message does not come within this interval, raises a ChannelTimeout exception. Note that this does not mean that this function will return within this interval since there may be messages received that are part of a larger batch. If nil, will block indefinitely.



60
61
62
63
64
65
66
67
68
69
# File 'lib/signalfx/signalflow/computation.rb', line 60

def next_message(timeout_seconds=nil)
  raise "Computation #{@handle} is not attached to a channel" unless @channel

  msg = nil
  while msg.nil? && !@channel.nil?
    # process_message might return no messages if it is building up a batch
    msg = process_message(@channel.pop(timeout_seconds))
  end
  return msg
end

#stop(reason = nil) ⇒ Object

Parameters:

  • reason (String) (defaults to: nil)

    Reason for stopping the computation.



237
238
239
# File 'lib/signalfx/signalflow/computation.rb', line 237

def stop(reason=nil)
  @stop_func.call(@handle, reason)
end