port to typescript

Conduitry 2018-10-11 07:52:47 -04:00
parent 4405822cd9
commit c30f9b4db9
1 changed file with 80 additions and 84 deletions

@@ -1,139 +1,123 @@
-import EventEmitter from 'events';
+import * as EventEmitter from 'events';
 import * as fs from 'fs';
 import { promisify } from 'util';
 const readdir = promisify(fs.readdir);
 const stat = promisify(fs.stat);
-const _dir = Symbol('_dir');
-const _filter = Symbol('_filter');
-const _watch = Symbol('_watch');
-const _debounce = Symbol('_debounce');
-const _watchers = Symbol('_watchers');
-const _paths = Symbol('_paths');
-const _timeouts = Symbol('_timeouts');
-const _queue = Symbol('_queue');
-const _isProcessing = Symbol('_isProcessing');
-const _isInitStarted = Symbol('_isInitStarted');
-const _isClosed = Symbol('_isClosed');
-const _recurse = Symbol('_recurse');
-const _handle = Symbol('_handle');
-const _enqueue = Symbol('_enqueue');
 export default class CheapWatch extends EventEmitter {
-	constructor({ dir, filter, watch = true, debounce = 10 }) {
-		if (typeof dir !== 'string') {
+	// root directory
+	dir: string;
+	// function to limit watching to certain directories/files
+	filter?: Filter;
+	// whether to actually watch for changes, or just report all matching files and their stats
+	watch = true;
+	// number of milliseconds to use to debounce events from FSWatcher
+	debounce = 10;
+	// paths of all files/dirs -> stats
+	paths = new Map<string, fs.Stats>();
+	// paths of all directories -> FSWatcher instances
+	private _watchers = new Map<string, fs.FSWatcher>();
+	// paths of files/dirs with pending debounced events -> setTimeout timer ids
+	private _timeouts = new Map<string, NodeJS.Timer>();
+	// queue of paths of pending FSWatcher events to handle
+	private _queue: string[] = [];
+	// current status of instance
+	private _status = Status.Created;
+	constructor(data: object /* = { dir, filter, watch, debounce } */) {
+		super();
+		Object.assign(this, data);
+		if (typeof this.dir !== 'string') {
 			throw new TypeError('dir must be a string');
 		}
-		if (filter && typeof filter !== 'function') {
+		if (this.filter && typeof this.filter !== 'function') {
 			throw new TypeError('filter must be a function');
 		}
-		if (typeof watch !== 'boolean') {
+		if (typeof this.watch !== 'boolean') {
 			throw new TypeError('watch must be a boolean');
 		}
-		if (typeof debounce !== 'number') {
+		if (typeof this.debounce !== 'number') {
 			throw new TypeError('debounce must be a number');
 		}
-		super();
-		// root directory
-		this[_dir] = dir;
-		// (optional) function to limit watching to certain directories/files
-		this[_filter] = filter;
-		// (optional) whether to actually watch for changes, or just report all matching files and their stats
-		this[_watch] = watch;
-		// (optional) number of milliseconds to use to debounce events from FSWatcher
-		this[_debounce] = debounce;
-		// paths of all directories -> FSWatcher instances
-		this[_watchers] = new Map();
-		// paths of all files/dirs -> stats
-		this[_paths] = new Map();
-		// paths of files/dirs with pending debounced events -> setTimeout timer ids
-		this[_timeouts] = new Map();
-		// queue of pending FSWatcher events to handle
-		this[_queue] = [];
-		// whether some FSWatcher event is currently already in the process of being handled
-		this[_isProcessing] = false;
-		// whether init has been called
-		this[_isInitStarted] = false;
-		// whether close has been called
-		this[_isClosed] = false;
 	}
 	// recurse directory, get stats, set up FSWatcher instances
-	async init() {
-		if (this[_isInitStarted]) {
+	async init(): Promise<void> {
+		if (this._status !== Status.Created) {
 			throw new Error('cannot call init() twice');
 		}
-		this[_isInitStarted] = true;
-		await this[_recurse](this[_dir]);
-		this.paths = this[_paths];
+		this._status = Status.Initing;
+		await this._recurse(this.dir);
+		this._status = Status.Ready;
 	}
 	// close all FSWatchers
-	close() {
-		if (!this.paths) {
+	close(): void {
+		if (this._status === Status.Created || this._status === Status.Initing) {
 			throw new Error('cannot call close() before init() finishes');
 		}
-		if (this[_isClosed]) {
+		if (this._status === Status.Closed) {
 			throw new Error('cannot call close() twice');
 		}
-		this[_isClosed] = true;
-		for (const watcher of this[_watchers].values()) {
+		this._status = Status.Closed;
+		for (const watcher of this._watchers.values()) {
 			watcher.close();
 		}
 	}
 	// recurse a given directory
-	async [_recurse](full) {
-		const path = full.slice(this[_dir].length + 1);
+	private async _recurse(full: string): Promise<void> {
+		const path = full.slice(this.dir.length + 1);
 		const stats = await stat(full);
 		if (path) {
-			if (this[_filter] && !await this[_filter]({ path, stats })) {
+			if (this.filter && !(await this.filter({ path, stats }))) {
 				return;
 			}
-			this[_paths].set(path, stats);
+			this.paths.set(path, stats);
 		}
 		if (stats.isDirectory()) {
-			if (this[_watch]) {
-				this[_watchers].set(
+			if (this.watch) {
+				this._watchers.set(
 					path,
-					fs.watch(full, this[_handle].bind(this, full)).on('error', () => {}),
+					fs.watch(full, this._handle.bind(this, full)).on('error', () => {}),
 				);
 			}
 			await Promise.all(
-				(await readdir(full)).map(sub => this[_recurse](full + '/' + sub)),
+				(await readdir(full)).map(sub => this._recurse(full + '/' + sub)),
 			);
 		}
 	}
 	// handle FSWatcher event for given directory
-	[_handle](dir, event, file) {
+	private _handle(dir: string, event: Event, file: string): void {
 		const full = dir + '/' + file;
-		if (this[_timeouts].has(full)) {
-			clearTimeout(this[_timeouts].get(full));
+		if (this._timeouts.has(full)) {
+			clearTimeout(this._timeouts.get(full));
 		}
-		this[_timeouts].set(
+		this._timeouts.set(
 			full,
 			setTimeout(() => {
-				this[_timeouts].delete(full);
-				this[_enqueue](full);
-			}, this[_debounce]),
+				this._timeouts.delete(full);
+				this._enqueue(full);
+			}, this.debounce),
 		);
 	}
 	// add an FSWatcher event to the queue, and handle queued events
-	async [_enqueue](full) {
-		this[_queue].push(full);
-		if (this[_isProcessing] || !this[_paths]) {
+	private async _enqueue(full: string): Promise<void> {
+		this._queue.push(full);
+		if (this._status !== Status.Ready) {
 			return;
 		}
-		this[_isProcessing] = true;
-		while (this[_queue].length) {
-			const full = this[_queue].shift();
-			const path = full.slice(this[_dir].length + 1);
+		this._status = Status.Processing;
+		while (this._queue.length) {
+			const full = this._queue.shift();
+			const path = full.slice(this.dir.length + 1);
 			const stats = await stat(full).catch(() => {});
 			if (stats) {
-				if (this[_filter] && !await this[_filter]({ path, stats })) {
+				if (this.filter && !(await this.filter({ path, stats }))) {
 					continue;
 				}
 				const isNew = !this.paths.has(path);
@@ -141,10 +125,10 @@ export default class CheapWatch extends EventEmitter {
 				if (path) {
 					this.emit('+', { path, stats, isNew });
 				}
-				if (stats.isDirectory() && !this[_watchers].has(path)) {
+				if (stats.isDirectory() && !this._watchers.has(path)) {
 					// note the new directory
 					// start watching it, and report any files in it
-					await this[_recurse](full);
+					await this._recurse(full);
 					for (const [newPath, stats] of this.paths.entries()) {
 						if (newPath.startsWith(path + '/')) {
 							this.emit('+', { path: newPath, stats, isNew: true });
@@ -156,12 +140,12 @@ export default class CheapWatch extends EventEmitter {
 				const stats = this.paths.get(path);
 				this.paths.delete(path);
 				this.emit('-', { path, stats });
-				if (this[_watchers].has(path)) {
+				if (this._watchers.has(path)) {
 					// stop watching it, and report any files/dirs that were in it
-					for (const old of this[_watchers].keys()) {
+					for (const old of this._watchers.keys()) {
 						if (old === path || old.startsWith(path + '/')) {
-							this[_watchers].get(old).close();
-							this[_watchers].delete(old);
+							this._watchers.get(old).close();
+							this._watchers.delete(old);
 						}
 					}
 					for (const old of this.paths.keys()) {
@@ -174,6 +158,18 @@ export default class CheapWatch extends EventEmitter {
 				}
 			}
 		}
-		this[_isProcessing] = false;
+		this._status = Status.Ready;
 	}
 }
+interface Filter {
+	(file: { path: string; stats: fs.Stats }): Promise<boolean>;
+}
+const enum Status {
+	Created,
+	Initing,
+	Ready,
+	Processing,
+	Closed,
+}
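
A minimal usage sketch of the class as it stands after this commit. The module specifier 'cheap-watch' and the node_modules filter below are illustrative assumptions; the constructor options, init(), close(), the paths map, and the '+' / '-' events are all taken from the diff above.

import CheapWatch from 'cheap-watch'; // assumed module specifier, for illustration only

async function main(): Promise<void> {
	const watcher = new CheapWatch({
		dir: process.cwd(),
		// filter receives { path, stats } and resolves to a boolean (see the Filter interface above);
		// skipping node_modules here is just an example
		filter: async ({ path }: { path: string }) => !path.startsWith('node_modules'),
		watch: true,
		debounce: 10,
	});

	// recurse the directory, collect stats, and set up FSWatcher instances
	await watcher.init();
	console.log(`tracking ${watcher.paths.size} files/dirs`);

	// '+' is emitted for created/updated paths, '-' for deleted ones
	watcher.on('+', ({ path, isNew }) => {
		console.log(isNew ? 'created' : 'updated', path);
	});
	watcher.on('-', ({ path }) => {
		console.log('deleted', path);
	});

	// watcher.close() stops all FSWatcher instances when no longer needed
}

main().catch(console.error);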