Class: Listen::Listener

Inherits:
Object
  • Object
show all
Includes:
Celluloid::FSM, QueueOptimizer
Defined in:
lib/listen/listener.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from QueueOptimizer

#_calculate_add_remove_difference, #_detect_possible_editor_save, #_logical_action_for, #_reinterpret_related_changes, #_smoosh_changes, #_squash_changes

Constructor Details

#initialize(*args) {|modified, added, removed| ... } ⇒ Listener

Initializes the directories listener.

Parameters:

  • directory (String)

    the directories to listen to

  • options (Hash)

    the listen options (see Listen::Listener::Options)

Yields:

  • (modified, added, removed)

    the changed files

Yield Parameters:

  • modified (Array<String>)

    the list of modified files

  • added (Array<String>)

    the list of added files

  • removed (Array<String>)

    the list of removed files



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/listen/listener.rb', line 33

def initialize(*args, &block)
  @options = _init_options(args.last.is_a?(Hash) ? args.pop : {})

  # Setup logging first
  Celluloid.logger.level = _debug_level
  _log :info, "Celluloid loglevel set to: #{Celluloid.logger.level}"

  @silencer = Silencer.new
  _reconfigure_silencer({})

  @tcp_mode = nil
  if [:recipient, :broadcaster].include? args[1]
    target = args.shift
    @tcp_mode = args.shift
    _init_tcp_options(target)
  end

  @directories = args.flatten.map { |path| Pathname.new(path).realpath }
  @queue = Queue.new
  @block = block
  @registry = Celluloid::Registry.new

  transition :stopped
end

Instance Attribute Details

#blockObject

Returns the value of attribute block.



14
15
16
# File 'lib/listen/listener.rb', line 14

def block
  @block
end

#directoriesObject (readonly)

TODO: deprecate



19
20
21
# File 'lib/listen/listener.rb', line 19

def directories
  @directories
end

#hostObject (readonly)

Returns the value of attribute host.



21
22
23
# File 'lib/listen/listener.rb', line 21

def host
  @host
end

#optionsObject (readonly)

TODO: deprecate



19
20
21
# File 'lib/listen/listener.rb', line 19

def options
  @options
end

#portObject (readonly)

Returns the value of attribute port.



21
22
23
# File 'lib/listen/listener.rb', line 21

def port
  @port
end

#registryObject (readonly)

Returns the value of attribute registry.



20
21
22
# File 'lib/listen/listener.rb', line 20

def registry
  @registry
end

#silencerObject (readonly)

Returns the value of attribute silencer.



16
17
18
# File 'lib/listen/listener.rb', line 16

def silencer
  @silencer
end

#supervisorObject (readonly)

Returns the value of attribute supervisor.



20
21
22
# File 'lib/listen/listener.rb', line 20

def supervisor
  @supervisor
end

#wait_threadObject (readonly, private)

Returns the value of attribute wait_thread.



280
281
282
# File 'lib/listen/listener.rb', line 280

def wait_thread
  @wait_thread
end

Instance Method Details

#_adapter_classObject (private)



254
255
256
# File 'lib/listen/listener.rb', line 254

def _adapter_class
  @adapter_class ||= Adapter.select(options)
end

#_debug_levelObject (private)



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/listen/listener.rb', line 178

def _debug_level
  # TODO: remove? (since there are BSD warnings anyway)
  bsd = RbConfig::CONFIG['host_os'] =~ /bsd|dragonfly/
  return Logger::DEBUG if bsd

  debugging = ENV['LISTEN_GEM_DEBUGGING'] || options[:debug]
  case debugging.to_s
  when /2/
    Logger::DEBUG
  when /true|yes|1/i
    Logger::INFO
  else
    Logger::ERROR
  end
end

#_init_actorsObject (private)



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/listen/listener.rb', line 194

