Class: Flydata::Api::DataEntry

Inherits:
Base
  • Object
show all
Defined in:
lib/flydata/api/data_entry.rb

Constant Summary collapse

NUM_TABLES_PER_REQUEST =

Response example (/buffer_stat.json for a Sync data entry) {

"complete": true,
"state": "complete",
"stat": {
  "buffer": {
    "proxy": {
      "buffer_size": 0,
      "queue_length": 0
    },
    "redshift": {
      "buffer_size": 0,
      "queue_length": 0
    },
    "sync_redshift": {
      "buffer_size": 0,
      "queue_length": 0
    }
  },
  "queue": {
    "copy_queue": {
      "num_items": 0
    },
    "query_queue": {
      "num_items": 0
    }
  }
},
"message": "",
"success": true

}

20

Instance Method Summary collapse

Methods inherited from Base

#create, #list

Constructor Details

#initialize(api_client) ⇒ DataEntry

Returns a new instance of DataEntry.



7
8
9
10
11
# File 'lib/flydata/api/data_entry.rb', line 7

# Sets the model name and the URL path template used for data entries, then
# hands the given api_client on to the parent constructor.
def initialize(api_client)
  @model_name = 'data_entry'
  plural_name = @model_name.pluralize
  @url_path = "/data_ports/:data_port_id/#{plural_name}"
  super(api_client)
end

Instance Method Details

#buffer_stat(data_entry_id, options = {}) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/flydata/api/data_entry.rb', line 47

# Collects buffer and queue statistics for a data entry.
#
# The tables in options[:tables] are sent to the buffer_stat endpoint in
# slices of NUM_TABLES_PER_REQUEST, and the per-slice responses are merged
# into one response-shaped hash.
#
# data_entry_id - id interpolated into the request path.
# options       - Hash. :tables is a list of table names; when it is nil or
#                 empty a single request with an empty table list is sent.
#                 :mode is appended to the request path.
#
# Returns a Hash with "complete", "state", "stat", "message" and "success"
# keys.  "state" is 'complete' when every buffer_size and every num_items is
# 0, 'processing' when some buffer_size is not 0, and 'uploading' otherwise.
# If any response has no "state", requests stop and a Hash holding
# "complete" => false, that response's "message" and "success" => true is
# returned instead.
def buffer_stat(data_entry_id, options = {})
  table_array = options[:tables] || ['']
  table_array = [''] if table_array.empty?
  results = []
  # The failure flag is kept apart from the message: a failed response whose
  # "message" is nil must still be reported as an error rather than falling
  # through to the aggregation below.
  failed = false
  error_message = nil
  table_array.each_slice(NUM_TABLES_PER_REQUEST) do |ta|
    tables = ta.join(',')
    result = @client.post("/#{@model_name.pluralize}/#{data_entry_id}/buffer_stat/#{options[:mode]}", nil, {tables: tables})
    unless result["state"]
      failed = true
      error_message = result["message"]
      break
    end
    results << result
  end

  if failed
    # mimicking the error value of /buffer_stat API
    return {
      "complete" => false,
      "message" => error_message,
      "success" => true,
    }
  end

  # Sum the per-table counters over all slice responses.  A counter only
  # appears in the total when at least one response reported it.
  stat = {"buffer" => {}, "queue" => {}}
  results.each do |result|
    buffer_stats = result["stat"]["buffer"]
    %w[redshift sync_redshift].each do |fluent_process|
      process_stats = buffer_stats[fluent_process]
      next unless process_stats
      %w[buffer_size queue_length].each do |key|
        value = process_stats[key]
        next unless value
        totals = (stat["buffer"][fluent_process] ||= {})
        totals[key] = (totals[key] || 0) + value
      end
    end

    queue_stats = result["stat"]["queue"]
    %w[copy_queue query_queue].each do |queue|
      num_items = queue_stats[queue] && queue_stats[queue]["num_items"]
      next unless num_items
      totals = (stat["queue"][queue] ||= {})
      totals["num_items"] = (totals["num_items"] || 0) + num_items
    end
  end

  proxy = results.first["stat"]["buffer"]["proxy"]
  if proxy
    # proxy values are not per-table, so take them from the first response
    # instead of summing them
    stat["buffer"]["proxy"] = {
      "buffer_size" => proxy["buffer_size"],
      "queue_length" => proxy["queue_length"],
    }
  end

  # TODO Refactor.  The logic below is copied from web data_entries_controller
  buffer_empty = stat["buffer"].all? { |_name, values| values["buffer_size"] == 0 }
  queue_empty = stat["queue"].all? { |_name, values| values["num_items"] == 0 }

  complete = (buffer_empty && queue_empty)
  state = if complete
            'complete'
          elsif !buffer_empty
            'processing'
          else
            'uploading'
          end
  # A single response already carries the server's message; merged
  # statistics need a message built from the totals.
  message = results.count == 1 ? results.first["message"]
                               : build_message(stat, complete)

  {
    "complete" => complete,
    "state" => state,
    "stat" => stat,
    "message" => message,
    "success" => true,
  }
