Class: Beanstalk::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/beanstalk-client/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(addr, default_tube = nil) ⇒ Connection

Returns a new instance of Connection.



29
30
31
32
33
34
35
36
37
# File 'lib/beanstalk-client/connection.rb', line 29

# Builds a connection for +addr+ and opens it straight away via #connect.
#
# After connecting, 'default' is recorded as both the tube in use and the
# only watched tube. When +default_tube+ is given, it is then passed to
# #use and #watch, in that order.
#
# @param addr the address stored in @addr (later split on ':' by #connect)
# @param default_tube optional tube to use and watch after connecting
def initialize(addr, default_tube=nil)
  @waiting = false
  @addr = addr
  connect
  @last_used = 'default'
  @watch_list = [@last_used]
  if default_tube
    use(default_tube)
    watch(default_tube)
  end
end

Instance Attribute Details

#addr ⇒ Object (readonly)

Returns the value of attribute addr.



27
28
29
# File 'lib/beanstalk-client/connection.rb', line 27

# Reader for the connection's address.
#
# @return the value currently stored in @addr
def addr
  @addr
end

Instance Method Details

#bury(id, pri) ⇒ Object



115
116
117
118
# File 'lib/beanstalk-client/connection.rb', line 115

# Issues a +bury+ command for job +id+ with priority field +pri+.
#
# The command goes through interact (defined outside this excerpt) with
# BURIED as the reply word; the reply itself is discarded.
#
# @param id job id, interpolated as-is into the command
# @param pri priority value, interpolated as-is into the command
# @return [Symbol] always :ok when interact returns normally
def bury(id, pri)
  command = "bury #{id} #{pri}\r\n"
  interact(command, %w(BURIED))
  :ok
end

#close ⇒ Object



47
48
49
50
# File 'lib/beanstalk-client/connection.rb', line 47

# Closes the underlying socket, if there is one, and drops the reference.
#
# Safe to call repeatedly: once @socket has been cleared (or if it was never
# set) this is a no-op instead of raising NoMethodError on nil.
#
# @return [nil]
def close
  @socket.close if @socket
  @socket = nil
end

#connect ⇒ Object



39
40
41
42
43
44
45
# File 'lib/beanstalk-client/connection.rb', line 39

# Opens a TCP socket to the address returned by #addr and stores it in
# @socket.
#
# The address is split on ':'; the first piece is the host and the second,
# converted with #to_i, is the port.
#
# @return the result of the fcntl call on the new socket
def connect
  parts = addr.split(':')
  @socket = TCPSocket.new(parts[0], parts[1].to_i)

  # Mark the descriptor close-on-exec so it is not leaked across an exec.
  @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
end

#delete(id) ⇒ Object



102
103
104
105
# File 'lib/beanstalk-client/connection.rb', line 102

# Issues a +delete+ command for job +id+.
#
# The command goes through interact (defined outside this excerpt) with
# DELETED as the reply word; the reply itself is discarded.
#
# @param id job id, interpolated as-is into the command
# @return [Symbol] always :ok when interact returns normally
def delete(id)
  command = "delete #{id}\r\n"
  interact(command, %w(DELETED))
  :ok
end

#ignore(tube) ⇒ Object



145
146
147
148
149
150
# File 'lib/beanstalk-client/connection.rb', line 145

# Stops watching +tube+.
#
# If +tube+ is not in the locally tracked watch list, nothing is sent and the
# current list size is returned. Otherwise an +ignore+ command is issued
# through interact and +tube+ is removed from the tracked list. The list is
# replaced rather than mutated in place.
#
# @param tube tube name to stop watching
# @return [Integer] the tracked list size, or the first reply field as an
#   integer when a command was sent
def ignore(tube)
  return @watch_list.size unless @watch_list.include?(tube)

  reply = interact("ignore #{tube}\r\n", %w(WATCHING))
  remaining = reply[0].to_i
  @watch_list = @watch_list - [tube]
  remaining
end

#job_stats(id) ⇒ Object



156
157
158
# File 'lib/beanstalk-client/connection.rb', line 156

# Issues a +stats-job+ command for job +id+.
#
# @param id job id, interpolated as-is into the command
# @return whatever interact (defined outside this excerpt) returns for a
#   :yaml reply
def job_stats(id)
  command = "stats-job #{id}\r\n"
  interact(command, :yaml)
end

#kick(n) ⇒ Object



125
126
127
# File 'lib/beanstalk-client/connection.rb', line 125

# Issues a +kick+ command with bound +n+.
#
# @param n value interpolated as-is into the command
# @return [Integer] the first field of the KICKED reply, converted with #to_i
def kick(n)
  reply = interact("kick #{n}\r\n", %w(KICKED))
  reply[0].to_i
