Class: Fluent::NcmbInput

Inherits:
Input
  • Object
show all
Includes:
NCMB
Defined in:
lib/fluent/plugin/in_ncmb.rb

Constant Summary collapse

SORT_FIELD =
"createDate,objectId"

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#last_pos ⇒ Object (readonly)

Returns the value of attribute last_pos.



24
25
26
# File 'lib/fluent/plugin/in_ncmb.rb', line 24

# Reader for @last_pos, the read position this plugin keeps in memory.
# Elsewhere in this class it is loaded from the position entry in #configure,
# its :date and :id entries are advanced by #load_records, and it is written
# back by #shutdown.
#
# @return [Object] the current value of @last_pos
def last_pos
  @last_pos
end

#pos_entry ⇒ Object (readonly)

Returns the value of attribute pos_entry.



23
24
25
# File 'lib/fluent/plugin/in_ncmb.rb', line 23

# Reader for @pos_entry, the FilePositionEntry created in #configure from
# @pos_file_path. It is used to read the saved position at configure time and
# to write it back in #shutdown.
#
# @return [Object] the current value of @pos_entry
def pos_entry
  @pos_entry
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/fluent/plugin/in_ncmb.rb', line 26

# Validates the plugin settings and prepares everything #run needs: the NCMB
# client, the position entry with the last saved position, the request path
# and the effective start date.
#
# @param conf [Object] the configuration element handed over by Fluentd; it is
#   only forwarded to the parent implementation via +super+
# @return [DateTime] the resolved start date (also stored in @start_date)
# @raise [Fluent::ConfigError] when a required setting is missing or when
#   start_date cannot be parsed as a date
def configure(conf)
  super

  raise Fluent::ConfigError, "ConfigError: Please input tag" if @tag.nil?
  raise Fluent::ConfigError, "ConfigError: Please input application_key" if @application_key.nil?
  raise Fluent::ConfigError, "ConfigError: Please input client_key" if @client_key.nil?
  raise Fluent::ConfigError, "ConfigError: Please input class_name" if @class_name.nil?
  raise Fluent::ConfigError, "ConfigError: Please input pos_file_path" if @pos_file_path.nil?

  @ncmb_client = NCMB.initialize application_key: @application_key, client_key: @client_key
  @pos_entry = FilePositionEntry.new(@pos_file_path)
  @last_pos = @pos_entry.read_pos

  @path = "/#{@api_version}/classes/#{@class_name}"

  @start_date =
    if @start_date.nil?
      # No start date configured: begin from the moment the plugin is configured.
      DateTime.now
    else
      begin
        DateTime.parse(@start_date)
      rescue ArgumentError => e
        # A malformed date is a configuration mistake, so report it the same
        # way as the missing-setting checks above instead of leaking the raw
        # parser error.
        raise Fluent::ConfigError, "ConfigError: Invalid start_date '#{@start_date}': #{e.message}"
      end
    end
end

#create_where_query(conf_start_date) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/fluent/plugin/in_ncmb.rb', line 116

# Builds the "where" clause used when fetching records: a "$gte" condition on
# createDate.
#
# The boundary is the later of the saved read position (@last_pos[:date]) and
# +conf_start_date+. It is converted to UTC before formatting, because the
# format string ends in a literal "Z"; formatting a value that carries a local
# offset (for example the DateTime.now default set in #configure) without
# converting it would shift the boundary by that offset.
#
# @param conf_start_date [DateTime] the configured start date
# @return [String] the where clause as JSON text
def create_where_query(conf_start_date)
  start_date = [DateTime.parse(@last_pos[:date]), conf_start_date].max
  start_date_str = start_date.new_offset(0).strftime("%FT%T.%LZ")

  "{\"createDate\": {\"$gte\": {\"__type\":\"Date\", \"iso\":\"#{start_date_str}\"}}}"
end

#load_records ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/in_ncmb.rb', line 75

# Fetches the next batch from NCMB and returns the records that have not been
# emitted yet.
#
# The request asks for @limit + 1 items ordered by SORT_FIELD, filtered by the
# clause from #create_where_query. Items at or before the saved position are
# dropped by #remove_emitted_record, and @last_pos is advanced to the last
# remaining item. When @field is set, each record is reduced to that single
# field.
#
# @return [Array<Hash>] the new records (empty when nothing new was found)
def load_records
  queries = {
    limit: @limit + 1,
    order: SORT_FIELD,
    where: create_where_query(@start_date)
  }

  response = @ncmb_client.get @path, queries
  fresh = remove_emitted_record(response)

  unless fresh.empty?
    newest = fresh.last
    @last_pos[:date] = newest[:createDate]
    @last_pos[:id]   = newest[:objectId]
  end

  # Without a field filter the items are passed through untouched.
  return fresh if @field.nil?

  key = @field.intern
  fresh.map { |item| { key => item[key] } }
end

#remove_emitted_record(items) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/fluent/plugin/in_ncmb.rb', line 101

# Filters a response down to the items that come after the saved position.
#
# An item is kept when its :createDate is greater than @last_pos[:date], or
# when the dates are equal and its :objectId is greater than @last_pos[:id].
# Values are compared with the plain > and == operators as they arrive.
#
# @param items [Hash] response whose :results entry holds the fetched items
# @return [Array<Hash>] the items not yet covered by @last_pos, in their
#   original order
def remove_emitted_record(items)
  items[:results].select do |item|
    created = item[:createDate]

    created > @last_pos[:date] ||
      (created == @last_pos[:date] && item[:objectId] > @last_pos[:id])
  end
end

#run ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/fluent/plugin/in_ncmb.rb', line 59

# Worker loop: repeatedly drains all new records and then waits before polling
# again.
#
# The inner loop keeps calling #load_records until it returns an empty batch.
# Each record is emitted as its own event, because Fluentd's
# router.emit(tag, time, record) takes a single record per call; handing it
# the whole Array would deliver one event whose record is an Array. All
# records of a batch share the same emit time.
#
# @interval is multiplied by 60 before being passed to sleep, i.e. it is
# treated as minutes.
#
# @return [void] loops until the thread running it is terminated
def run
  loop do
    loop do
      records = load_records
      break if records.empty?

      time = Time.now.to_i
      records.each { |record| @router.emit(@tag, time, record) }
    end

    sleep(@interval * 60)
  end
end

#shutdown ⇒ Object



54
55
56
57
# File 'lib/fluent/plugin/in_ncmb.rb', line 54

# Stops the worker thread and persists the last read position.
#
# The thread is terminated and joined before the position is written: while
# it is still running it can advance @last_pos, and anything it emitted after
# the position had been saved would be emitted again on the next start.
# +super+ is called last so the parent class can finish its own shutdown,
# mirroring #start and #configure.
#
# @return [Object] whatever the parent implementation returns
def shutdown
  if @ncmb_thread
    @ncmb_thread.terminate
    begin
      @ncmb_thread.join
    rescue StandardError
      # Thread#join re-raises the error of a worker that already died on its
      # own; that must not prevent the position from being saved below.
      nil
    end
  end

  @pos_entry.update_pos(@last_pos)
  super
end

#start ⇒ Object



48
49
50
51
52
# File 'lib/fluent/plugin/in_ncmb.rb', line 48

# Starts the plugin: lets the parent class run its own start logic, then
# launches #run on a new thread kept in @ncmb_thread so that #shutdown can
# stop it later.
#
# @return [Thread] the worker thread
def start
  super

  @ncmb_thread = Thread.new { run }
end