Class: Fluent::GithubActivities::Crawler

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/github-activities/crawler.rb

Defined Under Namespace

Classes: EmptyRequestQueue

Constant Summary

NO_INTERVAL =
0
DEFAULT_INTERVAL =
1
DEFAULT_LAST_EVENT_TIMESTAMP =
-1
RELATED_USER_IMAGE_KEY =
"$github-activities-related-avatar"
MERGE_COMMIT_MESSAGE_PATTERN =
/\AMerge pull request #\d+ from [^\/]+\/[^\/]+\n\n/

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(options = {}) ⇒ Crawler

Returns a new instance of Crawler.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 43

# Builds a crawler from an options hash and primes it: every watched user
# gets an avatar lookup and an initial events request queued.
#
# @param options [Hash] :username, :password, :watching_users,
#   :include_commits_from_pull_request, :include_foreign_commits, :pos_file,
#   :request_queue, :default_interval
def initialize(options={})
  # NOTE(review): presumably consumed by http_get for authentication (not
  # shown here) — confirm.
  @username = options[:username]
  @password = options[:password]

  @watching_users = options[:watching_users] || []
  @include_commits_from_pull_request = options[:include_commits_from_pull_request]
  @include_foreign_commits = options[:include_foreign_commits]

  # Per-user crawl state ("entity_tag", "last_event_timestamp"); load_positions
  # presumably restores it from @pos_file when one is configured.
  @positions = {}
  pos_file = options[:pos_file]
  @pos_file = pos_file ? Pathname(pos_file) : pos_file
  load_positions

  @avatars = {}
  @request_queue = options[:request_queue] || []
  @default_interval = options[:default_interval] || DEFAULT_INTERVAL

  @watching_users.each do |watched|
    fetch_avatar(watched)
    reserve_user_events(watched)
  end
end

Instance Attribute Details

#interval_for_next_request ⇒ Object (readonly)

Returns the value of attribute interval_for_next_request.



41
42
43
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 41

# Interval the caller should wait before the next request, as last set by
# process_request (NO_INTERVAL or the configured default interval).
def interval_for_next_request; @interval_for_next_request; end

#on_emit=(value) ⇒ Object (writeonly)

Sets the attribute on_emit

Parameters:

  • value

    the value to set the attribute on_emit to.



40
41
42
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 40

# Stores the emit callback; presumably invoked by emit (not shown) to hand
# records downstream.
def on_emit=(value); @on_emit = value; end

#request_queue ⇒ Object (readonly)

Returns the value of attribute request_queue.



41
42
43
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 41

# The pending request queue (an Array unless one was injected through
# options[:request_queue]); process_request shifts requests off its front.
def request_queue; @request_queue; end

Instance Method Details

#extra_request_headers(request) ⇒ Object



126
127
128
129
130
131
132
133
134
135
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 126

# Conditional-request headers for +request+.
#
# An explicit :previous_entity_tag wins. Otherwise, an events request falls
# back to the entity tag saved in the user's position. Without a tag, no
# headers are sent.
#
# @return [Hash] either {} or {"If-None-Match" => tag}
def extra_request_headers(request)
  tag =
    if request[:previous_entity_tag]
      request[:previous_entity_tag]
    elsif request[:type] == TYPE_EVENTS && @positions[request[:user]]
      @positions[request[:user]]["entity_tag"]
    end
  tag ? { "If-None-Match" => tag } : {}
end

#fetch_avatar(user) ⇒ Object



302
303
304
305
306
307
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 302

# Looks up +user+'s profile once and caches its "avatar_url" in @avatars.
#
# Users already present in @avatars are not fetched again.
#
# @param user [String] GitHub login
def fetch_avatar(user)
  return if @avatars.key?(user)
  # NOTE(review): assumes a user_info(user) helper returning the user's
  # profile URL (the counterpart of user_activities) — confirm it exists.
  response = http_get(user_info(user))
  profile = JSON.parse(response.body)
  @avatars[user] = profile["avatar_url"]
end

#process_commit(commit, push_event) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 208

# Handles a commit fetched for a queued TYPE_COMMIT request.
#
# The commit is emitted as "commit" when foreign commits are included or its
# author is a watched user. It is then attached to its reference inside the
# parent push event. The push event is emitted as "push" once every commit it
# references has been filled in.
#
# @param commit [Hash] parsed commit JSON
# @param push_event [Hash] the PushEvent whose payload referenced the commit
def process_commit(commit, push_event)
  # GitHub reports "author": null when the commit's email is not linked to a
  # GitHub account; indexing straight into it raised NoMethodError.
  user = commit.dig("author", "login")
  fetch_avatar(user) if user

  if @include_foreign_commits or (user and watching_user?(user))
    commit[RELATED_USER_IMAGE_KEY] = @avatars[user]
    emit("commit", commit)
  end

  commit_refs = push_event["payload"]["commits"]
  target_commit_ref = commit_refs.find do |commit_ref|
    commit_ref["url"] == commit["url"]
  end
  target_commit_ref["commit"] = commit if target_commit_ref

  # Emit the push only after all of its commits have been fetched.
  completely_fetched = commit_refs.all? do |commit_ref|
    commit_ref["commit"]
  end
  emit("push", push_event) if completely_fetched
