Class: ActiveMessaging::Adapters::Beanstalk::Connection

Inherits:
ActiveMessaging::Adapters::BaseConnection show all
Defined in:
lib/activemessaging/adapters/beanstalk.rb

Instance Attribute Summary collapse

Attributes inherited from ActiveMessaging::Adapters::BaseConnection

#reliable

Instance Method Summary collapse

Methods included from ActiveMessaging::Adapter

included, #logger

Constructor Details

#initialize(cfg) ⇒ Connection

Returns a new instance of Connection.



21
22
23
24
25
26
# File 'lib/activemessaging/adapters/beanstalk.rb', line 21

def initialize cfg
  @host = cfg[:host] || 'localhost'
  @port = cfg[:port] || 11300
  
  @connection = ::Beanstalk::Pool.new(["#{@host}:#{@port}"])
end

Instance Attribute Details

#connectionObject

Returns the value of attribute connection.



19
20
21
# File 'lib/activemessaging/adapters/beanstalk.rb', line 19

def connection
  @connection
end

#hostObject

Returns the value of attribute host.



19
20
21
# File 'lib/activemessaging/adapters/beanstalk.rb', line 19

def host
  @host
end

#portObject

Returns the value of attribute port.



19
20
21
# File 'lib/activemessaging/adapters/beanstalk.rb', line 19

def port
  @port
end

Instance Method Details

#disconnectObject



28
29
30
# File 'lib/activemessaging/adapters/beanstalk.rb', line 28

def disconnect
  @connection.close
end

#receive(options = {}) ⇒ Object



49
50
51
52
# File 'lib/activemessaging/adapters/beanstalk.rb', line 49

def receive(options={})
  message = @connection.reserve
  Beanstalk::Message.new message
end

#received(message, message_headers = {}) ⇒ Object



54
55
56
# File 'lib/activemessaging/adapters/beanstalk.rb', line 54

def received message, message_headers={}
  message.delete
end

#send(tube, message, message_headers = {}) ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/activemessaging/adapters/beanstalk.rb', line 40

def send tube, message, message_headers={}
  priority = message_headers[:priority] || 65536
  delay    = message_headers[:delay] || 0
  ttr      = message_headers[:ttr] || 120
  
  @connection.use(tube)
  @connection.put(message, priority, delay, ttr)
end

#subscribe(tube, message_headers = {}) ⇒ Object



32
33
34
# File 'lib/activemessaging/adapters/beanstalk.rb', line 32

def subscribe tube, message_headers={}
  @connection.watch(tube)
end

#unreceive(message, message_headers = {}) ⇒ Object



58
59
60
# File 'lib/activemessaging/adapters/beanstalk.rb', line 58

def unreceive message, message_headers={}
  message.release
end

#unsubscribe(tube, message_headers = {}) ⇒ Object



36
37
38
# File 'lib/activemessaging/adapters/beanstalk.rb', line 36

def unsubscribe tube, message_headers={}
  @connection.ignore(tube)
end