Class: Fluent::NcmbInput
- Inherits:
-
Input
- Object
- Input
- Fluent::NcmbInput
- Includes:
- NCMB
- Defined in:
- lib/fluent/plugin/in_ncmb.rb
Constant Summary collapse
- SORT_FIELD =
"createDate,objectId"
Instance Attribute Summary collapse
-
#last_pos ⇒ Object
readonly
Returns the value of attribute last_pos.
-
#pos_entry ⇒ Object
readonly
Returns the value of attribute pos_entry.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #create_where_query(conf_start_date) ⇒ Object
- #load_records ⇒ Object
- #remove_emitted_record(items) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Attribute Details
#last_pos ⇒ Object (readonly)
Returns the value of attribute last_pos.
24 25 26 |
# File 'lib/fluent/plugin/in_ncmb.rb', line 24 def last_pos @last_pos end |
#pos_entry ⇒ Object (readonly)
Returns the value of attribute pos_entry.
23 24 25 |
# File 'lib/fluent/plugin/in_ncmb.rb', line 23 def pos_entry @pos_entry end |
Instance Method Details
#configure(conf) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/fluent/plugin/in_ncmb.rb', line 26 def configure(conf) super raise Fluent::ConfigError.new("ConfigError: Please input tag") if @tag.nil? raise Fluent::ConfigError.new("ConfigError: Please input application_key") if @application_key.nil? raise Fluent::ConfigError.new("ConfigError: Please input client_key") if @client_key.nil? raise Fluent::ConfigError.new("ConfigError: Please input class_name") if @class_name.nil? raise Fluent::ConfigError.new("ConfigError: Please input pos_file_path") if @pos_file_path.nil? @ncmb_client = NCMB.initialize application_key: @application_key, client_key: @client_key @pos_entry = FilePositionEntry.new(@pos_file_path) @last_pos = @pos_entry.read_pos() @path = "/#{@api_version}/classes/#{@class_name}" unless @start_date.nil? then @start_date = DateTime.parse(@start_date) else @start_date = DateTime.now end end |
#create_where_query(conf_start_date) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/fluent/plugin/in_ncmb.rb', line 116 def create_where_query(conf_start_date) start_date = DateTime.parse(@last_pos[:date]) if conf_start_date > start_date then start_date = conf_start_date end query = "" start_date_str = start_date.strftime("%FT%T.%LZ") query = "{\"createDate\": {\"$gte\": {\"__type\":\"Date\", \"iso\":\"#{start_date_str}\"}}}" return (query) end |
#load_records ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/fluent/plugin/in_ncmb.rb', line 75 def load_records() queries = {} queries[:limit] = @limit + 1 queries[:order] = SORT_FIELD queries[:where] = create_where_query(@start_date) items = @ncmb_client.get @path, queries items = remove_emitted_record(items) if items.length > 0 then @last_pos[:date] = items[-1][:createDate] @last_pos[:id] = items[-1][:objectId] end records = [] if @field.nil? then records = items else items.each do |item| records << {@field.intern => item[@field.intern]} end end return (records) end |
#remove_emitted_record(items) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/fluent/plugin/in_ncmb.rb', line 101 def remove_emitted_record(items) result_items = [] items[:results].each do |item| if item[:createDate] > @last_pos[:date] then result_items << item elsif item[:createDate] == @last_pos[:date] then if item[:objectId] > @last_pos[:id] then result_items << item end end end return result_items end |
#run ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/fluent/plugin/in_ncmb.rb', line 59 def run() loop { loop { records = load_records(); if records.length == 0 then break; end time = Time.now.to_i; @router.emit(@tag, time, records) } sleep(@interval * 60); } end |
#shutdown ⇒ Object
54 55 56 57 |
# File 'lib/fluent/plugin/in_ncmb.rb', line 54 def shutdown() @pos_entry.update_pos(@last_pos) @ncmb_thread.terminate() end |
#start ⇒ Object
48 49 50 51 52 |
# File 'lib/fluent/plugin/in_ncmb.rb', line 48 def start() super; @ncmb_thread = Thread.new(&method(:run)) end |