def _init_actors
  @supervisor = Celluloid::SupervisionGroup.run!(registry)
  supervisor.add(Record, as: :record, args: self)
  supervisor.pool(Change, as: :change_pool, args: self)

  if @tcp_mode == :broadcaster
    require 'listen/tcp/broadcaster'
    supervisor.add(TCP::Broadcaster, as: :broadcaster, args: [@host, @port])

    # TODO: should be auto started, because if it crashes
    # a new instance is spawned by supervisor, but it's 'start' isn't
    # called
    registry[:broadcaster].start
  end

  options = [mq: self, directories: directories]
  supervisor.add(_adapter_class, as: :adapter, args: options)
end

#_init_options(options = {}) ⇒ Object (private)



170
171
172
173
174
175
176
# File 'lib/listen/listener.rb', line 170

def _init_options(options = {})
  { debug: false,
    latency: nil,
    wait_for_delay: 0.1,
    force_polling: false,
    polling_fallback_message: nil }.merge(options)
end

#_init_tcp_options(target) ⇒ Object (private)



282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/listen/listener.rb', line 282

def _init_tcp_options(target)
  # Handle TCP options here
  require 'listen/tcp'
  fail ArgumentError, 'missing host/port for TCP' unless target

  if @tcp_mode == :recipient
    @host = 'localhost'
    @options[:force_tcp] = true
  end

  if target.is_a? Fixnum
    @port = target
  else
    @host, port = target.split(':')
    @port = port.to_i
  end
end

#_log(type, message) ⇒ Object (private)



250
251
252
# File 'lib/listen/listener.rb', line 250

def _log(type, message)
  Celluloid.logger.send(type, message)
end

#_process_changesObject (private)

for easier testing without sleep loop



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/listen/listener.rb', line 259

def _process_changes
  return if @queue.empty?

  @last_queue_event_time = nil

  changes = []
  while !@queue.empty?
    changes << @queue.pop
  end

  return if block.nil?

  hash = _smoosh_changes(changes)
  result = [hash[:modified], hash[:added], hash[:removed]]

  block_start = Time.now.to_f
  # TODO: condition not tested, but too complex to test ATM
  block.call(*result) unless result.all?(&:empty?)
  _log :debug, "Callback took #{Time.now.to_f - block_start} seconds"
end

#_queue_raw_change(type, dir, rel_path, options) ⇒ Object (private)



328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/listen/listener.rb', line 328

def _queue_raw_change(type, dir, rel_path, options)
  _log :debug, "raw queue: #{[type, dir, rel_path, options].inspect}"

  unless (worker = async(:change_pool))
    _log :warn, 'Failed to allocate worker from change pool'
    return
  end

  worker.change(type, dir, rel_path, options)
rescue RuntimeError
  _log :error, "#{__method__} crashed: #{$!}:#{[email protected]("\n")}"
  raise
end

#_reconfigure_silencer(extra_options) ⇒ Object (private)



300
301
302
303
304
305
306
307
308
309
# File 'lib/listen/listener.rb', line 300

def _reconfigure_silencer(extra_options)
  @options.merge!(extra_options)

  # TODO: this should be directory specific
  rules = [:only, :ignore, :ignore!].map do |option|
    [option, @options[option]] if @options.key? option
  end

  @silencer.configure(Hash[rules.compact])
end

#_silenced?(path, type) ⇒ Boolean (private)

Returns:

  • (Boolean)


240
241
242
# File 'lib/listen/listener.rb', line 240

def _silenced?(path, type)
  @silencer.silenced?(path, type)
end

#_start_adapterObject (private)



244
245
246
247
248
# File 'lib/listen/listener.rb', line 244

def _start_adapter
  # Don't run async, because configuration has to finish first
  adapter = sync(:adapter)
  adapter.start
end

#_start_wait_threadObject (private)



311
312
313
# File 'lib/listen/listener.rb', line 311

def _start_wait_thread
  @wait_thread = Thread.new { _wait_for_changes }
end

#_stop_wait_threadObject (private)



319
320
321
322
323
324
325
326
# File 'lib/listen/listener.rb', line 319

