Class: Fluent::FeedlyInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_feedly.rb

Defined Under Namespace

Classes: StateStore

Instance Method Summary collapse

Constructor Details

#initialize ⇒ FeedlyInput

Returns a new instance of FeedlyInput.



22
23
24
25
26
27
# File 'lib/fluent/plugin/in_feedly.rb', line 22

# Loads the libraries this plugin relies on and then defers to the parent
# class constructor.
def initialize
  # Loaded here rather than at file load time.
  # NOTE(review): presumably so the gems are only required once the plugin is
  # actually instantiated - confirm.
  require 'feedlr'
  require 'digest/sha2'

  super
end

Instance Method Details

#configure(conf) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/fluent/plugin/in_feedly.rb', line 29

# Applies the configuration, validates fetch_count and builds the Feedly client.
#
# @param conf the plugin configuration, forwarded unchanged to the parent class
# @raise [Fluent::ConfigError] when @fetch_count is outside 20..10000 (inclusive)
def configure(conf)
  super

  fetch_count_in_range = @fetch_count >= 20 && @fetch_count <= 10000
  raise Fluent::ConfigError, "Feedly: fetch_count param (#{@fetch_count}) should be between 20 and 10000." unless fetch_count_in_range

  client_options = {
    oauth_access_token: @access_token,
    sandbox: @enable_sandbox,
  }
  @client = Feedlr::Client.new(client_options)
end

#fetch ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/fluent/plugin/in_feedly.rb', line 67

# Fetches entries for every subscribed category and emits each one as an event.
#
# The newerThan bound is resolved once per call so that all categories in the
# same pass share it. get_fetch_time_range clears @initial_loop the first time
# it is called, so resolving the bound inside the per-category loop would apply
# @fetch_time_range_on_startup to the first category only and silently fall
# back to @fetch_time_range for every other category.
#
# For each category, pages are requested until get_continuation_id returns nil.
def fetch
  newer_than = get_fetch_time_range

  @subscribe_categories.each do |category_name|
    category_id = "user/#{@profile_id}/category/#{category_name}"

    loop do
      request_option = { count: @fetch_count, continuation: get_continuation_id, newerThan: newer_than }
      cursor = @client.stream_entries_contents(category_id, request_option)

      cursor.items.each { |item| Engine.emit(@tag, Engine.now, item) }
      log.debug "Feedly: fetched articles.", articles: cursor.items.size, request_option: request_option

      # Store the continuation before deciding whether to go on, so the next
      # request (or the next call to fetch) reads it back from the state store.
      set_continuation_id(cursor.continuation)
      break if get_continuation_id.nil?
    end
  end
end

#get_continuation_id ⇒ Object



106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/in_feedly.rb', line 106

# Reads the stored continuation id from the state store.
#
# The id is returned only when the category digest saved with it equals the
# digest of the current @subscribe_categories; otherwise nil is returned.
# NOTE(review): assumes @state_store.get never returns nil for a missing key -
# confirm against StateStore.
def get_continuation_id
  stored = @state_store.get('continuation')
  return nil unless subscribe_categories_hash == stored[:subscribe_categories_hash]

  stored[:id]
end

#get_fetch_time_range ⇒ Object



84
85
86
87
88
89
90
91
92
# File 'lib/fluent/plugin/in_feedly.rb', line 84

# Computes the lower time bound for a fetch: the current time minus a window,
# multiplied by 1000.
#
# While @initial_loop is set the window is @fetch_time_range_on_startup and the
# flag is cleared; afterwards the window is @fetch_time_range.
def get_fetch_time_range
  startup = @initial_loop
  window = startup ? @fetch_time_range_on_startup : @fetch_time_range
  @initial_loop = false if startup

  (Time.now.to_i - window) * 1000
end

#run ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/in_feedly.rb', line 52

# Polling loop: calls fetch, then sleeps @run_interval, forever.
#
# A StandardError raised by fetch is logged with its message, class and
# backtrace; the loop then sleeps @run_interval and calls fetch again.
def run
  @initial_loop = true

  loop do
    begin
      fetch
    rescue => error
      log.error "Feedly: unexpected error has occoured.", :error => error.message, :error_class => error.class
      log.error_backtrace error.backtrace
    end

    # Reached after both a successful and a failed fetch.
    sleep @run_interval
  end
end

#set_continuation_id(continuation_id) ⇒ Object



98
99
100
101
102
103
104
# File 'lib/fluent/plugin/in_feedly.rb', line 98

# Stores the continuation id together with the digest of the current
# subscribed categories under the "continuation" key, then calls
# @state_store.update!.
#
# @param continuation_id the continuation value to store (may be nil)
# @return whatever @state_store.update! returns
def set_continuation_id(continuation_id)
  record = { id: continuation_id, subscribe_categories_hash: subscribe_categories_hash }
  @state_store.set("continuation", record)
  @state_store.update!
end

#shutdown ⇒ Object



48
49
50
# File 'lib/fluent/plugin/in_feedly.rb', line 48

# Stops the plugin by killing the thread held in @thread.
#
# NOTE(review): the thread is killed without being joined, so whatever it is
# doing at that moment is interrupted - confirm that is acceptable.
def shutdown
  Thread.kill(@thread)
end

#start ⇒ Object



42
43
44
45
46
# File 'lib/fluent/plugin/in_feedly.rb', line 42

# Starts the plugin: resolves the Feedly profile id, opens the state store at
# @state_file and launches the polling loop (run) on a new thread.
#
# The profile id is read through the client's user_profile call. The previous
# text, "@client..id", had lost the method name between the dots; it parsed as
# a Range expression and raised NameError on the bare `id`.
def start
  @profile_id = @client.user_profile.id
  @state_store = StateStore.new(@state_file)
  @thread = Thread.new(&method(:run))
end

#subscribe_categories_hash ⇒ Object



94
95
96
# File 'lib/fluent/plugin/in_feedly.rb', line 94

# Returns the raw SHA-512 digest of the subscribed category names, sorted and
# concatenated without a separator, so the result does not depend on the order
# in which the categories are listed.
def subscribe_categories_hash
  sorted_names = @subscribe_categories.sort
  fingerprint_source = sorted_names.join('')
  Digest::SHA512.digest(fingerprint_source)
end