Class: GHTMirrorEvents

Inherits:
GHTorrent::Command show all
Includes:
GHTorrent::APIClient, GHTorrent::Logging, GHTorrent::Persister, GHTorrent::Settings
Defined in:
lib/ghtorrent/commands/ght_mirror_events.rb

Constant Summary

Constants included from GHTorrent::Settings

GHTorrent::Settings::CONFIGKEYS, GHTorrent::Settings::DEFAULTS

Constants included from GHTorrent::Persister

GHTorrent::Persister::ADAPTERS

Instance Method Summary collapse

Methods included from GHTorrent::APIClient

#api_request, #num_pages, #paged_api_request

Methods included from GHTorrent::Settings

#config, #merge, #merge_config_values, #override_config, #settings

Methods included from GHTorrent::Utils

included, #read_value, #user_type, #write_value

Methods included from GHTorrent::Logging

#debug, #error, #info, #loggerr, #warn

Methods included from GHTorrent::Persister

#connect, #disconnect

Methods inherited from GHTorrent::Command

#command_name, #override_config, #prepare_options, #process_options, #queue_client, run, #validate, #version

Instance Method Details

#go ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/ghtorrent/commands/ght_mirror_events.rb', line 64

# Entry point of the command: polls for events until interrupted.
#
# Connects the persister via +connect(:mongo, @settings)+, opens an AMQP
# connection through Bunny using the +amqp_*+ configuration values and
# declares a durable, non-auto-deleted topic exchange. It then calls
# #retrieve in a loop, pausing 5 seconds after every poll and logging the
# new/duplicate counters once 12 polls have accumulated.
#
# An Interrupt ends the loop. Any StandardError raised by a poll is logged
# and the loop carries on. The AMQP connection is closed on the way out.
def go
  @persister = connect(:mongo, @settings)

  conn = Bunny.new(:host => config(:amqp_host),
                   :port => config(:amqp_port),
                   :username => config(:amqp_username),
                   :password => config(:amqp_password))
  conn.start

  begin
    ch = conn.create_channel
    debug "Connection to #{config(:amqp_host)} succeded"

    exchange = ch.topic(config(:amqp_exchange), :durable => true,
                        :auto_delete => false)

    dupl_msgs = new_msgs = loops = 0
    stopped = false
    until stopped
      begin
        begin
          fresh, dupl = retrieve exchange
        rescue StandardError => e
          @logger.error e
          fresh = dupl = nil
        end

        # A failed poll may yield nil counts; nil.to_i is 0, so the counters
        # stay valid and the pause below is never skipped. Skipping it would
        # turn a failing API into a tight retry loop.
        new_msgs += fresh.to_i
        dupl_msgs += dupl.to_i
        loops += 1
        sleep(5)

        if loops >= 12 # One minute
          total = dupl_msgs + new_msgs
          # Avoid 0.0 / 0.0 (NaN) when nothing was seen in this window.
          ratio = total.zero? ? 0.0 : dupl_msgs.to_f / total
          info("Stats: #{new_msgs} new, #{dupl_msgs} duplicate, ratio: #{ratio}")
          dupl_msgs = new_msgs = loops = 0
        end
      rescue Interrupt
        stopped = true
      end
    end
  ensure
    # Release the broker connection however the loop ended.
    conn.close
  end
end

#retrieve(exchange) ⇒ Object

Retrieve events from GitHub and store them in the DB.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/ghtorrent/commands/ght_mirror_events.rb', line 38

# Retrieve events from GitHub, store them in the DB and publish the id of
# every newly stored event on +exchange+ with routing key "evt.<type>".
#
# Errors (StandardError) are reported on STDERR and never raised to the
# caller. Whatever was counted and stored before the failure is still
# published and returned, so the caller always receives two integers.
#
# @param exchange object responding to +publish(payload, options)+
# @return [Array(Integer, Integer)] number of new events and number of
#   duplicates seen on the first page (duplicates found by the follow-up
#   paged request are not counted)
def retrieve(exchange)
  fresh = dupl = 0
  stored = []

  begin
    events = api_request "https://api.github.com/events?per_page=100"
    (fresh, dupl, stored) = store_count events

    # No duplicate on the first page means we may have missed events
    # between polls, so walk all pages and store everything that is left.
    if dupl == 0
      events = paged_api_request "https://api.github.com/events?per_page=100"
      (more, _dupl_paged, stored_paged) = store_count events
      stored |= stored_paged
      fresh += more
    end
  rescue StandardError => e
    STDERR.puts e.message
    STDERR.puts e.backtrace
  end

  # Publish outside the fetch block: events stored before a failure are
  # already in the DB and would be treated as duplicates on the next poll,
  # so this is the only chance to put them on the queue.
  begin
    stored.each do |evt|
      key = "evt.#{evt['type']}"
      exchange.publish evt['id'], :persistent => true, :routing_key => key
    end
  rescue StandardError => e
    STDERR.puts e.message
    STDERR.puts e.backtrace
  end

  [fresh, dupl]
end

#store_count(events) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/ghtorrent/commands/ght_mirror_events.rb', line 20

# Store every event that is not yet in the :events collection and count
# how many were new versus already present.
#
# Each event is looked up by its 'id' through @persister.find; events with
# no match are stored with @persister.store. An info line is logged for
# every event either way.
#
# @param events enumerable of events indexable by 'id'
# @return [Array] number of new events, number of duplicates, and the list
#   of events that were stored by this call
def store_count(events)
  added = []
  duplicates = 0

  events.each do |event|
    unless @persister.find(:events, {'id' => event['id']}).empty?
      info "Already got #{event['id']}"
      duplicates += 1
      next
    end

    added << event
    @persister.store(:events, event)
    info "Added #{event['id']}"
  end

  [added.size, duplicates, added]
end