end

#list_tube_used ⇒ Object



168
169
170
# File 'lib/beanstalk-client/connection.rb', line 168

# Issues a +list-tube-used+ command.
#
# @return the first field of the USING reply produced by interact
def list_tube_used
  reply = interact("list-tube-used\r\n", %w(USING))
  reply[0]
end

#list_tubes ⇒ Object



164
165
166
# File 'lib/beanstalk-client/connection.rb', line 164

# Issues a +list-tubes+ command.
#
# @return whatever interact (defined outside this excerpt) returns for a
#   :yaml reply
def list_tubes
  command = "list-tubes\r\n"
  interact(command, :yaml)
end

#list_tubes_watched(cached = false) ⇒ Object



172
173
174
175
# File 'lib/beanstalk-client/connection.rb', line 172

# Returns the list of watched tubes.
#
# With a truthy +cached+, the locally tracked @watch_list is returned without
# sending anything. Otherwise a +list-tubes-watched+ command is issued through
# interact and its result replaces @watch_list.
#
# @param cached when truthy, skip the command and return the tracked list
# @return the tracked or freshly fetched watch list
def list_tubes_watched(cached=false)
  if cached
    @watch_list
  else
    @watch_list = interact("list-tubes-watched\r\n", :yaml)
  end
end

#peek_buried ⇒ Object



77
78
79
# File 'lib/beanstalk-client/connection.rb', line 77

# Issues a +peek-buried+ command.
#
# @return whatever interact (defined outside this excerpt) returns for a
#   :job reply
def peek_buried
  command = "peek-buried\r\n"
  interact(command, :job)
end

#peek_delayed ⇒ Object



73
74
75
# File 'lib/beanstalk-client/connection.rb', line 73

# Issues a +peek-delayed+ command.
#
# @return whatever interact (defined outside this excerpt) returns for a
#   :job reply
def peek_delayed
  command = "peek-delayed\r\n"
  interact(command, :job)
end

#peek_job(id) ⇒ Object



65
66
67
# File 'lib/beanstalk-client/connection.rb', line 65

# Issues a +peek+ command for job +id+.
#
# @param id job id, interpolated as-is into the command
# @return whatever interact (defined outside this excerpt) returns for a
#   :job reply
def peek_job(id)
  command = "peek #{id}\r\n"
  interact(command, :job)
end

#peek_ready ⇒ Object



69
70
71
# File 'lib/beanstalk-client/connection.rb', line 69

# Issues a +peek-ready+ command.
#
# @return whatever interact (defined outside this excerpt) returns for a
#   :job reply
def peek_ready
  command = "peek-ready\r\n"
  interact(command, :job)
end

#put(body, pri = 65536, delay = 0, ttr = 120) ⇒ Object



52
53
54
55
56
57
58
59
# File 'lib/beanstalk-client/connection.rb', line 52

# Submits +body+ as a new job with a +put+ command.
#
# +pri+, +delay+ and +ttr+ are coerced with #to_i and +body+ with #to_s
# before the command is built.
#
# @param body job payload; anything responding to #to_s
# @param pri [#to_i] priority field of the put command
# @param delay [#to_i] delay field of the put command
# @param ttr [#to_i] ttr field of the put command
# @return [Integer] the first field of interact's reply, converted with #to_i
def put(body, pri=65536, delay=0, ttr=120)
  pri = pri.to_i
  delay = delay.to_i
  ttr = ttr.to_i
  body = body.to_s
  # The length announced in the command must be the payload's size in bytes.
  # String#size counts characters, which under-reports any body containing
  # multibyte characters, so bytesize is used instead.
  interact("put #{pri} #{delay} #{ttr} #{body.bytesize}\r\n#{body}\r\n",
           %w(INSERTED BURIED))[0].to_i
end

#release(id, pri, delay) ⇒ Object



107
108
109
110
111
112
113
# File 'lib/beanstalk-client/connection.rb', line 107

# Issues a +release+ command for job +id+.
#
# All three arguments are coerced with #to_i before being placed in the
# command. The reply from interact is discarded.
#
# @param id [#to_i] job id
# @param pri [#to_i] priority field of the release command
# @param delay [#to_i] delay field of the release command
# @return [Symbol] always :ok when interact returns normally
def release(id, pri, delay)
  id, pri, delay = [id, pri, delay].map { |value| value.to_i }
  interact("release #{id} #{pri} #{delay}\r\n", %w(RELEASED))
  :ok
end

#reserve(timeout = nil) ⇒ Object

Raises:

  • (WaitingForJobError)



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/beanstalk-client/connection.rb', line 81

