Module: Simultaneous::ClassMethods

Included in:
Simultaneous
Defined in:
lib/simultaneous.rb

Constant Summary collapse

TCP_CONNECTION_MATCH =
%r{^([^/]+):(\d+)}

Instance Method Summary collapse

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &block) ⇒ Object (protected)

Catch method missing to enable launching of tasks by direct name e.g.

Simultaneous.add_task(:process_things, "/usr/bin/process")

launch this task:

Simultaneous.process_things


254
255
256
257
258
259
260
# File 'lib/simultaneous.rb', line 254

def method_missing(method, *args, &block)
  if tasks.key?(method)
    fire(method, *args, &block)
  else
    super
  end
end

Instance Method Details

#[](task_name) ⇒ Object

Gets the TaskDescription of a task

Parameters:

  • task_name (Symbol)

    the name of the task to get



124
125
126
# File 'lib/simultaneous.rb', line 124

def [](task_name)
  tasks[task_name]
end

#add_task(task_name, path_to_binary, options = {}, default_params = {}, env = {}) ⇒ Object

Registers a task and makes it available for easy launching using #fire

Parameters:

  • task_name (Symbol)

    the name for the task. This should be unique

  • path_to_binary (String)

    the path to the executable that should be run when this task is launched

  • options (Hash) (defaults to: {})

    A hash of options for the task. Available options are:

    :niceness: the niceness value of the process, >=0
    :logfile:  the location of the processes log file to which all io will be redirected
    :pwd:      directory that the task should work in
    
  • niceness (Fixnum)

    the niceness value of the process >= 0. The higher this value the ‘nicer’ the launched process will be (a high nice value results in a low priority task). On UNIX systems the max, nicest, value is 20

  • default_params (Hash) (defaults to: {})

    A Hash of parameters that should be passed to every invocation of the task. These will be converted to command line parameters

    { "setting" => "value", "output" => "destination"}
    

    gives the parameters

    --setting=value --output=destination
    

    @see Simultaneous::Utilities#to_arguments

  • env (Hash) (defaults to: {})

    A Hash of values to add to the task’s ENV settings



57
58
59
# File 'lib/simultaneous.rb', line 57

def add_task(task_name, path_to_binary, options={}, default_params={}, env={})
  tasks[task_name] = TaskDescription.new(task_name, path_to_binary, options, default_params, env)
end

#binary(task_name) ⇒ String

Returns the path to the binary for the given task

Parameters:

  • task_name (Symbol)

    the name of the task

Returns:

  • (String)

    the path of the task’s binary



117
118
119
# File 'lib/simultaneous.rb', line 117

def binary(task_name)
  tasks[task_name].binary
end

#clientObject



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/simultaneous.rb', line 76

def client
  @client ||= \
    begin
      client = \
        if client_mode == :async and ::EM.reactor_running?
          AsyncClient.new(domain, connection)
        else
          SyncClient.new(domain, connection)
        end
      # make sure that new client is hooked into all listeners
      event_listeners.each do |event, blocks|
        blocks.each do |block|
          client.on_event(event, &block)
        end
      end
      client
    end
end

#client=(client) ⇒ Object



71
72
73
74
# File 'lib/simultaneous.rb', line 71

def client=(client)
  @client.close if @client
  @client = client
end

#client_connection(connection_string) ⇒ Object



238
239
240
241
242
243
244
# File 'lib/simultaneous.rb', line 238

def client_connection(connection_string)
  if connection_string =~ TCP_CONNECTION_MATCH
    TCPSocket.new($1, $2.to_i)
  else
    UNIXSocket.new(connection_string)
  end
end

#client_modeObject



154
155
156
157
# File 'lib/simultaneous.rb', line 154

def client_mode
  reset_client!
  @client_mode ||= :async
end

#client_mode=(mode) ⇒ Object



142
143
144
# File 'lib/simultaneous.rb', line 142

def client_mode=(mode)
  @client_mode = mode
end

#connectionObject



146
147
148
# File 'lib/simultaneous.rb', line 146

def connection
  @connection ||= (ENV[Simultaneous::ENV_CONNECTION] || Simultaneous::DEFAULT_CONNECTION)
end

#connection=(connection) ⇒ Object



132
133
134
135
# File 'lib/simultaneous.rb', line 132

def connection=(connection)
  reset_client!
  @connection = connection
end

#domainObject



150
151
152
# File 'lib/simultaneous.rb', line 150

def domain
  @domain ||= (ENV[Simultaneous::ENV_DOMAIN] || "domain#{$$}")
