Class: SMQueue::SpreadAdapter

Inherits:
Adapter show all
Defined in:
lib/smqueue/adapters/spread.rb

Defined Under Namespace

Classes: Configuration

Instance Method Summary collapse

Methods inherited from Adapter

#close, create, #open

Methods inherited from Doodle

#to_hash

Instance Method Details

#connect ⇒ Object



56
57
58
59
60
61
# File 'lib/smqueue/adapters/spread.rb', line 56

# Opens a Spread connection from the adapter's configuration, joins the
# configured group and flags the adapter as connected.
#
# After joining, the configuration's +private_name+ is overwritten with the
# +private_group+ reported by the new connection.
#
# @return [Object] whatever the +connected+ setter call returns
def connect
  settings = configuration
  @connection = Spread::Connection.new(
    settings.channel,
    settings.private_name,
    settings.all_messages
  )
  connection.join(settings.group)
  settings.private_name = connection.private_group
  connected(true)
end

#disconnect ⇒ Object



62
63
64
65
66
# File 'lib/smqueue/adapters/spread.rb', line 62

# Leaves the configured Spread group, closes the connection and flags the
# adapter as disconnected.
#
# The group name is read from +configuration+, the same place #connect and
# #put read it from; a bare +group+ is not defined by anything visible on
# the adapter and would raise NameError.
#
# @return [Object] whatever the +connected+ setter call returns
def disconnect
  connection.leave(configuration.group)
  connection.disconnect
  connected false
end

#get(&block) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/smqueue/adapters/spread.rb', line 67

# Reads messages from the Spread connection, connecting first if the adapter
# is not yet connected.
#
# Messages for which +data?+ is false are skipped. Each data message is
# wrapped in an SMQueue::Message whose headers carry the configured private
# name plus the sender, type, groups and delivery flags of the received
# message, and whose body is the message payload.
#
# Without a block the first data message is returned. With a block every
# data message is yielded and reading continues; the loop has no exit
# condition of its own in that mode.
#
# @yield [message] each wrapped data message, when a block is given
# @return [SMQueue::Message, nil] the most recently wrapped message, or nil
#   if the loop ended before any data message was wrapped
def get(&block)
  latest = nil
  connect unless connected
  loop do
    incoming = connection.receive
    next unless incoming.data?

    headers = {
      :private_name => configuration.private_name,
      :sender => incoming.sender,
      :type => incoming.msg_type,
      :groups => incoming.groups,
      :reliable => incoming.reliable?,
      :safe => incoming.safe?,
      :agreed => incoming.agreed?,
      :causal => incoming.causal?,
      :fifo => incoming.fifo?
    }
    latest = SMQueue::Message.new(:headers => headers, :body => incoming.message)
    break unless block_given?

    yield(latest)
  end
  latest
end

#put(msg) ⇒ Object



96
97
98
99
# File 'lib/smqueue/adapters/spread.rb', line 96

# Multicasts +msg+ to the configured group using the configured service
# type, connecting first if the adapter is not yet connected.
#
# @param msg [Object] payload handed unchanged to the connection's multicast
# @return [Object] whatever the connection's +multicast+ call returns
def put(msg)
  connect unless connected
  # Named locals document the positional arguments without leaving unused
  # assignments inside the call's argument list.
  message_type = 0
  # NOTE(review): presumably asks Spread not to deliver the message back to
  # this sender - confirm against the Spread binding.
  self_discard = true
  connection.multicast(msg, configuration.group, configuration.service_type, message_type, self_discard)
end