Class: GHTDataRetrieval

Inherits:
GHTorrent::Command show all
Includes:
GHTorrent::EventProcessing, GHTorrent::Logging, GHTorrent::Persister, GHTorrent::Settings
Defined in:
lib/ghtorrent/commands/ght_data_retrieval.rb

Constant Summary

Constants included from GHTorrent::Persister

GHTorrent::Persister::ADAPTERS

Constants included from GHTorrent::Settings

GHTorrent::Settings::CONFIGKEYS, GHTorrent::Settings::DEFAULTS

Instance Method Summary collapse

Methods included from GHTorrent::EventProcessing

#CommitCommentEvent, #CreateEvent, #FollowEvent, #ForkEvent, #IssueCommentEvent, #IssuesEvent, #MemberEvent, #PullRequestEvent, #PullRequestReviewCommentEvent, #PushEvent, #WatchEvent

Methods included from GHTorrent::Persister

#connect, #disconnect

Methods included from GHTorrent::Logging

#debug, #error, #info, #loggerr, #warn

Methods included from GHTorrent::Settings

#config, #merge, #merge_config_values, #override_config, #settings

Methods included from GHTorrent::Utils

included, #read_value, #user_type, #write_value

Methods inherited from GHTorrent::Command

#command_name, #override_config, #process_options, #queue_client, run, #version

Instance Method Details

#ghtorrent ⇒ Object



50
51
52
53
54
# File 'lib/ghtorrent/commands/ght_data_retrieval.rb', line 50

# Lazily creates the object used to process events and caches it in @gh,
# so every later call returns the same instance.
#
# Returns the memoized TransactedGHTorrent built from the current settings.
def ghtorrent
  # Earlier variant, kept for reference: GHTorrent::Mirror.new(@settings)
  return @gh if @gh
  @gh = TransactedGHTorrent.new(settings)
end

#go ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/ghtorrent/commands/ght_data_retrieval.rb', line 64

# Entry point of the command.
#
# With a command-line argument, the argument is treated as an event id: the
# event is looked up through #retrieve_event and dispatched to the method named
# after its 'type' field, then the method returns.
#
# Without an argument, an AMQP connection is opened through Bunny, one durable
# queue per entry of #handlers is bound to the configured topic exchange
# (routing key "evt.<handler>"), and each delivered message is treated as an
# event id to retrieve and dispatch. The method then idles until an Interrupt
# is received, after which the channel and the connection are closed.
def go

  # Single-event mode: process just the event whose id was given and exit.
  unless ARGV[0].nil?
    event = retrieve_event(ARGV[0])

    if event.nil?
      warn "No event with id: #{ARGV[0]}"
    else
      send(event['type'], event)
    end
    return
  end

  conn = Bunny.new(:host => config(:amqp_host),
                   :port => config(:amqp_port),
                   :username => config(:amqp_username),
                   :password => config(:amqp_password))
  conn.start

  channel = conn.create_channel
  debug "Setting prefetch to #{config(:amqp_prefetch)}"
  channel.prefetch(config(:amqp_prefetch))
  debug "Connection to #{config(:amqp_host)} succeded"

  exchange = channel.topic(config(:amqp_exchange), :durable => true,
                           :auto_delete => false)

  handlers.each do |h|
    queue = channel.queue("#{h}s", {:durable => true})\
                       .bind(exchange, :routing_key => "evt.#{h}")

    info "Binding handler #{h} to routing key evt.#{h}"

    queue.subscribe(:ack => true) do |headers, properties, msg|
      start = Time.now
      # Initialised up front so the rescue clause can tell whether the event
      # was actually retrieved before the failure happened.
      data = nil
      begin
        data = retrieve_event(msg)
        send(h, data)

        channel.acknowledge(headers.delivery_tag, false)
        # NOTE(review): Time#to_ms is not core Ruby; presumably a project
        # extension loaded elsewhere -- confirm.
        info "Success processing event. Type: #{data['type']}, ID: #{data['id']}, Time: #{Time.now.to_ms - start.to_ms} ms"
      rescue StandardError => e
        # Give a message a chance to be reprocessed
        if headers.redelivered?
          # data is still nil when retrieve_event itself failed. Indexing it
          # here would raise inside the rescue clause and skip the reject
          # below, so fall back to the handler name and the raw message.
          evt_type = data.nil? ? h : data['type']
          evt_id = data.nil? ? msg : data['id']
          warn "Error processing event. Type: #{evt_type}, ID: #{evt_id}, Time: #{Time.now.to_ms - start.to_ms} ms"
          # Second failure: reject without requeueing.
          channel.reject(headers.delivery_tag, false)
        else
          # First failure: reject with requeue so the message is delivered again.
          channel.reject(headers.delivery_tag, true)
        end

        STDERR.puts e
        STDERR.puts e.backtrace.join("\n")
      end
    end
  end

  # Idle here until an Interrupt is raised; the subscribe blocks above do the
  # actual work.
  stopped = false
  while not stopped
    begin
      sleep(1)
    rescue Interrupt => _
      debug 'Exit requested'
      stopped = true
    end
  end

  debug 'Closing AMQP connection'
  channel.close unless channel.nil?
  conn.close unless conn.nil?

