Module: Angelo::Stash

Includes:
Celluloid::Internals::Logger
Included in:
SSE, Websocket
Defined in:
lib/angelo/stash.rb

Overview

utility class for stashing connected websockets in arbitrary contexts

Defined Under Namespace

Modules: ClassMethods Classes: SSE, Websocket

Instance Method Summary collapse

Instance Method Details

#<<(s) ⇒ Object

add a websocket to this context’s stash, save peeraddr info, start server handle_websocket task to read from the socket and fire events as needed



40
41
42
43
# File 'lib/angelo/stash.rb', line 40

def << s
  peeraddrs[s] = s.peeraddr
  stashes[@context] << s
end

#[](context) ⇒ Object

utility method to create a new instance with a different context



81
82
83
# File 'lib/angelo/stash.rb', line 81

def [] context
  self.class.new @server, context
end

#all_eachObject

iterate on every connected websocket in all contexts, mostly used for ping_websockets task



88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/angelo/stash.rb', line 88

def all_each
  all_stashed = @mutex.synchronize { stashes.values.flatten }
  all_stashed.each do |s|
    begin
      yield s
    rescue Reel::SocketError, IOError, SystemCallError => e
      debug "all - #{e.message}"
      remove_socket s
      stashes.values.each {|_s| _s.delete s}
    end
  end
end

#each(&block) ⇒ Object

iterate on each connected websocket in this context, handling errors as needed



54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/angelo/stash.rb', line 54

def each &block
  stash_dup = @mutex.synchronize { stash.dup }
  stash_dup.each do |s|
    begin
      yield s
    rescue Reel::SocketError, IOError, SystemCallError => e
      debug "context: #{@context} - #{e.message}"
      remove_socket s
    end
  end
  nil
end

#initialize(server, context = :default) ⇒ Object

create a new instance with a context, creating the array if needed



29
30
31
32
33
34
# File 'lib/angelo/stash.rb', line 29

def initialize server, context = :default
  raise ArgumentError.new "symbol required" unless Symbol === context
  @context, @server = context, server
  @mutex = Mutex.new
  stashes[@context] ||= []
end

#lengthObject

return the number of websockets in this context (some are potentially disconnected)



116
117
118
# File 'lib/angelo/stash.rb', line 116

def length
  stash.length
end

#peeraddr(s) ⇒ Object

access the peeraddr info for a given websocket



109
110
111
# File 'lib/angelo/stash.rb', line 109

def peeraddr s
  peeraddrs[s]
end

#peeraddrsObject



25
# File 'lib/angelo/stash.rb', line 25

def peeraddrs; self.class.peeraddrs; end

#reject!(&block) ⇒ Object

pass the given block to the underlying stashed array’s reject! method



103
104
105
# File 'lib/angelo/stash.rb', line 103

def reject! &block
  stash.reject! &block
end

#remove_socket(s) ⇒ Object

remove a socket from the stash, warn user, drop peeraddr info



69
70
71
72
73
74
75
76
77
# File 'lib/angelo/stash.rb', line 69

def remove_socket s
  s.close unless s.closed?
  if stash.include? s
    warn "removing socket from context ':#{@context}' (#{peeraddrs[s][2]})"
    stash.delete s

    peeraddrs.delete s
  end
end

#stashObject

access the underlying array of this context



47
48
49
# File 'lib/angelo/stash.rb', line 47

def stash
  stashes[@context]
end

#stashesObject



24
# File 'lib/angelo/stash.rb', line 24

def stashes; self.class.stashes; end