Class: Blat::Queue

Inherits:
Object
Defined in:
lib/blat/queue.rb

Overview

The Blat::Queue class represents a download queue that handles requests using Curl::Multi. It and its subclasses accept a large number of Curl::Easy objects and download them in parallel.

To find out when each request has completed, use Curl::Easy#on_complete. Queue#add makes this simpler: it accepts a block and yields the completed request to it when each download finishes.

Direct Known Subclasses

ConsumingQueue

Constructor Details

#initialize(max_connections, pipeline = true) ⇒ Queue

Create a new Blat::Queue with the given maximum number of connections.

The ‘pipeline’ option controls Curl::Multi’s pipelining feature, which tries to reuse the same HTTP connection for many requests to the same server.



# File 'lib/blat/queue.rb', line 22

def initialize(max_connections, pipeline = true)
  @multi = Curl::Multi.new

  # Set properties (stored under the names the attribute readers use)
  @max_connections      = max_connections.to_i
  @pipeline             = (pipeline == true)
  @multi.max_connects   = @max_connections
  @multi.pipeline       = @pipeline

  # Keep track of activity
  @active               = false
  @activity_mx          = Mutex.new
end

Instance Attribute Details

#max_connections ⇒ Object (readonly)

Returns the value of attribute max_connections.



# File 'lib/blat/queue.rb', line 16

def max_connections
  @max_connections
end

#pipeline ⇒ Object (readonly)

Returns the value of attribute pipeline.



# File 'lib/blat/queue.rb', line 16

def pipeline
  @pipeline
end

Instance Method Details

#active? ⇒ Boolean

Is this queue currently downloading data, i.e. is a call to #perform in progress?



# File 'lib/blat/queue.rb', line 115

def active?
  @activity_mx.synchronize { @active }
end
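
The flag read by #active? is the same one that #perform sets and clears under a mutex. The following is a minimal sketch of that pattern using only the standard library. The `GuardedRunner` class and its `run` method are hypothetical names for illustration and are not part of Blat; the sketch also performs the check and the set inside a single critical section, which is an assumption about how one might harden the pattern rather than a description of the library.

```ruby
# Hypothetical stand-in illustrating a mutex-guarded activity flag.
# GuardedRunner and #run are illustrative names, not part of Blat.
class GuardedRunner
  def initialize
    @active = false
    @activity_mx = Mutex.new
  end

  # Read the flag under the lock.
  def active?
    @activity_mx.synchronize { @active }
  end

  # Mark the runner active while the block executes.
  def run
    # Check and set in one critical section so two callers cannot
    # both pass the check.
    @activity_mx.synchronize do
      raise 'Already active' if @active
      @active = true
    end

    begin
      yield self
    ensure
      # Reached only by the call that set the flag.
      @activity_mx.synchronize { @active = false }
    end
  end
end

runner = GuardedRunner.new
seen_inside = nil
nested_error = nil
still_active = nil

runner.run do |r|
  seen_inside = r.active?
  begin
    r.run {}                      # a second, overlapping call is rejected
  rescue RuntimeError => e
    nested_error = e.message
  end
  still_active = r.active?        # the rejected call left the flag alone
end
```

Because the flag is cleared in an `ensure` clause, it is reset even if the block raises, so a failed run does not leave the runner permanently marked as active.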

#add(curl_or_link, &block) ⇒ Object

Add a URL or a Curl::Easy object to the queue.

Optionally, provide a block to be called when the request completes, e.g.:

q.add('http://google.com') do |c|
  puts "Complete request: #{c}"
end


# File 'lib/blat/queue.rb', line 45

def add(curl_or_link, &block)
  # Convert to curl if necessary
  curl = curl_or_link.is_a?(Curl::Easy) ? curl_or_link : Curl::Easy.new(curl_or_link)
  curl.on_complete { |c| block.yield(c) } if block_given?

  # Add
  @multi.add(curl)

  # Return
  return curl
end
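
The shape of #add (accept either a URL or a ready-made request object, attach an optional completion callback, return the request) can be tried out without a network connection. The sketch below assumes stand-in classes: `FakeRequest` and `FakeQueue` are hypothetical and only imitate the parts of Curl::Easy and Blat::Queue used here.

```ruby
# Hypothetical stand-in for a request handle; this is not Curl::Easy.
class FakeRequest
  attr_reader :url

  def initialize(url)
    @url = url
    @on_complete = nil
  end

  # Register a completion callback.
  def on_complete(&block)
    @on_complete = block
  end

  # Simulate the transfer finishing.
  def complete!
    @on_complete&.call(self)
  end
end

# Hypothetical queue mirroring the shape of the add method above.
class FakeQueue
  attr_reader :requests

  def initialize
    @requests = []
  end

  def add(request_or_url, &block)
    # Convert a plain URL into a request object if necessary.
    request = request_or_url.is_a?(FakeRequest) ? request_or_url : FakeRequest.new(request_or_url)
    # Attach the caller's block as the completion callback.
    request.on_complete { |r| block.call(r) } if block
    @requests << request
    request
  end
end

q = FakeQueue.new
completed = []
handle = q.add('http://example.com/a') { |r| completed << r.url }
q.add(FakeRequest.new('http://example.com/b')) # no callback attached
q.requests.each(&:complete!)
```

Returning the request object lets the caller keep a handle for later use, for example to pass to a removal method.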

#cancel ⇒ Object (also known as: cancel!)

Cancel all requests currently queued or downloading.



# File 'lib/blat/queue.rb', line 58

def cancel
  @multi.cancel!
end

#idle? ⇒ Boolean

Is the queue idle?



# File 'lib/blat/queue.rb', line 120

def idle?
  @multi.idle?
end

#perform(&block) ⇒ Object

Run the queue, waiting for requests to finish (blocking).

If a block is given, it is executed repeatedly while waiting, e.g.:

q.perform {
  puts "Active downloads: #{q.request_count}"
}


# File 'lib/blat/queue.rb', line 89

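def perform(&block)
  raise 'Already actively performing requests' if active?

  @activity_mx.synchronize { @active = true }
  begin
    @multi.perform do
      yield if block_given?
    end
  ensure
    # Clear the flag only for the call that set it, so a rejected
    # concurrent call does not mark a running queue as inactive
    @activity_mx.synchronize { @active = false }
  end
end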

#perform_nonblock(&block) ⇒ Object

Perform downloads in a non-blocking manner by running #perform in a background thread.

Optionally runs the given block, as with regular #perform.



# File 'lib/blat/queue.rb', line 103

def perform_nonblock(&block)
  raise 'Currently active' if @thread

  me = self
  @thread = Thread.new do
    me.perform { yield if block_given? }
  end
  @thread.abort_on_exception = true
end
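
The background-thread approach can be sketched with the standard library alone. `BackgroundWorker` below is a hypothetical class, not part of Blat. Unlike the source shown above, this sketch returns the thread so that the caller can join it, and it checks `alive?` so the worker can be started again once a previous run has finished; both are assumptions made for illustration.

```ruby
# Hypothetical sketch: run a blocking method on a background thread.
# BackgroundWorker is an illustrative name, not part of Blat.
class BackgroundWorker
  def initialize(steps)
    @steps = steps
    @thread = nil
  end

  # Blocking: performs each step, yielding the step index to the block.
  def perform
    @steps.times do |i|
      yield i if block_given?
    end
  end

  # Non-blocking: starts #perform on a new thread and returns the thread.
  def perform_nonblock(&block)
    raise 'Currently active' if @thread&.alive?

    @thread = Thread.new { perform(&block) }
    @thread.abort_on_exception = true
    @thread
  end
end

ticks = []
worker = BackgroundWorker.new(3)
thread = worker.perform_nonblock { |i| ticks << i }
thread.join # wait for the background work to finish before reading ticks
```

Joining the returned thread is one way to wait for completion; polling a status method from the calling thread is another.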

#remove(curl) ⇒ Object

Remove a request from the queue.

This needn’t be called if a request has completed.



# File 'lib/blat/queue.rb', line 77

def remove(curl)
  @multi.remove(curl)
end

#request_count ⇒ Object

Returns the number of active requests.



# File 'lib/blat/queue.rb', line 65

def request_count
  requests.length
end

#requests ⇒ Object

Returns a list of active requests (Curl::Easy objects).



# File 'lib/blat/queue.rb', line 70

def requests
  @multi.requests
end