end

#handlers ⇒ Object



26
27
28
29
30
31
# File 'lib/ghtorrent/commands/ght_data_retrieval.rb', line 26

# Names of the event types this command processes. In #go each name is used
# to derive a queue name and routing key, and is also the method that gets
# invoked for a delivered event.
#
# Returns an Array of Strings.
def handlers
  # Alternative kept from the original for reference: %w(ForkEvent)
  [
    'PushEvent',
    'WatchEvent',
    'FollowEvent',
    'MemberEvent',
    'CreateEvent',
    'CommitCommentEvent',
    'PullRequestEvent',
    'ForkEvent',
    'PullRequestReviewCommentEvent',
    'IssuesEvent',
    'IssueCommentEvent'
  ]
end

#logger ⇒ Object



46
47
48
# File 'lib/ghtorrent/commands/ght_data_retrieval.rb', line 46

# Returns the logger of the object produced by #ghtorrent, so this command
# logs through the same logger as that object.
def logger
  mirror = ghtorrent
  mirror.logger
end

#parse(msg) ⇒ Object



22
23
24
# File 'lib/ghtorrent/commands/ght_data_retrieval.rb', line 22

# Decodes a JSON document.
#
# msg - String containing JSON text.
#
# Returns whatever JSON.parse produces for msg; parse errors raised by
# JSON.parse are not caught here.
def parse(msg)
  decoded = JSON.parse(msg)
  decoded
end

#persister ⇒ Object



17
18
19
20
# File 'lib/ghtorrent/commands/ght_data_retrieval.rb', line 17

# Lazily opens the :mongo persister with the current settings and caches it
# in @persister, so the connection is established at most once per instance
# (as long as connect returns a truthy value).
#
# Returns the memoized result of connect(:mongo, settings).
def persister
  @persister ||= connect(:mongo, settings)
end

#prepare_options(options) ⇒ Object



33
34
35
36
37
38
39
40
# File 'lib/ghtorrent/commands/ght_data_retrieval.rb', line 33

# Sets the usage banner on the given option parser.
#
# options - object responding to #banner(text).
#
# Returns whatever options.banner returns.
def prepare_options(options)
  usage = "Retrieves events from queues and processes them through GHTorrent.\n" \
          "If event_id is provided, only this event is processed.\n" \
          "#{command_name} [event_id]\n"
  options.banner(usage)
end

#retrieve_event(evt_id) ⇒ Object



56
57
58
59
60
61
62
# File 'lib/ghtorrent/commands/ght_data_retrieval.rb', line 56

# Loads a stored event from the :events collection of the persister's
# underlying connection.
#
# evt_id - value matched against the 'id' field of the stored events.
#
# Returns the event as parsed by #parse with the '_id' key removed, or nil
# when no event with that id exists.
def retrieve_event(evt_id)
  event = persister.get_underlying_connection[:events].find_one('id' => evt_id)
  # A missing event used to fail with NoMethodError on the delete below,
  # which made the "No event with id" branch in #go unreachable.
  return nil if event.nil?

  # Drop the '_id' key before the document is serialised and re-parsed.
  event.delete '_id'
  data = parse(event.to_json)
  debug "Processing event: #{data['type']}-#{data['id']}"
  data
end

#validate ⇒ Object



42
43
44
# File 'lib/ghtorrent/commands/ght_data_retrieval.rb', line 42

# Delegates entirely to the inherited #validate; this command adds no checks
# of its own.
#
# Returns whatever the parent implementation returns.
def validate
  super()
end