Class: Agent::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/agent/channel.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Channel

Returns a new instance of Channel.



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/agent/channel.rb', line 17

# Builds a channel and registers its backing queue with Queues.
#
# Positional arguments are read in order: the first is stored as the
# channel type, the second as max (0 when omitted or nil). A trailing
# Hash, if present, is consumed as options:
#   :name         - channel name; UUID.generate is used when absent
#   :direction    - defaults to :bidirectional
#   :skip_marshal - stored as given
def initialize(*args)
  options = args.last.is_a?(Hash) ? args.pop : {}
  type, max = args

  @type         = type
  @max          = max || 0
  @closed       = false
  @name         = options[:name] || UUID.generate
  @direction    = options[:direction] || :bidirectional
  @skip_marshal = options[:skip_marshal]
  @close_mutex  = Mutex.new
  @queue        = Queues.register(@name, @type, @max)
end

Instance Attribute Details

#direction ⇒ Object (readonly)

Returns the value of attribute direction.



15
16
17
# File 'lib/agent/channel.rb', line 15

# Reader for @direction, which the constructor sets from the :direction
# option (falling back to :bidirectional).
def direction; @direction; end

#max ⇒ Object (readonly)

Returns the value of attribute max.



15
16
17
# File 'lib/agent/channel.rb', line 15

# Reader for @max, the second positional constructor argument
# (0 when it was omitted).
def max; @max; end

#name ⇒ Object (readonly)

Returns the value of attribute name.



15
16
17
# File 'lib/agent/channel.rb', line 15

# Reader for @name, the key under which the channel's queue is
# registered with Queues.
def name; @name; end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



15
16
17
# File 'lib/agent/channel.rb', line 15

# Reader for @queue. The value is nil once #close has run, because
# #close clears the instance variable.
def queue; @queue; end

#type ⇒ Object (readonly)

Returns the value of attribute type.



15
16
17
# File 'lib/agent/channel.rb', line 15

# Reader for @type, the first positional constructor argument.
def type; @type; end

Instance Method Details

#as_receive_only ⇒ Object



100
101
102
# File 'lib/agent/channel.rb', line 100

# Delegates to as_direction_only with :receive and returns its result.
def as_receive_only
  as_direction_only :receive
end

#as_send_only ⇒ Object



96
97
98
# File 'lib/agent/channel.rb', line 96

# Delegates to as_direction_only with :send and returns its result.
def as_send_only
  as_direction_only :send
end

#close ⇒ Object

Closing methods



77
78
79
80
81
82
83
84
85
# File 'lib/agent/channel.rb', line 77

# Closes the channel while holding @close_mutex: flags it closed, closes
# the backing queue, drops the reference to it, and removes the queue's
# entry from Queues.
#
# Raises Errors::ChannelClosed when the channel is already flagged closed.
# Returns whatever Queues.delete returns.
def close
  @close_mutex.synchronize do
    if @closed
      raise Errors::ChannelClosed
    end

    @closed = true

    # Close the queue before dropping our reference to it.
    @queue.close
    @queue = nil

    Queues.delete(@name)
  end
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


86
# File 'lib/agent/channel.rb', line 86

# Returns the @closed flag.
def closed?
  @closed
end

#marshal_dumpObject



39
40
41
# File 'lib/agent/channel.rb', line 39

# Serializable state for Marshal: closed flag, name, max, type and
# direction, in that order. The queue itself is not included.
def marshal_dump
  [
    @closed,
    @name,
    @max,
    @type,
    @direction
  ]
end

#marshal_load(ary) ⇒ Object

Serialization methods



32
33
34
35
36
37
# File 'lib/agent/channel.rb', line 32

# Restores a channel from the array produced by #marshal_dump.
#
# The queue is not serialized; it is looked up by name in Queues. When no
# queue is registered under that name, or the queue reports closed, the
# restored channel is marked closed regardless of the dumped flag.
#
# @param ary [Array] [closed, name, max, type, direction]
# @return [self]
def marshal_load(ary)
  @closed, @name, @max, @type, @direction = *ary
  # The mutex is not part of the dumped state and Marshal does not run
  # #initialize on load, so without this #close would call #synchronize
  # on nil for any channel that has been through Marshal.
  @close_mutex = Mutex.new
  @queue = Queues[@name]
  @closed = @queue.nil? || @queue.closed?
  self
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


87
# File 'lib/agent/channel.rb', line 87

# Negation of the @closed flag.
def open?
  !@closed
end

#pop? ⇒ Boolean Also known as: receive?

Returns:

  • (Boolean)


71
# File 'lib/agent/channel.rb', line 71

# Forwards to the queue's pop? and returns its answer.
# NOTE(review): #close sets the queue to nil, so this raises NoMethodError
# on a closed channel — confirm that is intended.
def pop?
  queue.pop?
end

#push? ⇒ Boolean Also known as: send?

Returns:

  • (Boolean)


55
# File 'lib/agent/channel.rb', line 55

# Forwards to the queue's push? and returns its answer.
# NOTE(review): #close sets the queue to nil, so this raises NoMethodError
# on a closed channel — confirm that is intended.
def push?
  queue.push?
end

#receive(options = {}) ⇒ Object Also known as: pop

Receiving methods



61
62
63
64
65
66
67
68
# File 'lib/agent/channel.rb', line 61

# Receives from the channel after checking that receiving is permitted
# for this channel's direction.
#
# With a live queue the result of the queue's pop(options) is returned.
# When the queue is gone, a Pop is built from the options, closed
# immediately, and [pop.object, false] is returned instead.
def receive(options={})
  check_direction(:receive)

  # Take one snapshot of the queue; the accessor may return nil later.
  live_queue = queue

  if live_queue
    live_queue.pop(options)
  else
    closed_pop = Pop.new(options)
    closed_pop.close
    [closed_pop.object, false]
  end
end

#remove_operations(operations) ⇒ Object



89
90
91
92
93
94
# File 'lib/agent/channel.rb', line 89

# Asks the queue, if there still is one, to remove the given operations.
# Returns the queue's result, or nil when there is no queue.
def remove_operations(operations)
  # Read @queue exactly once into a local: #close can set it to nil, and
  # working from a single snapshot sidesteps that race without taking a
  # lock (this relies on the instance-variable read being atomic, per the
  # original author's note).
  snapshot = @queue
  return unless snapshot

  snapshot.remove_operations(operations)
end

#send(object, options = {}) ⇒ Object Also known as: push, <<

Sending methods



46
47
48
49
50
51
# File 'lib/agent/channel.rb', line 46

# Pushes an object onto the channel's queue after checking that sending
# is permitted for this channel's direction.
#
# The channel's @skip_marshal setting is used as the default for the
# :skip_marshal option; any key in the caller's options takes precedence.
# Raises Errors::ChannelClosed when there is no queue to push to.
# Returns the result of the queue's push.
def send(object, options={})
  check_direction(:send)

  target = queue
  raise Errors::ChannelClosed unless target

  defaults = {:skip_marshal => @skip_marshal}
  target.push(object, defaults.merge(options))
end