Class: Fluent::DeskcomInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_deskcom.rb

Constant Summary collapse

OUTPUT_FORMAT_TYPE =

unsupported yet: nest flat

%w(simple)
INPUT_API_TYPE =

unsupported yet: brand article reply ~

%w(cases replies)
DEFAULT_PER_PAGE =
50
SORT_DIRECTION_TYPE =
%w(asc desc)

Instance Method Summary collapse

Constructor Details

#initializeDeskcomInput



26
27
28
29
30
31
# File 'lib/fluent/plugin/in_deskcom.rb', line 26

def initialize
  super
  require 'desk'
  require 'yaml'
  require 'pathname'
end

Instance Method Details

#configure(conf) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/in_deskcom.rb', line 34

def configure(conf)
  super
  if !OUTPUT_FORMAT_TYPE.include?(@output_format)
    raise Fluent::ConfigError, "output_format value undefined #{@output_format}"
  end

  if !INPUT_API_TYPE.include?(@input_api)
    raise Fluent::ConfigError, "input_api value undefined #{@input_api}"
  end

  if !SORT_DIRECTION_TYPE.include?(@sort_direction)
    raise Fluent::ConfigError, "sort_direction value undefined #{@sort_direction}"
  end

  if !@consumer_key || !@consumer_secret || !@oauth_token || !@oauth_token_secret
    raise Fluent::ConfigError, "missing values in consumer_key or consumer_secret or oauth_token or oauth_token_secret"
  end

  if !@store_file
    $log.warn("stored_time_file path is missing")
  end

  @tick = @interval * 60

  @per_page = DEFAULT_PER_PAGE

  Desk.configure do |config|
    config.subdomain          = @subdomain
    config.consumer_key       = @consumer_key
    config.consumer_secret    = @consumer_secret
    config.oauth_token        = @oauth_token
    config.oauth_token_secret = @oauth_token_secret
  end
end

#get_content(status) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/fluent/plugin/in_deskcom.rb', line 143

def get_content(status)
  case @output_format
  when 'simple'
    record = Hash.new
    status.each_pair do |k,v|
      if (!@time_column.nil? && k == "#{@time_column}") then
        @time_value = Time.parse(v).to_i rescue nil
      end

      if (k == '_links') then
        next
      end

      if v.kind_of? Hashie::Deash then
        record.store(k, v.to_json)
      else
        record.store(k, v)
      end
    end
  end

  if !@time_value.nil? then
    Engine.emit(@tag, @time_value, record)
  else
    Engine.emit(@tag, @started_time,  record)
  end
rescue => e
  $log.error "deskcom get_content: #{e.message}"
end

#get_streamObject



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/fluent/plugin/in_deskcom.rb', line 90

def get_stream
  page = 1
  loop do
    cases = nil
    begin
      cases = Desk.cases(:since_updated_at => @stored_time, :max_updated_at => @started_time,
                         :page => page, :per_page => @per_page,
                         :sort_field => 'updated_at', :sort_direction => @sort_direction)
    rescue Desk::NotFound => e
      $log.info "No more records: #{e.message}"
      break
    end

    if @input_api == 'cases' then
      cases.each do |c|
        get_content(c)
      end
      $log.info "Case total entries: #{cases.total_entries} page: #{page}"

    elsif @input_api == 'replies'
      cases.each do |c|
        Desk.case_replies(c.id).each do |r|
          r[:case_id] = c.id
          get_content(r) if c.count > 0
        end
      end
      $log.info "Case total entries with replies: #{cases.total_entries} page: #{page}"
    end

    page = page + 1
    # if getting above 500 pages limit for a search
    #   (http://dev.desk.com/API/cases/#list), reset the reference point and
    #   the page count to 1
    if page >= 500
      require 'time'
      if @sort_direction == 'asc'
        # if sorting by ascending 'updated_at', reset the lower filter limit
        #   to focus on the upper part of the records that would reside in the
        #   'pages' past 500
        @stored_time = Time.parse(cases.map(&:updated_at).sort.last).to_i
      else
        # if sorting by descending 'updated_at', reset the upper filter limit
        #   to focus on the lower part of the records that would reside in the
        #   'pages' past 500
        @started_time = Time.parse(cases.map(&:updated_at).sort.first).to_i
      end
      page = 1
    end
  end
rescue => e
  $log.error "deskcom run: #{e.message}"
end

#load_store_file(store_file) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/fluent/plugin/in_deskcom.rb', line 173

def load_store_file(store_file)
  f = Pathname.new(store_file)
  stored_time = 0
  f.open('r') do |f|
    stored = YAML.load_file(f)
    stored_time = stored[:time].to_i
  end
  $log.info "deskcom: Load #{store_file}: #{stored_time}"
  return stored_time
rescue => e
  $log.warn "deskcom: Can't load store_file #{e.message}"
  return 0
end

#runObject



80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/in_deskcom.rb', line 80

def run
  while true
    @started_time = Time.now.to_i
    @stored_time = load_store_file(@store_file)
    get_stream
    save_store_file(@store_file, @started_time) unless !@store_file
    sleep @tick
  end
end

#save_store_file(store_file, time) ⇒ Object



187
188
189
190
191
192
193
194
195
196
# File 'lib/fluent/plugin/in_deskcom.rb', line 187

def save_store_file(store_file, time)
  f = Pathname.new(store_file)
  f.open('w') do |f|
    data = {:time => time}
    YAML.dump(data, f)
  end
  $log.info "deskcom: Save started_time: #{time} to #{store_file}"
rescue => e
  $log.warn "deskcom: Can't save store_file #{e.message}"
end

#shutdownObject



74
75
76
77
78
# File 'lib/fluent/plugin/in_deskcom.rb', line 74

def shutdown
  super
  @thread.terminate
  @thread.join
end

#startObject



69
70
71
72
# File 'lib/fluent/plugin/in_deskcom.rb', line 69

def start
  super
  @thread = Thread.new(&method(:run))
end