Method: Pmux::Handler#exec_streaming_task
- Defined in:
- lib/pmux/handler.rb
#exec_streaming_task(task) ⇒ Object
execute a task and return the result
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/pmux/handler.rb', line 46 def exec_streaming_task task start_time = Time.now as = MR::AsyncResult.new if (task_keys = task['task_keys']) error_ids = [] fusion_id = task['task_id'] fiber = Fiber.new { for task_id, file in task_keys ntask = task.merge 'fusion_id'=>fusion_id, 'task_id'=>task_id, 'path'=>file result = do_one_task ntask, fiber end result.update :task_id=>fusion_id, :task_keys=>task_keys, :welapse=>(Time.now - start_time) as.result result } else fiber = Fiber.new { result = do_one_task(task, fiber) result[:welapse] = Time.now - start_time as.result result } end fiber.resume as end |