Class: Listen::Listener
- Inherits:
-
Object
- Object
- Listen::Listener
- Includes:
- Celluloid::FSM, QueueOptimizer
- Defined in:
- lib/listen/listener.rb
Instance Attribute Summary collapse
-
#block ⇒ Object
Returns the value of attribute block.
-
#directories ⇒ Object
readonly
TODO: deprecate.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#options ⇒ Object
readonly
TODO: deprecate.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#registry ⇒ Object
readonly
Returns the value of attribute registry.
-
#silencer ⇒ Object
readonly
Returns the value of attribute silencer.
-
#supervisor ⇒ Object
readonly
Returns the value of attribute supervisor.
-
#wait_thread ⇒ Object
readonly
private
Returns the value of attribute wait_thread.
Instance Method Summary collapse
- #_adapter_class ⇒ Object private
- #_debug_level ⇒ Object private
- #_init_actors ⇒ Object private
- #_init_options(options = {}) ⇒ Object private
- #_init_tcp_options(target) ⇒ Object private
- #_log(type, message) ⇒ Object private
-
#_process_changes ⇒ Object
private
for easier testing without sleep loop.
- #_queue_raw_change(type, dir, rel_path, options) ⇒ Object private
- #_reconfigure_silencer(extra_options) ⇒ Object private
- #_silenced?(path, type) ⇒ Boolean private
- #_start_adapter ⇒ Object private
- #_start_wait_thread ⇒ Object private
- #_stop_wait_thread ⇒ Object private
- #_wait_for_changes ⇒ Object private
- #_wakeup_wait_thread ⇒ Object private
- #async(type) ⇒ Object
-
#ignore(regexps) ⇒ Object
Add files and dirs to ignore on top of defaults.
-
#ignore!(regexps) ⇒ Object
Replace default ignore patterns with provided regexp.
-
#initialize(*args) {|modified, added, removed| ... } ⇒ Listener
constructor
Initializes the directories listener.
-
#only(regexps) ⇒ Object
Listen only to files and dirs matching regexp.
-
#pause ⇒ Object
Stops invoking callbacks (messages pile up).
-
#paused=(value) ⇒ Object
TODO: deprecate.
- #paused? ⇒ Boolean (also: #paused)
-
#processing? ⇒ Boolean
(also: #listen?)
processing means callbacks are called.
- #queue(type, change, dir, path, options = {}) ⇒ Object
-
#start ⇒ Object
(also: #unpause)
Starts processing events and starts adapters or resumes invoking callbacks if paused.
-
#stop ⇒ Object
Stops processing and terminates all actors.
- #sync(type) ⇒ Object
Methods included from QueueOptimizer
#_calculate_add_remove_difference, #_detect_possible_editor_save, #_logical_action_for, #_reinterpret_related_changes, #_smoosh_changes, #_squash_changes
Constructor Details
#initialize(*args) {|modified, added, removed| ... } ⇒ Listener
Initializes the directories listener.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/listen/listener.rb', line 33 def initialize(*args, &block) = (args.last.is_a?(Hash) ? args.pop : {}) # Setup logging first Celluloid.logger.level = _debug_level _log :info, "Celluloid loglevel set to: #{Celluloid.logger.level}" @silencer = Silencer.new _reconfigure_silencer({}) @tcp_mode = nil if [:recipient, :broadcaster].include? args[1] target = args.shift @tcp_mode = args.shift (target) end @directories = args.flatten.map { |path| Pathname.new(path).realpath } @queue = Queue.new @block = block @registry = Celluloid::Registry.new transition :stopped end |
Instance Attribute Details
#block ⇒ Object
Returns the value of attribute block.
14 15 16 |
# File 'lib/listen/listener.rb', line 14 def block @block end |
#directories ⇒ Object (readonly)
TODO: deprecate
19 20 21 |
# File 'lib/listen/listener.rb', line 19 def directories @directories end |
#host ⇒ Object (readonly)
Returns the value of attribute host.
21 22 23 |
# File 'lib/listen/listener.rb', line 21 def host @host end |
#options ⇒ Object (readonly)
TODO: deprecate
19 20 21 |
# File 'lib/listen/listener.rb', line 19 def end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
21 22 23 |
# File 'lib/listen/listener.rb', line 21 def port @port end |
#registry ⇒ Object (readonly)
Returns the value of attribute registry.
20 21 22 |
# File 'lib/listen/listener.rb', line 20 def registry @registry end |
#silencer ⇒ Object (readonly)
Returns the value of attribute silencer.
16 17 18 |
# File 'lib/listen/listener.rb', line 16 def silencer @silencer end |
#supervisor ⇒ Object (readonly)
Returns the value of attribute supervisor.
20 21 22 |
# File 'lib/listen/listener.rb', line 20 def supervisor @supervisor end |
#wait_thread ⇒ Object (readonly, private)
Returns the value of attribute wait_thread.
280 281 282 |
# File 'lib/listen/listener.rb', line 280 def wait_thread @wait_thread end |
Instance Method Details
#_adapter_class ⇒ Object (private)
254 255 256 |
# File 'lib/listen/listener.rb', line 254 def _adapter_class @adapter_class ||= Adapter.select() end |
#_debug_level ⇒ Object (private)
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/listen/listener.rb', line 178 def _debug_level # TODO: remove? (since there are BSD warnings anyway) bsd = RbConfig::CONFIG['host_os'] =~ /bsd|dragonfly/ return Logger::DEBUG if bsd debugging = ENV['LISTEN_GEM_DEBUGGING'] || [:debug] case debugging.to_s when /2/ Logger::DEBUG when /true|yes|1/i Logger::INFO else Logger::ERROR end end |
#_init_actors ⇒ Object (private)
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/listen/listener.rb', line 194 def _init_actors @supervisor = Celluloid::SupervisionGroup.run!(registry) supervisor.add(Record, as: :record, args: self) supervisor.pool(Change, as: :change_pool, args: self) if @tcp_mode == :broadcaster require 'listen/tcp/broadcaster' supervisor.add(TCP::Broadcaster, as: :broadcaster, args: [@host, @port]) # TODO: should be auto started, because if it crashes # a new instance is spawned by supervisor, but it's 'start' isn't # called registry[:broadcaster].start end = [mq: self, directories: directories] supervisor.add(_adapter_class, as: :adapter, args: ) end |
#_init_options(options = {}) ⇒ Object (private)
170 171 172 173 174 175 176 |
# File 'lib/listen/listener.rb', line 170 def ( = {}) { debug: false, latency: nil, wait_for_delay: 0.1, force_polling: false, polling_fallback_message: nil }.merge() end |
#_init_tcp_options(target) ⇒ Object (private)
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 |
# File 'lib/listen/listener.rb', line 282 def (target) # Handle TCP options here require 'listen/tcp' fail ArgumentError, 'missing host/port for TCP' unless target if @tcp_mode == :recipient @host = 'localhost' [:force_tcp] = true end if target.is_a? Fixnum @port = target else @host, port = target.split(':') @port = port.to_i end end |
#_log(type, message) ⇒ Object (private)
250 251 252 |
# File 'lib/listen/listener.rb', line 250 def _log(type, ) Celluloid.logger.send(type, ) end |
#_process_changes ⇒ Object (private)
for easier testing without sleep loop
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 |
# File 'lib/listen/listener.rb', line 259 def _process_changes return if @queue.empty? @last_queue_event_time = nil changes = [] while !@queue.empty? changes << @queue.pop end return if block.nil? hash = _smoosh_changes(changes) result = [hash[:modified], hash[:added], hash[:removed]] block_start = Time.now.to_f # TODO: condition not tested, but too complex to test ATM block.call(*result) unless result.all?(&:empty?) _log :debug, "Callback took #{Time.now.to_f - block_start} seconds" end |
#_queue_raw_change(type, dir, rel_path, options) ⇒ Object (private)
328 329 330 331 332 333 334 335 336 337 338 339 340 |
# File 'lib/listen/listener.rb', line 328 def _queue_raw_change(type, dir, rel_path, ) _log :debug, "raw queue: #{[type, dir, rel_path, options].inspect}" unless (worker = async(:change_pool)) _log :warn, 'Failed to allocate worker from change pool' return end worker.change(type, dir, rel_path, ) rescue RuntimeError _log :error, "#{__method__} crashed: #{$!}:#{[email protected]("\n")}" raise end |
#_reconfigure_silencer(extra_options) ⇒ Object (private)
300 301 302 303 304 305 306 307 308 309 |
# File 'lib/listen/listener.rb', line 300 def _reconfigure_silencer() .merge!() # TODO: this should be directory specific rules = [:only, :ignore, :ignore!].map do |option| [option, [option]] if .key? option end @silencer.configure(Hash[rules.compact]) end |
#_silenced?(path, type) ⇒ Boolean (private)
240 241 242 |
# File 'lib/listen/listener.rb', line 240 def _silenced?(path, type) @silencer.silenced?(path, type) end |
#_start_adapter ⇒ Object (private)
244 245 246 247 248 |
# File 'lib/listen/listener.rb', line 244 def _start_adapter # Don't run async, because configuration has to finish first adapter = sync(:adapter) adapter.start end |
#_start_wait_thread ⇒ Object (private)
311 312 313 |
# File 'lib/listen/listener.rb', line 311 def _start_wait_thread @wait_thread = Thread.new { _wait_for_changes } end |
#_stop_wait_thread ⇒ Object (private)
319 320 321 322 323 324 325 326 |
# File 'lib/listen/listener.rb', line 319 def _stop_wait_thread return unless wait_thread if wait_thread.alive? wait_thread.wakeup wait_thread.join end @wait_thread = nil end |
#_wait_for_changes ⇒ Object (private)
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/listen/listener.rb', line 213 def _wait_for_changes latency = [:wait_for_delay] loop do break if state == :stopped if state == :paused || @queue.empty? sleep break if state == :stopped end # Assure there's at least latency between callbacks to allow # for accumulating changes now = Time.now.to_f diff = latency + (@last_queue_event_time || now) - now if diff > 0 sleep diff next end _process_changes unless state == :paused end rescue RuntimeError Kernel.warn "[Listen warning]: Change block raised an exception: #{$!}" Kernel.warn "Backtrace:\n\t#{[email protected]("\n\t")}" end |
#_wakeup_wait_thread ⇒ Object (private)
315 316 317 |
# File 'lib/listen/listener.rb', line 315 def _wakeup_wait_thread wait_thread.wakeup if wait_thread && wait_thread.alive? end |
#async(type) ⇒ Object
144 145 146 147 |
# File 'lib/listen/listener.rb', line 144 def async(type) proxy = sync(type) proxy ? proxy.async : nil end |
#ignore(regexps) ⇒ Object
Add files and dirs to ignore on top of defaults
(@see Listen::Silencer for default ignored files and dirs)
130 131 132 |
# File 'lib/listen/listener.rb', line 130 def ignore(regexps) _reconfigure_silencer(ignore: [[:ignore], regexps]) end |
#ignore!(regexps) ⇒ Object
Replace default ignore patterns with provided regexp
135 136 137 |
# File 'lib/listen/listener.rb', line 135 def ignore!(regexps) _reconfigure_silencer(ignore: [], ignore!: regexps) end |
#only(regexps) ⇒ Object
Listen only to files and dirs matching regexp
140 141 142 |
# File 'lib/listen/listener.rb', line 140 def only(regexps) _reconfigure_silencer(only: regexps) end |
#pause ⇒ Object
Stops invoking callbacks (messages pile up)
102 103 104 |
# File 'lib/listen/listener.rb', line 102 def pause transition :paused end |
#paused=(value) ⇒ Object
TODO: deprecate
119 120 121 |
# File 'lib/listen/listener.rb', line 119 def paused=(value) transition value ? :paused : :processing end |
#paused? ⇒ Boolean Also known as: paused
111 112 113 |
# File 'lib/listen/listener.rb', line 111 def paused? state == :paused end |
#processing? ⇒ Boolean Also known as: listen?
processing means callbacks are called
107 108 109 |
# File 'lib/listen/listener.rb', line 107 def processing? state == :processing end |
#queue(type, change, dir, path, options = {}) ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/listen/listener.rb', line 153 def queue(type, change, dir, path, = {}) fail "Invalid type: #{type.inspect}" unless [:dir, :file].include? type fail "Invalid change: #{change.inspect}" unless change.is_a?(Symbol) fail "Invalid path: #{path.inspect}" unless path.is_a?(String) @queue << [type, change, dir, path, ] @last_queue_event_time = Time.now.to_f _wakeup_wait_thread unless state == :paused return unless @tcp_mode == :broadcaster = TCP::Message.new(type, change, dir, path, ) registry[:broadcaster].async.broadcast(.payload) end |
#start ⇒ Object Also known as: unpause
Starts processing events and starts adapters or resumes invoking callbacks if paused
89 90 91 |
# File 'lib/listen/listener.rb', line 89 def start transition :processing end |
#stop ⇒ Object
Stops processing and terminates all actors
97 98 99 |
# File 'lib/listen/listener.rb', line 97 def stop transition :stopped end |
#sync(type) ⇒ Object
149 150 151 |
# File 'lib/listen/listener.rb', line 149 def sync(type) @registry[type] end |