Class: ActionManager

Inherits:
Object
Defined in:
lib/ActionManager.rb

Overview

This class provides support for handling actions. Methods of a class, referred to as actions, can be registered in the action manager. The manager waits for actions to be triggered (triggering is thread-safe) and executes them concurrently. The action manager can be used to synchronize objects that live in different threads.

Example

class Sample
    attr_reader :am

    def initialize
        @am = ActionManager.new(15,true)

        @am.register_action("SLEEP",method("sleep_action"))
    end

    def sleep_action(secs)
        sleep(secs)
    end

    def finalize_action
        p "Exiting..."
        @am.stop_listener
    end
end

s = Sample.new

s.am.start_listener

# Objects in other threads can trigger actions like this
# (the second argument is the action_id):
# s.am.trigger_action("SLEEP", 1, rand(3)+1)
# s.am.trigger_action(:FINALIZE, 0)

Direct Known Subclasses

OpenNebulaDriver

Constructor Details

#initialize(concurrency = 10, threaded = true) ⇒ ActionManager

Creates a new Action Manager

concurrency: the maximum number of actions that can run at the same time
threaded: if true, actions are executed in a separate thread by default



# File 'lib/ActionManager.rb', line 64

def initialize(concurrency=10, threaded=true)
    @finalize       = false
    @actions        = Hash.new
    @threaded       = threaded

    @concurrency    = concurrency
    @num_running    = 0

    @action_queue   = Array.new
    @action_running = Hash.new

    @threads_mutex  = Mutex.new
    @threads_cond   = ConditionVariable.new
end
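
The concurrency limit stored by the constructor can be illustrated with a minimal, self-contained sketch. It is not the manager's own code: run_bounded is a hypothetical helper, and here each worker gates itself, whereas in ActionManager the listener does the gating. It only shows how a Mutex and a ConditionVariable keep a running counter at or below a limit.

```ruby
# Hypothetical helper, not part of ActionManager: runs `jobs` threads but
# lets at most `limit` of them be inside the "work" section at once.
# Returns the highest number of simultaneously running jobs observed.
def run_bounded(limit, jobs)
  running = 0
  peak    = 0
  mutex   = Mutex.new
  cond    = ConditionVariable.new

  threads = Array.new(jobs) do
    Thread.new do
      mutex.synchronize do
        # Re-check the predicate after every wake-up.
        cond.wait(mutex) while running >= limit
        running += 1
        peak = [peak, running].max
      end

      sleep 0.01 # stand-in for the real work

      mutex.synchronize do
        running -= 1
        cond.signal # a slot is free, wake one waiter
      end
    end
  end

  threads.each(&:join)
  peak
end

puts run_bounded(2, 6) <= 2
```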

Instance Method Details

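#cancel_action(action_id) ⇒ Object

Cancels the action identified by action_id. If the action is running, its thread is killed and the slot is released; otherwise the action is removed from the queue of pending actions.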



# File 'lib/ActionManager.rb', line 140

def cancel_action(action_id)
    @threads_mutex.synchronize {
        action = @action_running[action_id]
        if action
            thread = action[:thread]
        else
            thread = nil
        end

        if thread
            thread.kill

            @num_running -= 1
            delete_running_action(action_id)

            @threads_cond.signal
        else
            i = @action_queue.select{|x| x[:id] == action_id}.first
            @action_queue.delete(i) if i
        end
    }
end
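
The queued branch of cancel_action looks up a pending entry by its :id and deletes it. A small sketch of the same lookup on a plain array, using Enumerable#find in place of select followed by first (the two return the same element). The helper name cancel_queued and the sample entries are assumptions made for illustration.

```ruby
# Hypothetical helper, not part of ActionManager: removes the first queued
# entry whose :id matches and returns it, or returns nil if none matches.
def cancel_queued(queue, action_id)
  entry = queue.find { |x| x[:id] == action_id }
  queue.delete(entry) if entry
  entry
end

queue = [{ :id => 1, :args => [3] }, { :id => 2, :args => [1] }]
cancel_queued(queue, 2)
p queue.map { |x| x[:id] }
```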

#register_action(aname, method, threaded = nil) ⇒ Object

Registers a new action in the manager. An action is defined by:

aname: name of the action; it identifies the action
method: the code to run; it is invoked with call, so it should be a Proc or Method object
threaded: whether to execute the action in a new thread (when nil, the manager-wide default is used)



# File 'lib/ActionManager.rb', line 84

def register_action(aname, method, threaded=nil)
    threaded ||= @threaded

    @actions[aname]={
        :method     => method,
        :threaded   => threaded
    }
end
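
One detail of the listing above is worth illustrating: in Ruby, ||= replaces both nil and false, so passing threaded = false explicitly still falls back to the manager-wide default. The sketch below isolates that expression. Both helper names are hypothetical, and the nil-check variant is shown only for comparison, not as the library's behavior.

```ruby
# Hypothetical helpers, not part of ActionManager.

# Same defaulting expression as in register_action.
def resolve_with_or_equals(threaded, default)
  threaded ||= default # nil AND false are replaced by the default
  threaded
end

# Alternative that only falls back when the argument was omitted (nil).
def resolve_with_nil_check(threaded, default)
  threaded.nil? ? default : threaded
end

p resolve_with_or_equals(false, true) # the explicit false is overridden
p resolve_with_nil_check(false, true) # the explicit false is kept
```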

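#start_listener ⇒ Object

Runs the listener loop in the calling thread. The loop waits until an action is queued and a concurrency slot is free, then runs the action. It returns once the manager has been finalized and no actions are running.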



# File 'lib/ActionManager.rb', line 163

def start_listener
    while true
        @threads_mutex.synchronize {
            while ((@concurrency - @num_running)==0) || empty_queue
                @threads_cond.wait(@threads_mutex)

                return if (@finalize && @num_running == 0)
            end

            run_action
        }
    end
end
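
The wait-and-dispatch pattern of the listener can be sketched in a self-contained form. This is a simplification under stated assumptions: drain_with_listener is a hypothetical helper, there is a single consumer and no concurrency limit, and the finalize flag is checked in the wait predicate rather than after the wait as in the listing above.

```ruby
# Hypothetical helper, not part of ActionManager: a listener thread waits on
# a condition variable for queued items, processes each one (here: doubles
# it), and exits once finalize is set and the queue is empty.
def drain_with_listener(items)
  queue    = []
  results  = []
  finalize = false
  mutex    = Mutex.new
  cond     = ConditionVariable.new

  listener = Thread.new do
    loop do
      item = nil
      mutex.synchronize do
        # Sleep until there is work or shutdown was requested.
        cond.wait(mutex) while queue.empty? && !finalize
        item = queue.shift
      end
      break if item.nil? # finalized and nothing left to do
      results << item * 2
    end
  end

  # Producers enqueue under the same mutex and signal the listener.
  items.each { |i| mutex.synchronize { queue << i; cond.signal } }
  mutex.synchronize { finalize = true; cond.signal }

  listener.join
  results
end

p drain_with_listener([1, 2, 3])
```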

#trigger_action(aname, action_id, *aargs) ⇒ Object

Triggers the execution of the action.

aname: name of the action
action_id: an id that identifies the action (so it can be cancelled later)
aargs: arguments passed to the action



# File 'lib/ActionManager.rb', line 98

def trigger_action(aname, action_id, *aargs)

    @threads_mutex.synchronize {
        return if @finalize

        if aname == :FINALIZE
            finalize if respond_to?(:finalize)
            @finalize = true
            @threads_cond.signal if @num_running == 0
            return
        end

        if !@actions.has_key?(aname)
            return
        end

        arity=@actions[aname][:method].arity

        if arity < 0
            # Last parameter is an array
            arity = -arity - 1
            if arity > aargs.length
                # Message has not enough parameters
                return
            end
            # Converts last parameters to an array
            aargs[arity..-1]=[aargs[arity..-1]]
        else
            if arity != aargs.length
                return
            end
        end

        @action_queue << @actions[aname].merge(:args => aargs,
                :id => action_id)

        if @num_running < @concurrency
            @threads_cond.signal
        end
    }
end
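
The arity check in trigger_action can be tried in isolation. The sketch below copies the same steps into a hypothetical helper, normalize_args, which returns the adjusted argument list or nil where trigger_action would return without queueing. It relies on Method#arity and Proc#arity returning -(n + 1) for a callable with n required parameters followed by a splat.

```ruby
# Hypothetical helper, not part of ActionManager: mirrors the argument
# validation performed by trigger_action.
def normalize_args(callable, args)
  args  = args.dup
  arity = callable.arity

  if arity < 0
    # Variable-length signature: -arity - 1 parameters are required.
    required = -arity - 1
    return nil if required > args.length # not enough arguments

    # Collect the trailing arguments into a single array.
    args[required..-1] = [args[required..-1]]
    args
  else
    # Fixed-length signature: the count must match exactly.
    arity == args.length ? args : nil
  end
end

fixed = lambda { |a, b| }     # arity 2
splat = lambda { |a, *rest| } # arity -2

p normalize_args(fixed, [1, 2])
p normalize_args(fixed, [1])
p normalize_args(splat, [1, 2, 3])
```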