Class: Juggler::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/juggler/runner.rb

Overview

Stopping: This is rather complex. The point of the __STOP__ malarkey it to unblock a blocking reserve so that delete and release commands can be actioned on the currently running jobs before shutdown. Also a Juggler.shutdown_grace_timeout period is availble for jobs to complete before the eventmachine is stopped

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(method, concurrency, strategy) ⇒ Runner

Returns a new instance of Runner.



50
51
52
53
54
55
56
57
# File 'lib/juggler/runner.rb', line 50

def initialize(method, concurrency, strategy)
  @strategy = strategy
  @concurrency = concurrency
  @queue = method.to_s
  
  @running = []
  @reserved = false
end

Class Method Details

.start(runner) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/juggler/runner.rb', line 10

def start(runner)
  @runners ||= []
  @runners << runner
  
  @signals_setup ||= begin
    %w{INT TERM}.each do |sig|
      Signal.trap(sig) {
        stop_all_runners_with_grace
      }
    end
    true
  end
end

Instance Method Details

#reserveObject



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/juggler/runner.rb', line 69

def reserve
  @reserved = true
  
  reserve_call = connection.reserve
  
  reserve_call.callback do |job|
    @reserved = false

    begin
      params = Marshal.load(job.body)
    rescue => e
      handle_exception(e, "#{to_s}: Exception unmarshaling #{@queue} job")
      connection.delete(job)
      next
    end
    
    if params == "__STOP__"
      connection.delete(job)
      next
    end
    
    job_runner = JobRunner.new(job, params, @strategy)
    
    @running << job_runner

    Juggler.logger.debug {
      "#{to_s}: Excecuting #{@running.size} jobs"
    }

    # We may reserve after job is running (after fetching stats)
    job_runner.bind(:running) {
      reserve_if_necessary
    }

    # Also may reserve when a job is done
    job_runner.bind(:done) {
      @running.delete(job_runner)
      reserve_if_necessary
    }

    job_runner.run
  end
  
  reserve_call.errback do |error|
    @reserved = false
    
    if error == :deadline_soon
      # This doesn't necessarily mean that a job has taken too long, it is 
      # quite likely that the blocking reserve is just stopping jobs from 
      # being deleted
      Juggler.logger.debug "#{to_s}: Reserve terminated (deadline_soon)"

      check_all_reserved_jobs.callback {
        reserve_if_necessary
      }
    elsif error == :disconnected
      Juggler.logger.warn "#{to_s}: Reserve terminated (beanstalkd disconnected)"
    else
      Juggler.logger.error "#{to_s}: Unexpected error: #{error}"
      reserve_if_necessary
    end
  end
end

#reserve_if_necessaryObject

We potentially need to issue a new reserve call after a job is reserved (if we’re not at the concurrency limit), and after a job completes (unless we’re already reserving)



62
63
64
65
66
67
# File 'lib/juggler/runner.rb', line 62

def reserve_if_necessary
  if @on && @connection.connected? && !@reserved && @running.size < @concurrency
    Juggler.logger.debug "#{to_s}: Reserving"
    reserve
  end
end

#runObject



133
134
135
136
137
138
# File 'lib/juggler/runner.rb', line 133

def run
  @on = true
  Runner.start(self)
  # Creates beanstalkd connection - reserve happens on connect
  connection
end

#running?Boolean

Returns:

  • (Boolean)


149
150
151
# File 'lib/juggler/runner.rb', line 149

def running?
  @running.size > 0
end

#stopObject



140
141
142
143
144
145
146
147
# File 'lib/juggler/runner.rb', line 140

def stop
  @on = false

  # See class documentation on stopping
  if @reserved
    Juggler.throw(@queue, "__STOP__")
  end
end

#to_sObject



153
154
155
# File 'lib/juggler/runner.rb', line 153

def to_s
  "Tube #{@queue}"
end