Method: Pmux::Handler#exec_streaming_task

Defined in:
lib/pmux/handler.rb

#exec_streaming_task(task) ⇒ Object

Executes a task (or a fused batch of tasks) inside a Fiber and returns an MR::AsyncResult that receives the task's result.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/pmux/handler.rb', line 46

# Runs a task, or a fused batch of tasks, inside a Fiber and delivers the
# outcome through an asynchronous result object.
#
# When +task+ has a 'task_keys' entry, every (task_id, path) pair in it is
# run in turn through +do_one_task+ as a copy of +task+ carrying that pair's
# 'task_id' and 'path', plus a 'fusion_id' equal to the original 'task_id'.
# The delivered value is the LAST sub-task's result, updated with :task_id
# (the fusion id), :task_keys and :welapse. Results of earlier sub-tasks are
# not kept here.
#
# Without 'task_keys', +task+ is run once and :welapse is added to its result.
#
# :welapse is the wall-clock time in seconds since this method was entered.
#
# @param task [Hash] task description; 'task_keys' and 'task_id' are read here
# @return [MR::AsyncResult] object whose +result+ is called with the result
#   hash once the fiber's work has finished
def exec_streaming_task task
  start_time = Time.now
  as = MR::AsyncResult.new
  if (task_keys = task['task_keys'])
    fusion_id = task['task_id']
    fiber = Fiber.new {
      # Seed with an empty hash so an empty task_keys still delivers a result
      # instead of raising NoMethodError on nil.
      # NOTE(review): assumes an empty batch should report a bare result hash
      # -- confirm against the dispatcher that consumes it.
      result = {}
      task_keys.each do |task_id, file|
        ntask = task.merge 'fusion_id'=>fusion_id,
          'task_id'=>task_id, 'path'=>file
        result = do_one_task ntask, fiber
      end
      result.update :task_id=>fusion_id, :task_keys=>task_keys,
        :welapse=>(Time.now - start_time)
      as.result result
    }
  else
    fiber = Fiber.new {
      result = do_one_task(task, fiber)
      result[:welapse] = Time.now - start_time
      as.result result
    }
  end
  # The fiber is handed to do_one_task; presumably it yields while waiting so
  # that `as` can be returned before its result is set -- confirm in
  # do_one_task.
  fiber.resume
  as
end