Class: Curl::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/curb_threadpool.rb

Defined Under Namespace

Classes: Request

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size = 4) ⇒ ThreadPool

Returns a new instance of ThreadPool.



21
22
23
24
# File 'lib/curb_threadpool.rb', line 21

def initialize(size=4)
  @size = size
  reset()
end

Instance Attribute Details

#reqsObject (readonly)

Returns the value of attribute reqs.



19
20
21
# File 'lib/curb_threadpool.rb', line 19

def reqs
  @reqs
end

#resultsObject (readonly)

Returns the value of attribute results.



19
20
21
# File 'lib/curb_threadpool.rb', line 19

def results
  @results
end

Instance Method Details

#add(req) ⇒ Object Also known as: <<

Shorthand for adding requests



27
28
29
# File 'lib/curb_threadpool.rb', line 27

def add(req)
  @reqs << req
end

#closeObject

Close all active Curl connections



39
40
41
42
43
# File 'lib/curb_threadpool.rb', line 39

def close
  if @clients then
    @clients.each { |c| c.reset(); c.close() }
  end
end

#get(urls) ⇒ Array

Send multiple get requests

Parameters:

  • urls (Array<String>)

    list of URLs

Returns:

  • (Array)

    array of response bodies



60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/curb_threadpool.rb', line 60

def get(urls)
  if urls.nil? or urls.empty? then
    return []
  end

  urls = [urls] if not urls.kind_of? Array
  urls.each_with_index do |url, i|
    @reqs << Request.new(i, url.to_s)
  end

  return collate_results(perform())
end

#joinObject

Wait for all threads to complete



33
34
35
36
# File 'lib/curb_threadpool.rb', line 33

def join
  @threads.each { |t| t.join }
  @threads.clear
end

#perform(async = false, &block) {|Request, String| ... } ⇒ Hash<Key, String>

Execute requests. By default, will block until complete and return results.

Parameters:

  • async (Boolean) (defaults to: false)

    If true, will not wait for requests to finish. (Default=false)

  • block (Block)

    If passed, responses will be passed into the callback instead of being returned directly

Yields:

  • (Request, String)

    Passes to the block the request and the response body

Returns:

  • (Hash<Key, String>)

    Hash of responses, if no block given. Returns true otherwise



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/curb_threadpool.rb', line 107

def perform(async=false, &block)

  @results = {}

  @clients.each do |client|
    @threads << Thread.new do

      loop do
        break if @reqs.empty?
        req = @reqs.shift
        client.url = req.uri

        args = ["http_#{req.method}"]
        if [:put, :post].include? req.method
          # add body to args for these methods
          if req.body then
            if req.body.kind_of? Array then
              args += req.body
            else
              args << req.body
            end
          else
            args << ""
          end
        end

        client.send(*args)
        if block then
          yield(req, client.body_str)
        else
          @results[req.key] = client.body_str
        end
      end

    end
  end

  if async then
    # don't wait for threads to join, just return
    return true
  end

  join()
  return true if block

  return @results
end

#post(reqs) ⇒ Object

Send multiple post requests

Parameters:

  • reqs (Array)


76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/curb_threadpool.rb', line 76

def post(reqs)
  if reqs.nil? or reqs.empty? then
    return []
  end

  if not reqs.first.kind_of? Array then
    reqs = [ reqs ]
  end

  reqs.each_with_index do |r, i|
    if r.kind_of? Request then
      @reqs << r
    elsif r.kind_of? Array then
      @reqs << Request.new(i, r.shift, :post, r.shift)
    end
  end

  return collate_results(perform())
end

#resetObject

Reset the ThreadPool



46
47
48
49
50
51
52
53
# File 'lib/curb_threadpool.rb', line 46

def reset
  close()
  @reqs = []
  @results = {}
  @clients = []
  @threads = []
  @size.times{ @clients << Curl::Easy.new }
end