Class: Aerospike::ExecuteTask

Inherits:
Task
  • Object
show all
Defined in:
lib/aerospike/task/execute_task.rb

Overview

ExecuteTask is used to poll for the completion of long-running server execute jobs.

Instance Method Summary collapse

Methods inherited from Task

#completed?, #wait_till_completed

Constructor Details

#initialize(cluster, statement) ⇒ ExecuteTask

Initializes the task with the fields needed to query server nodes.



23
24
25
26
27
28
29
30
# File 'lib/aerospike/task/execute_task.rb', line 23

# Sets up a task that polls for completion of the job started by +statement+.
#
# cluster   - forwarded to Task#initialize; the second argument passed to
#             Task#initialize is always +false+ (its meaning is defined by
#             Task, which is not shown here).
# statement - object responding to +task_id+ and +is_scan?+; both values are
#             captured once, at construction time.
def initialize(cluster, statement)
  super(cluster, false)

  # Right-hand sides are evaluated left to right: task_id first, then is_scan?.
  @task_id, @scan = statement.task_id, statement.is_scan?

  self
end

Instance Method Details

#all_nodes_done? ⇒ Boolean

Queries all nodes for the task's completion status.

Returns:

  • (Boolean)


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/aerospike/task/execute_task.rb', line 33

# Queries every node of the cluster for the status of this task's job.
#
# Each node is sent the 'scan-list' info command when the task was created
# from a scan, 'query-list' otherwise, and the reply is searched for the
# "job_id=<task_id>:" entry and the "job_status=" field that follows it.
#
# Returns false as soon as a node reports 'IN PROGRESS'. Otherwise returns
# true if at least one node either does not list the job or reports 'DONE',
# and false if no node did (including when the cluster has no nodes).
#
# Raises Aerospike::Exceptions::QueryTerminated when a node reports 'ABORTED'.
def all_nodes_done?
  command = @scan ? 'scan-list' : 'query-list'
  job_marker = "job_id=#{@task_id}:"
  status_marker = 'job_status='
  done = false

  @cluster.nodes.each do |node|
    # NOTE(review): if Info.request raises, the connection is never handed
    # back to the node - confirm whether it should be closed or returned.
    conn = node.get_connection(0)
    response_map, _ = Info.request(conn, command)
    node.put_connection(conn)

    response = response_map[command]
    job_at = response.index(job_marker)

    unless job_at
      # This node does not list the job: count it as done, but keep polling
      # the remaining nodes instead of returning on the first check.
      done = true
      next
    end

    status_at = response.index(status_marker, job_at + job_marker.length)
    next unless status_at

    status_from = status_at + status_marker.length
    # The status normally ends at the next ':'; when it is the last field of
    # the response there is no ':' left, so fall back to the end of the string.
    status_to = response.index(':', status_from) || response.length
    status = response[status_from...status_to]

    case status
    when 'ABORTED'
      raise Aerospike::Exceptions::QueryTerminated
    when 'IN PROGRESS'
      return false
    when 'DONE'
      done = true
    end
  end

  done
end