end

#process_create_event(event) ⇒ Object



292
293
294
295
296
297
298
299
300
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 292

# Emits a CreateEvent as "branch" or "tag" according to payload.ref_type;
# any other ref type (e.g. a repository) is ignored.
def process_create_event(event)
  tag = {
    "branch" => "branch",
    "tag"    => "tag",
  }[event["payload"]["ref_type"]]
  emit(tag, event) if tag
end

#process_issue_event(event) ⇒ Object



233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 233

# Maps an IssuesEvent's payload.action onto an "issue-*" tag and emits it.
# Actions without a mapping are ignored.
def process_issue_event(event)
  tag = {
    "opened"     => "issue-open",
    "closed"     => "issue-close",
    "reopened"   => "issue-reopen",
    "assigned"   => "issue-assign",
    "unassigned" => "issue-unassign",
    "labeled"    => "issue-label",
    "unlabeled"  => "issue-unlabel",
  }[event["payload"]["action"]]
  emit(tag, event) if tag
end

#process_issue_or_pull_request_comment_event(event) ⇒ Object



282
283
284
285
286
287
288
289
290
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 282

# Emits an IssueCommentEvent as "pull-request-comment" when the commented
# issue carries a pull_request entry, otherwise as "issue-comment".
def process_issue_or_pull_request_comment_event(event)
  on_pull_request = event["payload"]["issue"]["pull_request"]
  emit(on_pull_request ? "pull-request-comment" : "issue-comment", event)
end

#process_pull_request_event(event) ⇒ Object



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 253

# Emits a PullRequestEvent under a tag derived from payload.action. A closed
# PR is split into "pull-request-merged" / "pull-request-cancelled" using
# pull_request.merged. Other actions are ignored.
def process_pull_request_event(event)
  payload = event["payload"]
  tag =
    case payload["action"]
    when "opened"   then "pull-request"
    when "reopened" then "pull-request-reopen"
    when "closed"
      payload["pull_request"]["merged"] ? "pull-request-merged" : "pull-request-cancelled"
    end
  emit(tag, event) if tag
end

#process_push_event(event) ⇒ Object



193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 193

# Handles a PushEvent by queuing one TYPE_COMMIT request per referenced
# commit; the push itself is emitted later, by process_commit, once every
# commit has been fetched. Pushes recognised as merged pull requests are
# skipped unless include_commits_from_pull_request is set.
def process_push_event(event)
  return if !@include_commits_from_pull_request && push_event_from_merged_pull_request?(event)

  refs = event["payload"]["commits"]
  # Unshifting in reverse leaves the commit requests at the head of the
  # queue in their original order, ahead of anything already pending.
  refs.reverse.each do |ref|
    @request_queue.unshift(type: TYPE_COMMIT, uri: ref["url"], push: event)
  end
end

#process_request ⇒ Object

Raises:

  • (EmptyRequestQueue) — when the request queue is empty.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 69

# Takes the next request off the queue and performs it.
#
# A request whose :process_after time has not arrived is rotated to the back
# of the queue and false is returned. Otherwise the request is sent, and the
# response is dispatched on its class:
# - 2xx: the events or commit body is processed.
# - 304: the events request is re-reserved.
# - Anything else: a warning is logged and an events request is re-reserved,
#   so the user keeps being polled.
# @interval_for_next_request is updated for the caller.
#
# @return [Boolean] false if the request was not due yet, true otherwise
# @raise [EmptyRequestQueue] when there is nothing to process
def process_request
  raise EmptyRequestQueue.new if @request_queue.empty?

  request = @request_queue.shift
  $log.info("GithubActivities::Crawler: processing request: #{request.inspect}")
  if request[:process_after] and
       Time.now.to_i < request[:process_after]
    # Not due yet: rotate it to the back so the caller can try the next one
    # immediately.
    @request_queue.push(request)
    @interval_for_next_request = NO_INTERVAL
    return false
  end

  uri = request_uri(request)
  extra_headers = extra_request_headers(request)

  $log.info("GithubActivities::Crawler: requesting to #{uri.inspect}")
  response = http_get(uri, extra_headers)
  $log.info("GithubActivities::Crawler: response: #{response.inspect}")

  case response
  when Net::HTTPSuccess
    body = JSON.parse(response.body)
    $log.info("GithubActivities::Crawler: request type: #{request[:type]}")
    case request[:type]
    when TYPE_EVENTS
      events = body
      $log.info("GithubActivities::Crawler: events size: #{events.size}")
      process_user_events(request[:user], events)
      reserve_user_events(request[:user], :previous_response => response)
      save_user_position(request[:user], :entity_tag => response["ETag"])
    when TYPE_COMMIT
      process_commit(body, request[:push])
    end
  when Net::HTTPNotModified
    case request[:type]
    when TYPE_EVENTS
      reserve_user_events(request[:user],
                          :previous_response => response,
                          :previous_entity_tag => extra_headers["If-None-Match"])
    end
    @interval_for_next_request = NO_INTERVAL
    return true
  else
    $log.warn("GithubActivities::Crawler: unexpected response for #{uri.inspect}: #{response.inspect}")
    # The request was already shifted off the queue. Without re-reserving
    # here, an error (rate limit, 5xx, ...) would stop this user from ever
    # being polled again. The previous entity tag is kept so the retry stays
    # conditional.
    case request[:type]
    when TYPE_EVENTS
      reserve_user_events(request[:user],
                          :previous_response => response,
                          :previous_entity_tag => extra_headers["If-None-Match"])
    end
  end
  @interval_for_next_request = @default_interval
  return true
