Class: Flor::WaitList

Inherits:
Object
  • Object
show all
Defined in:
lib/flor/unit/wlist.rb

Constant Summary collapse

DEFAULT_TIMEOUT =

Regular waiters are message waiters: they wait for a message that matches a pattern.

Row waiters wait for the pattern to be realized in the database; a better name would probably have been “query waiter”. Row waiters need their own thread for checking at intervals. Row waiters can live in a different Ruby process from the Ruby process performing the executions.

Flor.env_i('FLOR_DEFAULT_TIMEOUT')

Instance Method Summary collapse

Constructor Details

#initialize(unit) ⇒ WaitList

Returns a new instance of WaitList.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/flor/unit/wlist.rb', line 26

# Sets up a wait list bound to +unit+.
#
# Registers this instance with the unit's hooker under the 'wlist' key,
# prepares the mutex and the two (initially empty) waiter lists, and gives
# the unit a singleton #wait that forwards to this wait list.
#
# @param unit [Object] the owning unit; the code here calls #hooker and
#   #conf on it, and the forwarding #wait reads the unit's @hooker ivar
def initialize(unit)

  @unit = unit
  @unit.hooker.add('wlist', self)

  @mutex = Mutex.new
  @msg_waiters, @row_waiters = [], []

  # Singleton method on the unit itself: inside it, self is the unit, so
  # @hooker is the unit's own instance variable, not this wait list's.
  def unit.wait(exid, opts=true, more=nil)
    @hooker['wlist'].wait(exid, opts, more)
  end

  # No row thread exists yet at construction time.
  @row_thread = @row_thread_status = nil

  # Configured value of 'wtl_row_frequency', falling back to 1.
  @row_frequency = @unit.conf['wtl_row_frequency'] || 1
end

Instance Method Details

#notify(executor, message) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/flor/unit/wlist.rb', line 53

# Hook entry point: offers a consumed message to every message waiter and
# drops the waiters whose own #notify returns a truthy value.
#
# Messages whose 'consumed' entry is falsy are ignored, as is any message
# arriving while there are no message waiters.
#
# @param executor [Object] passed through untouched to each waiter
# @param message [Hash] the message; only its 'consumed' key is read here
# @return [Array] always an empty array (this hook emits no new messages)
def notify(executor, message)

  return [] if !message['consumed'] || @msg_waiters.empty?

  @mutex.synchronize do

    # Every waiter gets notified, in list order; those answering truthy
    # are done and get collected for removal.
    satisfied =
      @msg_waiters.select { |waiter| waiter.notify(executor, message) }

    # Rebind to a fresh array instead of mutating the current one in place.
    @msg_waiters = @msg_waiters - satisfied
  end

  []
end

#shutdown ⇒ Object



46
47
48
49
50
51
# File 'lib/flor/unit/wlist.rb', line 46

# Marks the row thread status as :shutdown.
#
# Only the status flag is written here; whatever loop reads it lives
# outside this method (presumably the row-checking thread -- not visible
# from here).
#
# @return [nil]
def shutdown
  @row_thread_status = :shutdown
  nil
end

#wait(exid, opts, more) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/flor/unit/wlist.rb', line 72

# Registers a waiter and blocks on it until it yields its response.
#
# Accepted argument shapes (normalized below):
# * wait(pattern, true, more)      -- first argument is not an exid, it is
#                                     taken as the thing to wait for
# * wait(exid, true | String, more) -- +opts+ itself is the :wait value
# * wait(exid, Hash, more)          -- +opts+ is used as the option set
#
# A Hash passed as +opts+ is copied before being completed with +more+ and
# the timeout, so the caller's hash (possibly frozen) is never modified.
#
# A :timeout of +true+ means "use the default": DEFAULT_TIMEOUT if set,
# else the unit's 'wtl_default_timeout' configuration entry, else 5.
#
# @param exid [String, Object] execution id, or the wait pattern itself
#   when +opts+ is +true+ and this is not an exid
# @param opts [true, String, Hash] see the shapes above
# @param more [Hash, nil] extra options merged over +opts+ (ignored unless
#   it is a Hash)
# @return [Object] whatever the waiter's #wait returns (the response
#   message)
# @raise [ArgumentError] when a message waiter is requested while the unit
#   is stopped, or when waiters of the other kind (msg vs row) are already
#   registered
def wait(exid, opts, more)

  # `== true` is deliberate: only the literal true selects these branches,
  # other truthy values (String, Hash) are handled separately.
  exid, opts =
    if opts == true && ! Flor.is_exid?(exid)
      [ nil, { wait: exid } ]
    elsif opts == true || opts.is_a?(String)
      [ exid, { wait: opts } ]
    else
      [ exid, opts.dup ] # work on a copy, leave the caller's hash alone
    end
  opts.merge!(more) if more.is_a?(Hash)

  opts[:timeout] = nil if opts[:timeout] == true
  opts[:timeout] ||=
    (DEFAULT_TIMEOUT || @unit.conf['wtl_default_timeout'] || 5)

  waiter =
    @mutex.synchronize do

      w = Waiter.new(exid, opts)

      fail ArgumentError,
        "unit is stopped, it cannot wait for #{[ exid, opts ].inspect}" \
          if w.msg_waiter? && @unit.stopped?

      # Message waiters and row waiters are mutually exclusive within one
      # wait list: pick the list for this waiter and the one to check.
      own, others, kind, other_kind =
        if w.row_waiter?
          [ @row_waiters, @msg_waiters, 'row', 'msg' ]
        else
          [ @msg_waiters, @row_waiters, 'msg', 'row' ]
        end

      fail ArgumentError,
        "cannot add a #{kind} waiter, since there are already " \
        "#{other_kind} ones" \
          if others.any?

      own << w

      start_row_thread if @row_waiters.any?

      w
    end

  # Block outside of the mutex, so the lock is not held while waiting.
  waiter.wait
    # returns the response message
end