Class: Beanstalk::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/beanstalk-client/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(addr, default_tube = nil) ⇒ Connection

Returns a new instance of Connection.



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/beanstalk-client/connection.rb', line 28

def initialize(addr, default_tube=nil)
  @mutex = Mutex.new
  @tube_mutex = Mutex.new
  @waiting = false
  @addr = addr
  connect
  @last_used = 'default'
  @watch_list = [@last_used]
  self.use(default_tube) if default_tube
  self.watch(default_tube) if default_tube
end

Instance Attribute Details

#addrObject (readonly)

Returns the value of attribute addr.



26
27
28
# File 'lib/beanstalk-client/connection.rb', line 26

def addr
  @addr
end

Instance Method Details

#bury(id, pri) ⇒ Object



127
128
129
130
# File 'lib/beanstalk-client/connection.rb', line 127

def bury(id, pri)
  interact("bury #{id} #{pri}\r\n", %w(BURIED))
  :ok
end

#closeObject



48
49
50
51
# File 'lib/beanstalk-client/connection.rb', line 48

def close
  @socket.close
  @socket = nil
end

#connectObject



40
41
42
43
44
45
46
# File 'lib/beanstalk-client/connection.rb', line 40

def connect
  host, port = addr.split(':')
  @socket = TCPSocket.new(host, port.to_i)

  # Don't leak fds when we exec.
  @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
end

#delete(id) ⇒ Object



114
115
116
117
# File 'lib/beanstalk-client/connection.rb', line 114

def delete(id)
  interact("delete #{id}\r\n", %w(DELETED))
  :ok
end

#ignore(tube) ⇒ Object



157
158
159
160
161
162
# File 'lib/beanstalk-client/connection.rb', line 157

def ignore(tube)
  return @watch_list.size if !@watch_list.include?(tube)
  r = interact("ignore #{tube}\r\n", %w(WATCHING))[0].to_i
  @watch_list -= [tube]
  return r
end

#job_stats(id) ⇒ Object



168
169
170
# File 'lib/beanstalk-client/connection.rb', line 168

def job_stats(id)
  interact("stats-job #{id}\r\n", :yaml)
end

#kick(n) ⇒ Object



137
138
139
# File 'lib/beanstalk-client/connection.rb', line 137

def kick(n)
  interact("kick #{n}\r\n", %w(KICKED))[0].to_i
end

#list_tube_usedObject



180
181
182
# File 'lib/beanstalk-client/connection.rb', line 180

def list_tube_used()
  interact("list-tube-used\r\n", %w(USING))[0]
end

#list_tubesObject



176
177
178
# File 'lib/beanstalk-client/connection.rb', line 176

def list_tubes()
  interact("list-tubes\r\n", :yaml)
end

#list_tubes_watched(cached = false) ⇒ Object



184
185
186
187
# File 'lib/beanstalk-client/connection.rb', line 184

def list_tubes_watched(cached=false)
  return @watch_list if cached
  @watch_list = interact("list-tubes-watched\r\n", :yaml)
end

#on_tube(tube, &block) ⇒ Object



82
83
84
85
86
87
88
# File 'lib/beanstalk-client/connection.rb', line 82

def on_tube(tube, &block)
  @tube_mutex.lock
  use tube
  yield self
ensure
  @tube_mutex.unlock
end

#peek_buriedObject



78
79
80
# File 'lib/beanstalk-client/connection.rb', line 78

def peek_buried()
  interact("peek-buried\r\n", :job)
end

#peek_delayedObject



74
75
76
# File 'lib/beanstalk-client/connection.rb', line 74

def peek_delayed()
  interact("peek-delayed\r\n", :job)
end

#peek_job(id) ⇒ Object



66
67
68
# File 'lib/beanstalk-client/connection.rb', line 66

def peek_job(id)
  interact("peek #{id}\r\n", :job)
end

#peek_readyObject



70
71
72
# File 'lib/beanstalk-client/connection.rb', line 70

def peek_ready()
  interact("peek-ready\r\n", :job)
end

#put(body, pri = 65536, delay = 0, ttr = 120) ⇒ Object



53
54
55
56
57
58
59
60
# File 'lib/beanstalk-client/connection.rb', line 53

def put(body, pri=65536, delay=0, ttr=120)
  pri = pri.to_i
  delay = delay.to_i
  ttr = ttr.to_i
  body = "#{body}" # Make sure that body.bytesize gives a useful number
  interact("put #{pri} #{delay} #{ttr} #{body.bytesize}\r\n#{body}\r\n",
           %w(INSERTED BURIED))[0].to_i
end

#release(id, pri, delay) ⇒ Object



119
120
121
122
123
124
125
# File 'lib/beanstalk-client/connection.rb', line 119

def release(id, pri, delay)
  id = id.to_i
  pri = pri.to_i
  delay = delay.to_i
  interact("release #{id} #{pri} #{delay}\r\n", %w(RELEASED))
  :ok
end

#reserve(timeout = nil) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/beanstalk-client/connection.rb', line 90

def reserve(timeout=nil)
  raise WaitingForJobError if @waiting
  @mutex.lock
  if timeout.nil?
    @socket.write("reserve\r\n")
  else
    @socket.write("reserve-with-timeout #{timeout}\r\n")
  end

  begin
    @waiting = true
    # Give the user a chance to select on multiple fds.
    Beanstalk.select.call([@socket]) if Beanstalk.select
  rescue WaitingForJobError
    # just continue
  ensure
    @waiting = false
  end

  Job.new(self, *read_job('RESERVED'))
ensure
  @mutex.unlock
end

#statsObject



164
165
166
# File 'lib/beanstalk-client/connection.rb', line 164

def stats()
  interact("stats\r\n", :yaml)
end

#stats_tube(tube) ⇒ Object



172
173
174
# File 'lib/beanstalk-client/connection.rb', line 172

def stats_tube(tube)
  interact("stats-tube #{tube}\r\n", :yaml)
end

#touch(id) ⇒ Object



132
133
134
135
# File 'lib/beanstalk-client/connection.rb', line 132

def touch(id)
  interact("touch #{id}\r\n", %w(TOUCHED))
  :ok
end

#use(tube) ⇒ Object



141
142
143
144
145
146
# File 'lib/beanstalk-client/connection.rb', line 141

def use(tube)
  return tube if tube == @last_used
  @last_used = interact("use #{tube}\r\n", %w(USING))[0]
rescue BadFormatError
  raise InvalidTubeName.new(tube)
end

#watch(tube) ⇒ Object



148
149
150
151
152
153
154
155
# File 'lib/beanstalk-client/connection.rb', line 148

def watch(tube)
  return @watch_list.size if @watch_list.include?(tube)
  r = interact("watch #{tube}\r\n", %w(WATCHING))[0].to_i
  @watch_list += [tube]
  return r
rescue BadFormatError
  raise InvalidTubeName.new(tube)
end

#yput(obj, pri = 65536, delay = 0, ttr = 120) ⇒ Object



62
63
64
# File 'lib/beanstalk-client/connection.rb', line 62

def yput(obj, pri=65536, delay=0, ttr=120)
  put(YAML.dump(obj), pri, delay, ttr)
end