end

#process_user_event(user, event) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 170

# Tags +event+ with +user+'s cached avatar, then dispatches on event["type"].
# Types without a dedicated handler are emitted under their raw type name.
# Event type catalogue: https://developer.github.com/v3/activity/events/types/
def process_user_event(user, event)
  event[RELATED_USER_IMAGE_KEY] = @avatars[user]
  event_type = event["type"]
  case event_type
  when "PushEvent"          then process_push_event(event)
  when "IssuesEvent"        then process_issue_event(event)
  when "IssueCommentEvent"  then process_issue_or_pull_request_comment_event(event)
  when "PullRequestEvent"   then process_pull_request_event(event)
  when "CreateEvent"        then process_create_event(event)
  when "CommitCommentEvent" then emit("commit-comment", event)
  when "ForkEvent"          then emit("fork", event)
  else                           emit(event_type, event)
  end
end

#process_user_events(user, events) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 154

# Processes one page of +user+'s events.
#
# Events at or before the user's saved last_event_timestamp are skipped. The
# rest are handed to process_user_event newest first, and the newest
# processed timestamp is then saved as the user's position.
#
# @param user [String] GitHub login
# @param events [Array<Hash>] parsed events, each with an ISO 8601 "created_at"
def process_user_events(user, events)
  position = @positions[user]
  last_event_timestamp =
    (position and position["last_event_timestamp"]) || DEFAULT_LAST_EVENT_TIMESTAMP

  stamped = events.map { |event| [Time.parse(event["created_at"]).to_i, event] }
  fresh = stamped.select { |timestamp, _event| timestamp > last_event_timestamp }

  # Newest first, as before. process_push_event unshifts commit requests onto
  # the queue, so presumably this ordering leaves older pushes' commits at the
  # front.
  fresh.sort_by { |timestamp, _event| -timestamp }.each do |_timestamp, event|
    process_user_event(user, event)
  end
  return if fresh.empty?

  # Save the NEWEST timestamp, once. Saving after each event in newest-first
  # order left the position at the oldest new event, so the next page
  # re-emitted the newer ones. A crash mid-loop could also skip older events
  # that had not been processed yet.
  newest = fresh.map(&:first).max
  save_user_position(user, :last_event_timestamp => newest)
end

#push_event_from_merged_pull_request?(event) ⇒ Boolean

Returns:

  • (Boolean)


271
272
273
274
275
276
277
278
279
280
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 271

# Whether +event+ (a PushEvent) looks like the push produced by merging a
# pull request, i.e. its last commit message matches
# MERGE_COMMIT_MESSAGE_PATTERN.
#
# @return [Boolean] false for pushes without commits
def push_event_from_merged_pull_request?(event)
  # Pushes can carry no commits at all; commit_refs.last was nil there and
  # nil["message"] raised NoMethodError.
  last_commit = (event["payload"]["commits"] || []).last
  return false unless last_commit
  !(MERGE_COMMIT_MESSAGE_PATTERN =~ last_commit["message"]).nil?
end

#request_uri(request) ⇒ Object



116
117
118
119
120
121
122
123
124
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 116

# URI to fetch for +request+. An events request resolves to the user's
# activity URL; any other request carries its own :uri.
def request_uri(request)
  case request[:type]
  when TYPE_EVENTS then user_activities(request[:user])
  else request[:uri]
  end
end

#reserve_user_events(user, options = {}) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 137

# Appends a TYPE_EVENTS request for +user+ to the queue.
#
# When options[:previous_response] is given:
# - The request is deferred by that response's X-Poll-Interval header.
# - It carries the response's ETag for a conditional request, falling back
#   to options[:previous_entity_tag].
# options[:now] overrides the current time. Returns the queue.
def reserve_user_events(user, options={})
  request = { :type => TYPE_EVENTS, :user => user }
  previous = options[:previous_response]
  if previous
    current_time = options[:now] || Time.now
    poll_interval = previous["X-Poll-Interval"].to_i
    request[:previous_entity_tag] = previous["ETag"] || options[:previous_entity_tag]
    request[:process_after] = current_time.to_i + poll_interval
  end
  @request_queue.push(request)
end

#watching_user?(user) ⇒ Boolean

Returns:

  • (Boolean)


229
230
231
# File 'lib/fluent/plugin/github-activities/crawler.rb', line 229

# Whether +user+ is one of the configured watching_users.
#
# @return [Boolean]
def watching_user?(user)
  # Array#include? — the previous call to the nonexistent Array#include raised
  # NoMethodError whenever foreign commits were excluded.
  @watching_users.include?(user)
end