def _stop_wait_thread
  return unless wait_thread
  if wait_thread.alive?
    wait_thread.wakeup
    wait_thread.join
  end
  @wait_thread = nil
end

#_wait_for_changesObject (private)



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/listen/listener.rb', line 213

def _wait_for_changes
  latency = options[:wait_for_delay]

  loop do
    break if state == :stopped

    if state == :paused || @queue.empty?
      sleep
      break if state == :stopped
    end

    # Assure there's at least latency between callbacks to allow
    # for accumulating changes
    now = Time.now.to_f
    diff = latency + (@last_queue_event_time || now) - now
    if diff > 0
      sleep diff
      next
    end

    _process_changes unless state == :paused
  end
rescue RuntimeError
  Kernel.warn "[Listen warning]: Change block raised an exception: #{$!}"
  Kernel.warn "Backtrace:\n\t#{[email protected]("\n\t")}"
end

#_wakeup_wait_threadObject (private)



315
316
317
# File 'lib/listen/listener.rb', line 315

def _wakeup_wait_thread
  wait_thread.wakeup if wait_thread && wait_thread.alive?
end

#async(type) ⇒ Object



144
145
146
147
# File 'lib/listen/listener.rb', line 144

def async(type)
  proxy = sync(type)
  proxy ? proxy.async : nil
end

#ignore(regexps) ⇒ Object

Add files and dirs to ignore on top of defaults

(@see Listen::Silencer for default ignored files and dirs)



130
131
132
# File 'lib/listen/listener.rb', line 130

def ignore(regexps)
  _reconfigure_silencer(ignore: [options[:ignore], regexps])
end

#ignore!(regexps) ⇒ Object

Replace default ignore patterns with provided regexp



135
136
137
# File 'lib/listen/listener.rb', line 135

def ignore!(regexps)
  _reconfigure_silencer(ignore: [], ignore!: regexps)
end

#only(regexps) ⇒ Object

Listen only to files and dirs matching regexp



140
141
142
# File 'lib/listen/listener.rb', line 140

def only(regexps)
  _reconfigure_silencer(only: regexps)
end

#pauseObject

Stops invoking callbacks (messages pile up)



102
103
104
# File 'lib/listen/listener.rb', line 102

def pause
  transition :paused
end

#paused=(value) ⇒ Object

TODO: deprecate



119
120
121
# File 'lib/listen/listener.rb', line 119

def paused=(value)
  transition value ? :paused : :processing
end

#paused?Boolean Also known as: paused

Returns:

  • (Boolean)


111
112
113
# File 'lib/listen/listener.rb', line 111

def paused?
  state == :paused
end

#processing?Boolean Also known as: listen?

processing means callbacks are called

Returns:

  • (Boolean)


107
108
109
# File 'lib/listen/listener.rb', line 107

def processing?
  state == :processing
end

#queue(type, change, dir, path, options = {}) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/listen/listener.rb', line 153

def queue(type, change, dir, path, options = {})
  fail "Invalid type: #{type.inspect}" unless [:dir, :file].include? type
  fail "Invalid change: #{change.inspect}" unless change.is_a?(Symbol)
  fail "Invalid path: #{path.inspect}" unless path.is_a?(String)
  @queue << [type, change, dir, path, options]

  @last_queue_event_time = Time.now.to_f
  _wakeup_wait_thread unless state == :paused

  return unless @tcp_mode == :broadcaster

  message = TCP::Message.new(type, change, dir, path, options)
  registry[:broadcaster].async.broadcast(message.payload)
end

#startObject Also known as: unpause

Starts processing events and starts adapters or resumes invoking callbacks if paused



89
90
91
# File 'lib/listen/listener.rb', line 89

def start
  transition :processing
end

#stopObject

Stops processing and terminates all actors



97
98
99
# File 'lib/listen/listener.rb', line 97

def stop
  transition :stopped
end

#sync(type) ⇒ Object



149
150
151
# File 'lib/listen/listener.rb', line 149

def sync(type)
  @registry[type]
end