Class: MongoAgent::Agent

Inherits:
Object
  • Object
show all
Defined in:
lib/mongo_agent/agent.rb

Overview

Author:

  • Darin London Copyright 2014

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attributes = nil) ⇒ Agent

create a new MongoAgent::Agent

Parameters:

  • attributes (Hash) (defaults to: nil)

    with name, queue, and optional sleep_between

Options Hash (attributes):

  • name (String)

    REQUIRED

  • queue (String)

    REQUIRED

  • sleep_between (Int)

    OPTIONAL

Raises:



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/mongo_agent/agent.rb', line 74

def initialize(attributes = nil)
  if attributes.nil?
    raise MongoAgent::Error, "attributes Hash required with name and queue keys required"
  end
  @name = attributes[:name]
  @queue = attributes[:queue]
  unless @name && @queue
    raise MongoAgent::Error, "attributes[:name] and attributes[:queue] are required!"
  end
  build_db()
  if attributes[:sleep_between]
    @sleep_between = attributes[:sleep_between]
  else
    @sleep_between = 5
  end
  @log = {
     tasks_processed: 0,
     failed_tasks: 0
  }
  @process_while = ->(log) { true }
end

Instance Attribute Details

#dbMoped::Session (readonly)

This holds the Moped::Session object that can be used to query information from the MongoDB

hosted by the MONGO_HOST environment variable

Returns:

  • (Moped::Session)


38
39
40
# File 'lib/mongo_agent/agent.rb', line 38

def db
  @db
end

#logHash (readonly)

This holds the log while work! is running

log will be a Hash with the following keys:
  tasks_processed: int number of tasks processed (success of failure)
  failed_tasks: int number of tasks that have failed
The log is passed to the block that is assigned to process_while (see below)

Returns:

  • (Hash)


46
47
48
# File 'lib/mongo_agent/agent.rb', line 46

def log
  @log
end

#nameString

The name of the agent for which tasks will be taken from the queue

Returns:

  • (String)


58
59
60
# File 'lib/mongo_agent/agent.rb', line 58

def name
  @name
end

#process_whileObject

This holds a block that will be passed the log as an argument and return true

  as long as the agent should continue to process tasks when work! is called,
  and false when work! should stop and return.
  If not set, the agent will continue to process tasks until it is killed when
  work! is called
@return [Block]


54
55
56
# File 'lib/mongo_agent/agent.rb', line 54

def process_while
  @process_while
end

#queueString

The name of the task queue that contains the tasks on which this agent will work.

Returns:

  • (String)


62
63
64
# File 'lib/mongo_agent/agent.rb', line 62

def queue
  @queue
end

#sleep_betweenObject

number of seconds to sleep between each call to process! when running agent.work! or agent.process_while default 5



66
67
68
# File 'lib/mongo_agent/agent.rb', line 66

def sleep_between
  @sleep_between
end

Instance Method Details

#get_tasks(query = nil) ⇒ Moped::Query

get A MONGO_DB Moped::Query, either for the specified query Hash, or, when query is nil, all that are currently ready for the @name. This can be used to scan through the tasks on the @queue to perform aggregation tasks:

Examples:

collecting information

@agent->get_tasks({
   agent_name: @agent->name,
   error_encountered: true
}).each do |task|
  $stderr.puts "ERROR:\n#{ task.inspect }\n"
end

update ready to true for tasks that need intervention before they can run

@agent->get_tasks({
   agent_name: @agent->name,
   waiting_for_information: true
}).each do |task|
  task.update('$set' => {ready: true, waiting_form_information: false})
end

Parameters:

  • query (Hash) (defaults to: nil)

    (optional) any query to find tasks

Returns:

  • (Moped::Query)


223
224
225
226
227
228
229
# File 'lib/mongo_agent/agent.rb', line 223

def get_tasks(query = nil)
  if query.nil?
    return @db[@queue].find({agent_name: @name, ready: true})
  else
    return @db[@queue].find(query)
  end
end

#process!(&agent_code) {|Task| ... } ⇒ Object

If a task for the agent is found that is ready, process! registers itself with the task by setting ready to false, and setting its hostname on the :agent_host field, and then passes the task to the supplied block. This block must return a required boolean field indicating success or failure, and an optional hash of key - value fields that will be updated on the task Document. Note, the updates are made regardless of the value of success. In fact, the agent can be configured to update different fields based on success or failure. Also, note that any key, value supported by JSON can be stored in the hash. This allows the agent to communicate any useful information to the task for other agents (MongoAgent::Agent or human) to use. The block must try at all costs to avoid terminating. If an error is encountered, block should return false for the success field to signal that the process failed. If no errors are encountered block should return true for the success field.

Examples:

Exit successfully and sets :complete to true on the task

@agent->process! do |task_hash|
  foo = task_hash[:foo]
  # do something with foo to perform a task
  true
end

Same, but also sets the ‘files_processed’ field

@agent->process! { |task_hash|
  # ... operation using task_hash for information
  [true, {:files_processed => 30}]
}

Fails, sets :complete to true, and :error_encountered to true

@failure = ->(task_hash){
  begin
    # ... failing operation using task_hash for information
    return true
  rescue
   return false
  end
}

@agent->process!(&@failure)

Same, but also sets the ‘notice’ field

@agent->process! do |task_hash|
  ...
  [false, {:notice => 'There were 10 files left to process!' }]
end

This agent passes different parameters based on success or failure

$agent->process! { |task_hash|
  # ... process and set $success true or false
  if $success
    [ $success, {:files_processed => 100} ]
  else
    [ $success, {:files_remaining => 10}]
  end
}

Parameters:

  • agent_code (Block, Lambda, or Method)

    Code to process a task

Yield Parameters:

  • Task (Hash)

Yield Returns:

  • (Boolean, Hash)

    success, (optional) hash of fields to update and values to update on the task



153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/mongo_agent/agent.rb', line 153

def process!(&agent_code)
  (runnable, task) = register()
  return unless runnable
  (success, update) = agent_code.call(task)
  @log[:tasks_processed] += 1
  if success
    complete_task(task, update)
  else
    fail_task(task, update)
  end
  return
end

#work!(&agent_code) {|Task| ... } ⇒ Object

Iteratively runs process! on the supplied Block, then sleeps :sleep_between between each attempt. Block should match the specifications of what can be passed to process! (see above).

If @process_while is set to a Block, Lambda, or Method, then it is called after

each task is processed, and passed the current @log.  As long as the
Block returns true, work! will continue to process.  work! will stop processing
tasks when the Block returns false.

Examples:

process 3 entries and then exit

@agent.process_while = ->(log) {
  (log[:tasks_processed] < 3)
}
@agent.work! { |task_hash|
  #... do something with task_hash and return true of false just as in process!
}

process until errors are encountered and then exit

@agent.process_while = ->(log) {
  not(log[:errors_encountered])
}
@agent.work! { |task_hash|
  #... do something with task_hash and return true of false just as in process!
}
$stderr.puts " #{ @agent.log[:errors_encountered ] } errors were encountered during work."

Parameters:

  • agent_code (Block, Lambda, or Method)

    Code to process a task

Yield Parameters:

  • Task

    Hash

Yield Returns:

  • (Boolean, Hash)

    success, (optional) hash of fields to update and values to update on the task



194
195
196
197
198
199
200
# File 'lib/mongo_agent/agent.rb', line 194

def work!(&agent_code)

  while (@process_while.call(@log))
    process!(&agent_code)
    sleep @sleep_between
  end
end