# Reserves a job from this connection.
#
# Writes +reserve+ (or +reserve-with-timeout+ when +timeout+ is given)
# directly to the socket. If Beanstalk.select is set, it is called with the
# socket before the reply is read. @waiting is true for the duration of that
# call and is always reset afterwards.
#
# @param timeout when nil a plain reserve is sent; otherwise the value is
#   interpolated as-is into reserve-with-timeout
# @return [Job] built from this connection and the fields read_job returns
# @raise [WaitingForJobError] if @waiting is already set on entry
def reserve(timeout=nil)
  raise WaitingForJobError if @waiting

  request = timeout.nil? ? "reserve\r\n" : "reserve-with-timeout #{timeout}\r\n"
  @socket.write(request)

  begin
    @waiting = true
    # Let a user-supplied selector wait on this socket alongside other fds.
    if Beanstalk.select
      Beanstalk.select.call([@socket])
    end
  rescue WaitingForJobError
    # Raised from inside the selector: ignore it and go on to read the reply.
  ensure
    @waiting = false
  end

  Job.new(self, *read_job('RESERVED'))
end

#stats ⇒ Object



152
153
154
# File 'lib/beanstalk-client/connection.rb', line 152

# Issues a +stats+ command.
#
# @return whatever interact (defined outside this excerpt) returns for a
#   :yaml reply
def stats
  command = "stats\r\n"
  interact(command, :yaml)
end

#stats_tube(tube) ⇒ Object



160
161
162
# File 'lib/beanstalk-client/connection.rb', line 160

# Issues a +stats-tube+ command for +tube+.
#
# @param tube tube name, interpolated as-is into the command
# @return whatever interact (defined outside this excerpt) returns for a
#   :yaml reply
def stats_tube(tube)
  command = "stats-tube #{tube}\r\n"
  interact(command, :yaml)
end

#touch(id) ⇒ Object



120
121
122
123
# File 'lib/beanstalk-client/connection.rb', line 120

# Issues a +touch+ command for job +id+.
#
# The command goes through interact (defined outside this excerpt) with
# TOUCHED as the reply word; the reply itself is discarded.
#
# @param id job id, interpolated as-is into the command
# @return [Symbol] always :ok when interact returns normally
def touch(id)
  command = "touch #{id}\r\n"
  interact(command, %w(TOUCHED))
  :ok
end

#use(tube) ⇒ Object



129
130
131
132
133
134
# File 'lib/beanstalk-client/connection.rb', line 129

# Selects +tube+ as the tube in use.
#
# When +tube+ equals the locally remembered @last_used, nothing is sent and
# +tube+ is returned. Otherwise a +use+ command is issued through interact
# and the first reply field becomes the new @last_used.
#
# @param tube tube name
# @return +tube+ on the short-circuit path, otherwise the new @last_used
# @raise [InvalidTubeName] when a BadFormatError is raised during the call
def use(tube)
  if tube == @last_used
    tube
  else
    @last_used = interact("use #{tube}\r\n", %w(USING))[0]
  end
rescue BadFormatError
  raise InvalidTubeName.new(tube)
end

#watch(tube) ⇒ Object



136
137
138
139
140
141
142
143
# File 'lib/beanstalk-client/connection.rb', line 136

# Adds +tube+ to the watched tubes.
#
# If +tube+ is already in the locally tracked watch list, nothing is sent and
# the current list size is returned. Otherwise a +watch+ command is issued
# through interact and +tube+ is appended to the tracked list. The list is
# replaced rather than mutated in place.
#
# @param tube tube name to watch
# @return [Integer] the tracked list size, or the first reply field as an
#   integer when a command was sent
# @raise [InvalidTubeName] when a BadFormatError is raised during the call
def watch(tube)
  if @watch_list.include?(tube)
    @watch_list.size
  else
    reply = interact("watch #{tube}\r\n", %w(WATCHING))
    count = reply[0].to_i
    @watch_list = @watch_list + [tube]
    count
  end
rescue BadFormatError
  raise InvalidTubeName.new(tube)
end

#yput(obj, pri = 65536, delay = 0, ttr = 120) ⇒ Object



61
62
63
# File 'lib/beanstalk-client/connection.rb', line 61

# Serializes +obj+ with YAML.dump and submits the result through #put.
#
# @param obj object to serialize as the job body
# @param pri forwarded unchanged to #put
# @param delay forwarded unchanged to #put
# @param ttr forwarded unchanged to #put
# @return whatever #put returns
def yput(obj, pri=65536, delay=0, ttr=120)
  serialized = YAML.dump(obj)
  put(serialized, pri, delay, ttr)
end