end

#domain=(domain) ⇒ Object



137
138
139
140
# File 'lib/simultaneous.rb', line 137

def domain=(domain)
  reset_client!
  @domain = domain
end

#event_listenersObject



95
96
97
# File 'lib/simultaneous.rb', line 95

def event_listeners
  @event_listeners ||= Hash.new { |hash, key| hash[key] = [] }
end

#fire(task_name, params = {}) ⇒ Object

Launches the given task

Parameters:

  • task_name (Symbol)

    the name of the task to launch

  • params (Hash) (defaults to: {})

    parameters to pass to the executable



65
66
67
68
69
# File 'lib/simultaneous.rb', line 65

def fire(task_name, params={})
  task = tasks[task_name]
  command = Command::Fire.new(task, params)
  client.run(command)
end

#int(task_name) ⇒ Object

Sends a running task the INT signal



178
179
180
# File 'lib/simultaneous.rb', line 178

def int(task_name)
  kill(task_name, "INT")
end

#kill(task_name, signal = "TERM") ⇒ Object

Sends a running task an arbitrary signal

Parameters:

  • task_name (Symbol)

    the name of the task to send the signal

  • signal (String) (defaults to: "TERM")

    the signal to send

See Also:

  • for a full list of signals available


188
189
190
191
# File 'lib/simultaneous.rb', line 188

def kill(task_name, signal="TERM")
  command = Command::Kill.new(task_name, signal)
  client.run(command)
end

#map_pid(task_name, pid) ⇒ Object Also known as: set_pid

Used by the Daemon module to set the correct PID for a given task



160
161
162
163
# File 'lib/simultaneous.rb', line 160

def map_pid(task_name, pid)
  command = Command::SetPid.new(task_name, pid)
  client.run(command)
end

#on_event(event, &block) ⇒ Object



99
100
101
102
# File 'lib/simultaneous.rb', line 99

def on_event(event, &block)
  event_listeners[event] << block
  client.on_event(event, &block) if client
end

#parse_connection(connection_string) ⇒ Object

Convert connection string into an argument array suitable for passing to EM.connect or EM.server e.g.

"/path/to/socket.sock" #=> ["/path/to/socket.sock"]
"localhost:9999" #=> ["localhost", 9999]


230
231
232
233
234
235
236
# File 'lib/simultaneous.rb', line 230

def parse_connection(connection_string)
  if connection_string =~ TCP_CONNECTION_MATCH
    [$1, $2.to_i]
  else
    [connection_string]
  end
end

#reset!Object



108
109
110
111
# File 'lib/simultaneous.rb', line 108

def reset!
  reset_client!
  @tasks = nil
end

#reset_client!Object



104
105
106
# File 'lib/simultaneous.rb', line 104

def reset_client!
  @client = nil
end

#send_event(event, data) ⇒ Object



167
168
169
170
# File 'lib/simultaneous.rb', line 167

def send_event(event, data)
  command = Command::ClientEvent.new(domain, event, data)
  client.run(command)
end

#server_binaryObject



23
24
25
# File 'lib/simultaneous.rb', line 23

def server_binary
  File.expand_path("../../bin/simultaneous-server", __FILE__)
end

#task_complete(task_name) ⇒ Object



193
194
195
196
# File 'lib/simultaneous.rb', line 193

def task_complete(task_name)
  command = Command::TaskComplete.new(task_name)
  client.run(command)
end

#tasksObject



128
129
130
# File 'lib/simultaneous.rb', line 128

def tasks
  @tasks ||= {}
end

#term(task_name) ⇒ Object

Sends a running task the TERM signal



173
174
175
# File 'lib/simultaneous.rb', line 173

def term(task_name)
  kill(task_name, "TERM")
end

#to_arguments(params = {}) ⇒ Object



198
199
200
201
202
# File 'lib/simultaneous.rb', line 198

def to_arguments(params={})
  params.keys.sort { |a, b| a.to_s <=> b.to_s }.map do |key|
    %(--#{key}=#{to_parameter(params[key])})
  end.join(" ")
end

#to_parameter(obj) ⇒ Object

Maps objects to command line parameters suitable for parsing by Thor



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/simultaneous.rb', line 206

def to_parameter(obj)
  case obj
  when String
    obj.inspect
  when Array
    obj.map { |o| to_parameter(o) }.join(' ')
  when Hash
    obj.map do |k, v|
      "#{k}:#{to_parameter(obj[k])}"
    end.join(' ')
  when Numeric
    obj
  else
    to_parameter(obj.to_s)
  end
end