end

#cleanup_sync(data_entry_id, tables, options = {}) ⇒ Object



161
162
163
164
# File 'lib/flydata/api/data_entry.rb', line 161

# Posts to the cleanup_sync endpoint of the given data entry.
#
# data_entry_id - id interpolated into the request path.
# tables        - list of table names; sent comma-joined as the :tables param.
# options       - Hash of further request params; a :tables key in it is
#                 overridden by the joined table list.
#
# Returns whatever @client.post returns.
def cleanup_sync(data_entry_id, tables, options = {})
  table_list = tables.join(',')
  request_params = options.merge(tables: table_list)
  endpoint = "/#{@model_name.pluralize}/#{data_entry_id}/cleanup_sync"
  @client.post(endpoint, nil, request_params)
end

#complete_init_sync(data_entry_id, stats_hash) ⇒ Object

Tells the server that an initial sync has completed

stats_hash: {"init_sync_stats":{"Table1": 100, "Table2": 12345}}
  Sent 100 records for Table1, Sent 12345 records for Table2


193
194
195
# File 'lib/flydata/api/data_entry.rb', line 193

# Tells the server that an initial sync has completed by posting
# stats_hash, serialized as JSON, to the complete_init_sync endpoint.
#
# data_entry_id - id interpolated into the request path.
# stats_hash    - e.g. {"init_sync_stats":{"Table1": 100, "Table2": 12345}}
#
# Returns whatever @client.post returns.
def complete_init_sync(data_entry_id, stats_hash)
  endpoint = "/#{@model_name.pluralize}/#{data_entry_id}/complete_init_sync"
  json_request = {:headers => {:content_type => :json}}
  @client.post(endpoint, json_request, stats_hash.to_json)
end

#table_status(data_entry_id, options = {}) ⇒ Object

Response example (/table_status.json) {

"table_status": [
  {
    "updated_at": 1462586673,
    "status": "ready",
    "table_name": "items",
    "seq": 0,
    "created_at": 1462586673,
    "src_pos": "-",
    "records_copied": "0",
    "num_items": 0,
    "next_item": false
  },
  {
    "updated_at": 1462586706,
    "status": "ready",
    "table_name": "users",
    "seq": 0,
    "created_at": 1462586674,
    "src_pos": "-",
    "records_copied": "0",
    "records_sent": "8",
    "num_items": 0,
      "next_item": false
  }
]

}



147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/flydata/api/data_entry.rb', line 147

# Fetches table status for a data entry.
#
# The tables in options[:tables] are requested in slices of
# NUM_TABLES_PER_REQUEST; when :tables is absent a single request with an
# empty table list is sent.  All requests are issued before the responses
# are merged.
#
# Returns a Hash with "table_status" (the "table_status" arrays of all
# responses, concatenated in request order) and "success" => true.
def table_status(data_entry_id, options = {})
  table_array = options[:tables] || ['']
  responses = table_array.each_slice(NUM_TABLES_PER_REQUEST).map do |table_slice|
    endpoint = "/#{@model_name.pluralize}/#{data_entry_id}/table_status"
    @client.post(endpoint, nil, {tables: table_slice.join(',')})
  end

  merged = {"table_status" => [], "success" => true}
  responses.each do |response|
    merged["table_status"].concat(response["table_status"])
  end
  merged
end

#update_table_validity(data_entry_id, table_update_hash) ⇒ Object

Update validity of tables

– Params ( Agent 0.7.4 and older )

params = {
  updated_tables: { "bad_table"=>"error reason", "good_table"=>nil }
}

– Params ( Agent 0.7.5 and later )

table_update_hash = {
  updated_tables: {
    "<table-name>" => {
      "invalid_table_reason" => "<error reason>",
      "uk_as_pk" => ['xx', ...],
    }
  }
}

  • If value is nil, the setting will be deleted.

  • If table’s attribute hash doesn’t have an attribute key, the setting for the attribute for the table will not be changed



182
183
184
185
186
187
188
# File 'lib/flydata/api/data_entry.rb', line 182

# Posts table validity updates to the update_table_validity endpoint.
#
# table_update_hash[:updated_tables] is split by slice_hash into pieces of
# NUM_TABLES_PER_REQUEST, and each piece is posted as its own JSON request
# under an :updated_tables key.
#
# Returns the value of calling #each on the slice_hash result.
def update_table_validity(data_entry_id, table_update_hash)
  table_slices = slice_hash(table_update_hash[:updated_tables], NUM_TABLES_PER_REQUEST)
  table_slices.each do |tables_slice|
    endpoint = "/#{@model_name.pluralize}/#{data_entry_id}/update_table_validity"
    # wrap the slice back into the shape of the original argument
    payload = {updated_tables: tables_slice}.to_json
    @client.post(endpoint, {:headers => {:content_type => :json}}, payload)
  end
end