feat: add fork support

This commit is contained in:
timothycarambat
2024-07-03 10:44:00 -07:00
parent b3e611503c
commit 33c8e80625
15 changed files with 393 additions and 315 deletions
-31
View File
@@ -1,31 +0,0 @@
on:
- push
- pull_request
jobs:
test:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os:
- ubuntu-latest
- macos-latest
# TODO: re-enable windows
#- windows-latest
node:
- 14
- 16
- 18
exclude:
- os: macos-latest
node: 14
name: Node ${{ matrix.node }} on ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- name: Setup node
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node }}
- name: Install dependencies
run: npm install
- name: Run tests
run: npm run test-coverage
+1 -1
View File
@@ -1,4 +1,4 @@
#!/bin/sh
. "$(dirname "$0")/_/husky.sh"
npx --no-install lint-staged && npm test
# npx --no-install lint-staged && npm test
-1
View File
@@ -1 +0,0 @@
jobscheduler.net
+1 -1
View File
@@ -1,6 +1,6 @@
MIT License
Copyright (c) 2020 Nick Baugh <niftylettuce@gmail.com> (http://niftylettuce.com/)
Copyright (c) 2024 Mintplex Labs Inc <team@mintplexlabs.com> (https://mintplexlabs.com/)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
+50 -94
View File
@@ -1,21 +1,13 @@
<h1 align="center">
<a href="https://jobscheduler.net"><img src="https://d1i8ikybhfrv4r.cloudfront.net/bree/bree.png" alt="bree" /></a>
</h1>
<div align="center">
<a href="https://github.com/breejs/bree/actions/workflows/ci.yml"><img src="https://github.com/breejs/bree/actions/workflows/ci.yml/badge.svg" alt="build status" /></a>
<a href="https://github.com/sindresorhus/xo"><img src="https://img.shields.io/badge/code_style-XO-5ed9c7.svg" alt="code style" /></a>
<a href="https://github.com/prettier/prettier"><img src="https://img.shields.io/badge/styled_with-prettier-ff69b4.svg" alt="styled with prettier" /></a>
<a href="https://lass.js.org"><img src="https://img.shields.io/badge/made_with-lass-95CC28.svg" alt="made with lass" /></a>
<a href="LICENSE"><img src="https://img.shields.io/github/license/breejs/bree.svg" alt="license" /></a>
<a href="https://npm.im/bree"><img src="https://img.shields.io/npm/dt/bree.svg" alt="npm downloads" /></a>
</div>
<br />
> \[!IMPORTANT]\
> This is a customized fork of [BreeJS](https://github.com/breejs/bree) the allows use of multi-process support as well
> as worker threads. You should use the original project if you only need worker thread support.
<div align="center">
Bree is the best job scheduler for <a href="https://nodejs.org">Node.js</a> and JavaScript with <a href="https://en.wikipedia.org/wiki/Cron">cron</a>, <a href="https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date">dates</a>, <a href="https://github.com/vercel/ms">ms</a>, <a href="https://github.com/breejs/later">later</a>, and <a href="https://github.com/agenda/human-interval">human-friendly</a> support.
</div>
<hr />
<div align="center">
Works in Node v12.17.0+, uses <a href="https://nodejs.org/api/worker_threads.html">worker threads</a> (Node.js) to spawn sandboxed processes, and supports <a href="https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function">async/await</a>, <a href="https://github.com/sindresorhus/p-retry">retries</a>, <a href="https://github.com/sindresorhus/p-throttle">throttling</a>, <a href="#concurrency">concurrency</a>, and <a href="#cancellation-retries-stalled-jobs-and-graceful-reloading">cancelable jobs with graceful shutdown</a>. Simple, fast, and lightweight. <strong>Made for <a href="https://forwardemail.net">Forward Email</a> and <a href="https://lad.js.org">Lad</a></strong>.
Works in Node v16.0.0+, uses <a href="https://nodejs.org/api/worker_threads.html">worker threads</a> (Node.js) to spawn sandboxed processes, and supports <a href="https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function">async/await</a>, <a href="https://github.com/sindresorhus/p-retry">retries</a>, <a href="https://github.com/sindresorhus/p-throttle">throttling</a>, <a href="#concurrency">concurrency</a>, and <a href="#cancellation-retries-stalled-jobs-and-graceful-reloading">cancelable jobs with graceful shutdown</a>. Simple, fast, and lightweight. Added support to run specific jobs in <a href="https://nodejs.org/api/child_process.html#child_processforkmodulepath-args-options">forked processes</a> for memory & resource isolation.
</div>
@@ -45,7 +37,6 @@
* [Plugins](#plugins)
* [Available Plugins](#available-plugins)
* [Creating plugins for Bree](#creating-plugins-for-bree)
* [Real-world usage](#real-world-usage)
* [Contributors](#contributors)
* [License](#license)
@@ -56,23 +47,23 @@ Bree was created to give you fine-grained control with simplicity, and has built
We recommend you to query a persistent database in your jobs, to prevent specific operations from running more than once.
Bree does not force you to use an additional database layer of [Redis][] or [MongoDB][] to manage job state.
Bree does not force you to use an additional database layer of \[Redis]\[] or \[MongoDB]\[] to manage job state.
In doing so, you should manage boolean job states yourself using queries. For instance, if you have to send a welcome email to users, only send a welcome email to users that do not have a Date value set yet for `welcome_email_sent_at`.
## Install
[npm][]:
\[npm]\[]:
```sh
npm install bree
npm install @mintplex-labs/bree
```
[yarn][]:
\[yarn]\[]:
```sh
yarn add bree
yarn add @mintplex-labs/bree
```
@@ -89,7 +80,7 @@ To see details about upgrading from the last major version, please see [UPGRADIN
The example below assumes that you have a directory `jobs` in the root of the directory from which you run this example. For example, if the example below is at `/path/to/script.js`, then `/path/to/jobs/` must also exist as a directory. If you wish to disable this feature, then pass `root: false` as an option.
Inside this `jobs` directory are individual scripts which are run using [Workers][] per optional timeouts, and additionally, an optional interval or cron expression. The example below contains comments, which help to clarify how this works.
Inside this `jobs` directory are individual scripts which are run using \[Workers]\[] per optional timeouts, and additionally, an optional interval or cron expression. The example below contains comments, which help to clarify how this works.
The option `jobs` passed to a new instance of `Bree` (as shown below) is an Array. It contains values which can either be a String (name of a job in the `jobs` directory, which is run on boot) OR it can be an Object with `name`, `path`, `timeout`, and `interval` properties. If you do not supply a `path`, then the path is created using the root directory (defaults to `jobs`) in combination with the `name`. If you do not supply values for `timeout` and/nor `interval`, then these values are defaulted to `0` (which is the default for both, see [index.js](https://github.com/breejs/bree/blob/master/src/index.js) for more insight into configurable default options).
@@ -100,7 +91,7 @@ We have also documented all [Instance Options](#instance-options) and [Job Optio
```js
// app.mjs
import Bree from 'bree';
import Bree from '@mintplex-labs/bree';
const bree = new Bree({
// ... (see below) ...
@@ -128,7 +119,7 @@ const Graceful = require('@ladjs/graceful');
const Cabin = require('cabin');
// required
const Bree = require('bree');
const Bree = require('@mintplex-labs/bree');
//
// NOTE: see the "Instance Options" section below in this README
@@ -292,7 +283,14 @@ const bree = new Bree({
name: 'worker-16',
date: dayjs('1-1-2022', 'M-D-YYYY').toDate(),
cron: '0 0 1 * *'
}
},
// runs `./jobs/worker-13.js` in a new process as opposed to a worker thread
{
name: 'worker-13',
interval: '2m',
runAs: 'process',
},
]
});
@@ -368,25 +366,27 @@ Here is the full list of options and their defaults. See [src/index.js](https:/
| `outputWorkerMetadata` | Boolean | `false` | By default worker metadata is not passed to the second Object argument of `logger`. However if you set this to `true`, then `logger` will be invoked internally with two arguments (e.g. `logger.info('...', { worker: ... })`). This `worker` property contains `isMainThread` (Boolean), `resourceLimits` (Object), and `threadId` (String) properties; all of which correspond to [Workers][] metadata. This can be overridden on a per job basis. |
| `errorHandler` | Function | `null` | Set this function to receive a callback when an error is encountered during worker execution (e.g. throws an exception) or when it exits with non-zero code (e.g. `process.exit(1)`). The callback receives two parameters `error` and `workerMetadata`. Important note, when this callback is present default error logging will not be executed. |
| `workerMessageHandler` | Function | `null` | Set this function to receive a callback when a worker sends a message through [parentPort.postMessage](https://nodejs.org/docs/latest-v14.x/api/worker_threads.html#worker_threads_port_postmessage_value_transferlist). The callback receives at least two parameters `name` (of the worker) and `message` (coming from `postMessage`), if `outputWorkerMetadata` is enabled additional metadata will be sent to this handler. |
| `runJobsAs` | String | `worker` | Run all defined jobs as a `worker` using NodeJS worker threads or `process` to run in all new `forked` processes. |
## Job Options
See [Interval, Timeout, Date, and Cron Validate](#interval-timeout-date-and-cron-validation) below for more insight besides this table:
| Property | Type | Description |
| ---------------------- | ---------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `name` | String | The name of the job. This should match the base file path (e.g. `foo` if `foo.js` is located at `/path/to/jobs/foo.js`) unless `path` option is specified. A value of `index`, `index.js`, and `index.mjs` are reserved values and cannot be used here. |
| `path` | String | The path of the job or function used for spawning a new [Worker][workers] with. If not specified, then it defaults to the value for `name` plus the default file extension specified under [Instance Options](#instance-options). |
| `timeout` | Number, Object, String, or Boolean | Sets the duration in milliseconds before the job starts (it overrides the default inherited `timeout` as set in [Instance Options](#instance-options). A value of `0` indicates it will start immediately. This value can be a Number, String, or a Boolean of `false` (which indicates it will NOT inherit the default `timeout` from [Instance Options](#instance-options)). See [Job Interval and Timeout Values](#job-interval-and-timeout-values) below for more insight into how this value is parsed. |
| `interval` | Number, Object, or String | Sets the duration in milliseconds for the job to repeat itself, otherwise known as its interval (it overrides the default inherited `interval` as set in [Instance Options](#instance-options)). A value of `0` indicates it will not repeat and there will be no interval. If the value is greater than `0` then this value will be used as the interval. See [Job Interval and Timeout Values](#job-interval-and-timeout-values) below for more insight into how this value is parsed. |
| `date` | Date | This must be a valid JavaScript Date (we use `instance of Date` for comparison). If this value is in the past, then it is not run when jobs are started (or run manually). We recommend using [dayjs][] for creating this date, and then formatting it using the `toDate()` method (e.g. `dayjs().add('3, 'days').toDate()`). You could also use [moment][] or any other JavaScript date library, as long as you convert the value to a Date instance here. |
| `cron` | String | A cron expression to use as the job's interval, which is validated against [cron-validate][] and parsed by [later][]. |
| `hasSeconds` | Boolean | Overrides the [Instance Options](#instance-options) `hasSeconds` property if set. Note that setting this to `true` will automatically set `cronValidate` defaults to have `{ preset: 'default', override: { useSeconds: true } }` |
| `cronValidate` | Object | Overrides the [Instance Options](#instance-options) `cronValidate` property if set. |
| `closeWorkerAfterMs` | Number | Overrides the [Instance Options](#instance-options) `closeWorkerAfterMs` property if set. |
| `worker` | Object | Overrides the [Instance Options](#instance-options) `worker` property if set. |
| `outputWorkerMetadata` | Boolean | Overrides the [Instance Options](#instance-options) `outputWorkerMetadata` property if set. |
| Property | Type | Description | |
| ---------------------- | ---------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `name` | String | The name of the job. This should match the base file path (e.g. `foo` if `foo.js` is located at `/path/to/jobs/foo.js`) unless `path` option is specified. A value of `index`, `index.js`, and `index.mjs` are reserved values and cannot be used here. | |
| `path` | String | The path of the job or function used for spawning a new [Worker][workers] with. If not specified, then it defaults to the value for `name` plus the default file extension specified under [Instance Options](#instance-options). | |
| `timeout` | Number, Object, String, or Boolean | Sets the duration in milliseconds before the job starts (it overrides the default inherited `timeout` as set in [Instance Options](#instance-options). A value of `0` indicates it will start immediately. This value can be a Number, String, or a Boolean of `false` (which indicates it will NOT inherit the default `timeout` from [Instance Options](#instance-options)). See [Job Interval and Timeout Values](#job-interval-and-timeout-values) below for more insight into how this value is parsed. | |
| `interval` | Number, Object, or String | Sets the duration in milliseconds for the job to repeat itself, otherwise known as its interval (it overrides the default inherited `interval` as set in [Instance Options](#instance-options)). A value of `0` indicates it will not repeat and there will be no interval. If the value is greater than `0` then this value will be used as the interval. See [Job Interval and Timeout Values](#job-interval-and-timeout-values) below for more insight into how this value is parsed. | |
| `date` | Date | This must be a valid JavaScript Date (we use `instance of Date` for comparison). If this value is in the past, then it is not run when jobs are started (or run manually). We recommend using [dayjs][] for creating this date, and then formatting it using the `toDate()` method (e.g. `dayjs().add('3, 'days').toDate()`). You could also use [moment][] or any other JavaScript date library, as long as you convert the value to a Date instance here. | |
| `cron` | String | A cron expression to use as the job's interval, which is validated against [cron-validate][] and parsed by [later][]. | |
| `hasSeconds` | Boolean | Overrides the [Instance Options](#instance-options) `hasSeconds` property if set. Note that setting this to `true` will automatically set `cronValidate` defaults to have `{ preset: 'default', override: { useSeconds: true } }` | |
| `cronValidate` | Object | Overrides the [Instance Options](#instance-options) `cronValidate` property if set. | |
| `closeWorkerAfterMs` | Number | Overrides the [Instance Options](#instance-options) `closeWorkerAfterMs` property if set. | |
| `worker` | Object | Overrides the [Instance Options](#instance-options) `worker` property if set. | |
| `outputWorkerMetadata` | Boolean | Overrides the [Instance Options](#instance-options) `outputWorkerMetadata` property if set. | |
| `runAs` | String | `worker` | Run specific job as a `worker` using NodeJS worker threads or `process` to run in all new `forked` processes. Will be overridden by BreeConfig `runJobsAs` if set. |
## Job Interval and Timeout Values
@@ -394,8 +394,8 @@ See [Interval, Timeout, Date, and Cron Validate](#interval-timeout-date-and-cron
These values can include Number, Object, and String variable types:
* Number values indicates the number of milliseconds for the timeout or interval
* Object values must be a [later][] schedule object value (e.g. `later.parse.cron('15 10 * * ? *'))`)
* String values can be either a [later][], [human-interval][], or [ms][] String values (e.g. [later][] supports Strings such as `every 5 mins`, [human-interval][] supports Strings such as `3 days and 4 hours`, and [ms][] supports Strings such as `4h` for four hours)
* Object values must be a \[later]\[] schedule object value (e.g. `later.parse.cron('15 10 * * ? *'))`)
* String values can be either a \[later]\[], \[human-interval]\[], or \[ms]\[] String values (e.g. \[later]\[] supports Strings such as `every 5 mins`, \[human-interval]\[] supports Strings such as `3 days and 4 hours`, and \[ms]\[] supports Strings such as `4h` for four hours)
## Listening for events
@@ -457,7 +457,7 @@ new Bree({
## Cancellation, Retries, Stalled Jobs, and Graceful Reloading
We recommend that you listen for "cancel" event in your worker paths. Doing so will allow you to handle graceful cancellation of jobs. For example, you could use [p-cancelable][]
We recommend that you listen for "cancel" event in your worker paths. Doing so will allow you to handle graceful cancellation of jobs. For example, you could use \[p-cancelable]\[]
Here's a quick example of how to do that (e.g. `./jobs/some-worker.js`):
@@ -484,20 +484,20 @@ if (parentPort)
});
```
If you'd like jobs to retry, simply wrap your usage of promises with [p-retry][].
If you'd like jobs to retry, simply wrap your usage of promises with \[p-retry]\[].
We leave it up to you to have as much fine-grained control as you wish.
See [@ladjs/graceful][lad-graceful] for more insight into how this package works.
See \[@ladjs/graceful]\[lad-graceful] for more insight into how this package works.
## Interval, Timeout, Date, and Cron Validation
If you need help writing cron expressions, you can reference [crontab.guru](https://crontab.guru/).
We support [later][], [human-interval][], or [ms][] String values for both `timeout` and `interval`.
We support \[later]\[], \[human-interval]\[], or \[ms]\[] String values for both `timeout` and `interval`.
If you pass a `cron` property, then it is validated against [cron-validate][].
If you pass a `cron` property, then it is validated against \[cron-validate]\[].
You can pass a Date as the `date` property, but you cannot combine both `date` and `timeout`.
@@ -531,7 +531,7 @@ const ms = require('ms');
To close out the worker and signal that it is done, you can simply `parentPort.postMessage('done');` and/or `process.exit(0)`.
While writing your jobs (which will run in [worker][workers] threads), you should do one of the following:
While writing your jobs (which will run in \[worker]\[workers] threads), you should do one of the following:
* Signal to the main thread that the process has completed by sending a "done" message (per the example above in [Writing jobs with Promises and async-await](#writing-jobs-with-promises-and-async-await))
* Exit the process if there is NOT an error with code `0` (e.g. `process.exit(0);`)
@@ -552,9 +552,9 @@ As of v6.0.0 when you pass `closeWorkerAfterMs`, the timer will start once the w
## Complex timeouts and intervals
Since we use [later][], you can pass an instance of `later.parse.recur`, `later.parse.cron`, or `later.parse.text` as the `timeout` or `interval` property values (e.g. if you need to construct something manually).
Since we use \[later]\[], you can pass an instance of `later.parse.recur`, `later.parse.cron`, or `later.parse.text` as the `timeout` or `interval` property values (e.g. if you need to construct something manually).
You can also use [dayjs][] to construct dates (e.g. from now or a certain date) to millisecond differences using `dayjs().diff(new Date(), 'milliseconds')`. You would then pass that returned Number value as `timeout` or `interval` as needed.
You can also use \[dayjs]\[] to construct dates (e.g. from now or a certain date) to millisecond differences using `dayjs().diff(new Date(), 'milliseconds')`. You would then pass that returned Number value as `timeout` or `interval` as needed.
## Custom Worker Options
@@ -586,6 +586,7 @@ new Bree({
{
name: 'job with function',
path: someFunction
// runAs: property will always be "worker" when using direct function.
}
]
});
@@ -654,60 +655,15 @@ Plugins should be a function that recieves an `options` object and the `Bree` cl
```
## Real-world usage
More detailed examples can be found in [Forward Email][forward-email], [Lad][], and [Ghost][ghost].
## Contributors
| Name | Website |
| ---------------- | --------------------------------- |
| --------------------- | --------------------------------- |
| **Mintplex Labs Inc** | <https://mintplexlabs.com> |
| **Nick Baugh** | <http://niftylettuce.com/> |
| **shadowgate15** | <https://github.com/shadowgate15> |
## License
[MIT](LICENSE) © [Nick Baugh](http://niftylettuce.com/)
##
<a href="#"><img src="https://d1i8ikybhfrv4r.cloudfront.net/bree/footer.png" alt="#" /></a>
[ms]: https://github.com/vercel/ms
[human-interval]: https://github.com/agenda/human-interval
[npm]: https://www.npmjs.com/
[yarn]: https://yarnpkg.com/
[workers]: https://nodejs.org/api/worker_threads.html
[lad]: https://lad.js.org
[p-retry]: https://github.com/sindresorhus/p-retry
[p-cancelable]: https://github.com/sindresorhus/p-cancelable
[later]: https://breejs.github.io/later/parsers.html
[cron-validate]: https://github.com/Airfooox/cron-validate
[forward-email]: https://github.com/forwardemail/forwardemail.net
[dayjs]: https://github.com/iamkun/dayjs
[redis]: https://redis.io/
[mongodb]: https://www.mongodb.com/
[lad-graceful]: https://github.com/ladjs/graceful
[cabin]: https://cabinjs.com
[moment]: https://momentjs.com
[ghost]: https://ghost.org/
[MIT](LICENSE) © Mintplex Labs Inc
-81
View File
@@ -1,81 +0,0 @@
# Upgrading
## Upgrading from v8 to v9
**There are three major breaking changes:**
1. The usage of `bree.start()` and `bree.run()` methods must be changed to `await bree.start()` or `await bree.run()` (see the example below).
2. The usage of `bree.add()` must be changed to `await bree.add()` (since we now have asynchronous job validation and loading).
3. We have opted for `util.debuglog` as opposed to the userland `debug` package for debug logging. This means you must run `NODE_DEBUG=bree node app.js` as opposed to `DEBUG=bree node app.js`.
Here is a complete list of the underlying changes made:
* The method `start()` is now a Promise and you should either call `await bree.start()` or additionally call `await bree.init()` (an internal private method called by Bree) before attempting to start or use your Bree instance.
> CJS:
```diff
// if you're using CJS and you run such as `node app.js`
-bree.start();
+// async/await iif style
+(async () => {
+ await bree.start();
+})();
```
> ESM:
```diff
-bree.start();
+// leverage top-level await support (requires Node v14.8+)
+await bree.start();
```
* ESM module support has been added (per [#180](https://github.com/breejs/bree/issues/180)) by using dynamic imports to load the job Array (CJS is still supported).
* For a majority of users, you do not need to make any changes to your code for v9 to work when you upgrade from v8 (**with the exception of now having to do `await bree.start()`**).
* Top-level await support is added in Node v14.8+ (without requiring any Node flags), and therefore you can call `await bree.start();` (e.g. if your `package.json` has `"type": "module"` and/or the file extension you're running with Node is `.mjs`). Note that Bree still works in Node v12.17+
* The major difference is that Bree no longer initializes `this.config.jobs` in the constructor.
* However we have dummy-proofed this new approach, and `bree.init()` will be invoked (if and only if it has not yet been invoked successfully) when you call `bree.start()` (or any similar method that accesses `this.config.jobs` internally).
* Internal methods such as `validate` exported by `src/job-validator.js` are now asynchronous and return Promises (you do not need to worry about this unless you're doing something custom with these functions).
* The default `root` option will now attempt to resolve an absolute file path for an index since we are using dynamic imports. If you are using `index.mjs` (as opposed to `index.js` then you will need to set a value for the option `defaultRootIndex`). See <https://nodejs.org/api/esm.html#esm_mandatory_file_extensions> for more insight.
* The method `add()` is now a Promise (you should call `await bree.add(jobs)`.
* Several methods are now Promises in order to dummy-proof Bree for users that may not wish to call `await bree.init()` before calling `await bree.start()` (as per above).
* The method `run()` is now a Promise (**but you do not need to `await` it** if you already called `await bree.start()` or `await bree.init()` or any of the methods listed below).
* The method `stop()` is now a Promise (**but you do not need to `await` it** if you already called `await bree.start()` or `await bree.init()` or `await bree.run()` nor any of the methods listed below).
* We've also refactored synchronous methods such as `fs.statSync` to `fs.promises.stat` and made job validation asynchronous.
* Plugins that extend `bree.init()` may need rewritten, as `bree.init()` is now a Promise.
* If you are on Node version <= v12.20.0, please upgrade to the latest Node v12, but preferably please upgrade to the latest Node LTS (at the time of this writing it is Node v16, but if you can't upgrade to Node v16, at least upgrade to Node v14). Node v12 is EOL as of April 2022.
* Plugins will need to now `return init()` if you override the `init` function, for example (this is the change we had to make in `@breejs/ts-worker`):
```diff
// define accepted extensions
-Bree.prototype.init = function () {
+Bree.prototype.init = async function () {
if (!this.config.acceptedExtensions.includes('.ts'))
this.config.acceptedExtensions.push('.ts');
- oldInit.bind(this)();
+ return oldInit.call(this);
};
```
## Upgrading from v7 to v8
* Some fields have been converted from Objects to [Maps](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Map):
* `closeWorkerAfterMs`
* `workers`
* `timeouts`
* `intervals`
* Instead of accessing them via `bree.workers.NAME`, you should access them with `.get` (e.g. `bree.workers.get(NAME);`).
* The method `start()` will now throw an error if the job has already started.
-22
View File
@@ -1,22 +0,0 @@
// eslint-disable-next-line no-undef
docute.init({
debug: true,
title: 'Bree',
repo: 'breejs/bree',
'edit-link': 'https://github.com/breejs/bree/tree/master/',
twitter: 'niftylettuce',
nav: {
default: [
{
title: 'The best job scheduler for Node.js and JavaScript',
path: '/'
},
{
title: 'Upgrading',
path: '/UPGRADING'
}
]
},
// eslint-disable-next-line no-undef
plugins: [docuteEmojify()]
});
-48
View File
@@ -1,48 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta
name="viewport"
content="width=device-width, initial-scale=1, maximum-scale=1, user-scalable=0"
/>
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon" />
<link rel="icon" href="/favicon.ico" type="image/x-icon" />
<title>The best job scheduler for Node.js and JavaScript</title>
<link
rel="stylesheet"
href="https://unpkg.com/docute@3/dist/docute.css"
type="text/css"
/>
<meta
property="og:title"
content="The best job scheduler for Node.js with workers and cron support"
/>
<meta
property="og:description"
content="Bree is the best job scheduler for Node.js and JavaScript. Built-in support for workers, cron expression syntax, human-friendly times, Dates, and more. Made by @niftylettuce."
/>
<meta property="og:site_name" content="Bree" />
<meta property="og:type" content="website" />
<meta
property="og:image"
content="https://raw.githubusercontent.com/breejs/bree/master/media/bree.png"
/>
</head>
<body>
<div id="app"></div>
<script
src="https://unpkg.com/docute-emojify@0.1"
type="text/javascript"
></script>
<script
src="https://unpkg.com/docute@3/dist/docute.js"
type="text/javascript"
></script>
<script src="./config.js" type="text/javascript"></script>
</body>
</html>
+9 -8
View File
@@ -1,13 +1,14 @@
{
"name": "bree",
"description": "The best job scheduler for Node.js and JavaScript with cron, dates, ms, later, and human-friendly support. Works in Node v12.17.0+, uses worker threads to spawn sandboxed processes, and supports async/await, retries, throttling, concurrency, and cancelable promises (graceful shutdown). Simple, fast, and lightweight. Made for Forward Email and Lad.",
"name": "@mintplex-labs/bree",
"description": "A fork of BreeJS/bree. A Node.js and JavaScript task scheduler with cron, dates, ms, later, and human-friendly support. Works in Node v12.17.0+, uses worker threads or process forks, and supports async/await, retries, throttling, concurrency, and cancelable promises (graceful shutdown). Simple, fast, and lightweight. Customized for AnythingLLM.",
"version": "9.2.3",
"author": "Nick Baugh <niftylettuce@gmail.com> (http://niftylettuce.com/)",
"author": "Mintplex Labs Inc <team@mintplexlabs.com>",
"bugs": {
"url": "https://github.com/breejs/bree/issues",
"email": "niftylettuce@gmail.com"
"url": "https://github.com/Mintplex-Labs/bree/issues",
"email": "team@mintplexlabs.com"
},
"contributors": [
"Mintplex Labs Inc <tim@mintplexlabs.com> (https://mintplexlabs.com)",
"Nick Baugh <niftylettuce@gmail.com> (http://niftylettuce.com/)",
"shadowgate15 (https://github.com/shadowgate15)"
],
@@ -49,12 +50,12 @@
"xo": "0.54"
},
"engines": {
"node": ">=12.17.0 <13.0.0-0||>=13.2.0"
"node": ">=16.0.0"
},
"files": [
"src"
],
"homepage": "https://github.com/breejs/bree",
"homepage": "https://github.com/Mintplex-Labs/bree",
"keywords": [
"agenda",
"async",
@@ -143,7 +144,7 @@
},
"repository": {
"type": "git",
"url": "https://github.com/breejs/bree"
"url": "https://github.com/Mintplex-Labs/bree"
},
"scripts": {
"ava": "cross-env NODE_ENV=test ava",
+4 -1
View File
@@ -1,5 +1,6 @@
// Definitions by: Taylor Schley <https://github.com/shadowgate15>
import { type ChildProcess } from 'node:child_process';
import { EventEmitter } from 'node:events';
import { type WorkerOptions, type Worker } from 'node:worker_threads';
import { type Timeout, type Interval } from 'safe-timers';
@@ -12,7 +13,7 @@ declare class Bree extends EventEmitter {
config: Bree.BreeConfigs;
closeWorkerAfterMs: Map<string, Timeout>;
workers: Map<string, Worker>;
workers: Map<string, Worker | ChildProcess>;
timeouts: Map<string, Timeout>;
intervals: Map<string, Interval>;
@@ -81,6 +82,7 @@ declare namespace Bree {
worker?: Partial<WorkerOptions>;
outputWorkerMetadata?: boolean;
timezone?: string;
runAs?: 'worker' | 'process';
};
type JobOptions = Required<Pick<Job, 'name'>> & Partial<Omit<Job, 'name'>>;
@@ -102,6 +104,7 @@ declare namespace Bree {
defaultExtension: string;
acceptedExtensions: string[];
worker: WorkerOptions;
runJobsAs?: 'worker' | 'process';
errorHandler?: (error: any, workerMetadata: any) => void;
workerMessageHandler?: (message: any, workerMetadata: any) => void;
outputWorkerMetadata: boolean;
+45 -2
View File
@@ -8,6 +8,7 @@ const { pathToFileURL } = require('node:url');
const { Worker } = require('node:worker_threads');
const { join, resolve } = require('node:path');
const { debuglog } = require('node:util');
const { fork } = require('node:child_process');
const combineErrors = require('combine-errors');
const isSANB = require('is-string-and-not-blank');
const isValidPath = require('is-valid-path');
@@ -105,6 +106,10 @@ class Bree extends EventEmitter {
// });
//
outputWorkerMetadata: false,
//
// Specify to run all jobs as "worker" or "process" instead of passing
// "runAs" into each specific job.
runJobsAs: null,
...config
};
@@ -663,10 +668,11 @@ class Bree extends EventEmitter {
`Gracefully cancelled worker for job "${name}"`,
this.getWorkerMetadata(name)
);
this.workers.get(name).terminate();
this.#terminate(this.workers.get(name));
}
});
this.workers.get(name).postMessage('cancel');
this.#cancel(this.workers.get(name));
}
this.removeSafeTimer('closeWorkerAfterMs', name);
@@ -681,6 +687,16 @@ class Bree extends EventEmitter {
return pWaitFor(() => this.workers.size === 0);
}
#terminate(worker) {
if (worker?.terminate) worker.terminate();
else worker.kill(0);
}
#cancel(worker) {
if (worker?.postMessage) worker.postMessage('cancel');
else worker.kill(0);
}
async add(jobs) {
debug('add', jobs);
@@ -761,6 +777,33 @@ class Bree extends EventEmitter {
}
createWorker(filename, options) {
if (options.workerData.job.runAs === 'process') {
const childProcess = fork(filename, [], {
env: {
// eslint-disable-next-line n/prefer-global/process
...process.env
}
});
// Pipe stdout/stderr from process to logger if attached
if (this.config.logger) {
childProcess
.on('message', (message) => {
this.config.logger.info(message);
})
.on('exit', (code, signal) => {
this.config.logger.warn(
`Child process exited with code ${code} and signal ${signal}`
);
})
.on('error', (err) => {
this.config.logger.error('Child process error:', err);
});
}
return childProcess;
}
return new Worker(filename, options);
}
+17 -3
View File
@@ -7,7 +7,12 @@ const { isSchedule, parseValue, getJobPath } = require('./job-utils');
later.date.localTime();
// eslint-disable-next-line complexity
/**
*
* @param {import('.').JobOptions} job
* @param {import('.').BreeConfigs} config
* @returns {import('.').JobOptions}
*/
const buildJob = (job, config) => {
if (isSANB(job)) {
const path = join(
@@ -19,7 +24,8 @@ const buildJob = (job, config) => {
name: job,
path,
timeout: config.timeout,
interval: config.interval
interval: config.interval,
runAs: config?.runJobsAs ?? 'worker'
};
if (isSANB(config.timezone)) {
jobObject.timezone = config.timezone;
@@ -36,7 +42,8 @@ const buildJob = (job, config) => {
path,
worker: { eval: true },
timeout: config.timeout,
interval: config.interval
interval: config.interval,
runAs: 'worker' // cannot run job in fork as a function.
};
if (isSANB(config.timezone)) {
jobObject.timezone = config.timezone;
@@ -131,6 +138,13 @@ const buildJob = (job, config) => {
job.timezone = config.timezone;
}
// if config for runJobsAs was defined we can set that job property here
// as long as it is not a function string exec/eval.
job.runAs =
typeof job.path !== 'function' && config.runJobsAs
? config.runJobsAs
: job.runAs ?? 'worker'; // fallback to always have this value assigned to at least 'worker'
return job;
};
+9
View File
@@ -306,6 +306,15 @@ const validate = async (job, i, names, config) => {
}
}
// Validate runAs
if (job.runAs !== undefined && !['worker', 'process'].includes(job.runAs)) {
errors.push(
new Error(
`${prefix} had runAs value set, but it must be an "worker" or "process"`
)
);
}
if (errors.length > 0) {
throw combineErrors(errors);
}
+168 -1
View File
@@ -19,7 +19,9 @@ test('successfully run job', async (t) => {
t.plan(2);
const logger = {
info() {}
info() {},
warn() {},
error() {}
};
const bree = new Bree({
@@ -75,6 +77,7 @@ test('throws if jobs is not an array and logs ERR_MODULE_NOT_FOUND error by defa
const logger = {
info() {},
warn() {},
error(err) {
t.is(err.code, 'ERR_MODULE_NOT_FOUND');
}
@@ -95,6 +98,7 @@ test('logs ERR_MODULE_NOT_FOUND error if array is empty', async (t) => {
const logger = {
info() {},
warn() {},
error(err) {
t.is(err.code, 'ERR_MODULE_NOT_FOUND');
}
@@ -115,6 +119,7 @@ test('logs ERR_MODULE_NOT_FOUND error if array is empty', async (t) => {
test('does not log ERR_MODULE_NOT_FOUND error if silenceRootCheckError is false', async (t) => {
const logger = {
info() {},
warn() {},
error() {
t.fail();
}
@@ -136,6 +141,7 @@ test('does not log ERR_MODULE_NOT_FOUND error if silenceRootCheckError is false'
test('does not log ERR_MODULE_NOT_FOUND error if doRootCheck is false', async (t) => {
const logger = {
info() {},
warn() {},
error() {
t.fail();
}
@@ -363,3 +369,164 @@ test(`bree.init() is called if bree.add() is called`, async (t) => {
await bree.add('message');
t.true(bree._init);
});
test('successfully run job without "runAs" or config "runJobsAs" will make workers.', async (t) => {
t.plan(2);
const logger = {
info() {}
};
const bree = new Bree({
jobs: [
{
name: 'basic'
}
],
...baseConfig,
logger
});
await bree.start();
bree.on('worker created', (name) => {
const worker = bree.workers.get(name);
t.true(Boolean(worker));
t.true(Boolean(worker?.postMessage));
});
await delay(100);
await bree.stop();
});
test('successfully run job with runAs === "worker" to make worker.', async (t) => {
t.plan(2);
const logger = {
info() {}
};
const bree = new Bree({
jobs: [
{
name: 'basic',
runAs: 'worker'
}
],
...baseConfig,
logger
});
await bree.start();
bree.on('worker created', (name) => {
const worker = bree.workers.get(name);
t.true(Boolean(worker));
t.true(Boolean(worker?.postMessage));
});
await delay(100);
await bree.stop();
});
test('successfully run job with runAs === "process" to make fork.', async (t) => {
t.plan(2);
const logger = {
info() {},
warn() {},
error() {}
};
const bree = new Bree({
jobs: [
{
name: 'basic',
runAs: 'process'
}
],
...baseConfig,
logger
});
await bree.start();
bree.on('worker created', (name) => {
const worker = bree.workers.get(name);
t.true(Boolean(worker));
t.true(worker?.postMessage === undefined); // processes don't have postMessage
});
await delay(100);
await bree.stop();
});
test('successfully apply config runJobAs: "process" to make forked jobs.', async (t) => {
t.plan(2);
const logger = {
info() {},
warn() {},
error() {}
};
const bree = new Bree({
runJobsAs: 'process',
jobs: [
{
name: 'basic'
}
],
...baseConfig,
logger
});
await bree.start();
bree.on('worker created', (name) => {
const worker = bree.workers.get(name);
t.true(Boolean(worker));
t.true(worker?.postMessage === undefined); // processes don't have postMessage
});
await delay(100);
await bree.stop();
});
test('job runAs is overwritten by bree config successfully.', async (t) => {
t.plan(2);
const logger = {
info() {},
warn() {},
error() {}
};
const bree = new Bree({
runJobsAs: 'process',
jobs: [
{
name: 'basic',
runAs: 'worker'
}
],
...baseConfig,
logger
});
await bree.start();
bree.on('worker created', (name) => {
const worker = bree.workers.get(name);
t.true(Boolean(worker));
t.true(worker?.postMessage === undefined); // processes don't have postMessage
});
await delay(100);
await bree.stop();
});
+85 -17
View File
@@ -15,6 +15,13 @@ const baseConfig = {
acceptedExtensions: ['.js', '.mjs']
};
const baseJobConfig = {
name: 'basic',
path: jobPathBasic,
timeout: 0,
interval: 0
};
function job(t, _job, config, expected) {
t.deepEqual(
jobBuilder(_job || 'basic', { ...baseConfig, ...config }),
@@ -27,7 +34,13 @@ test(
job,
null,
{},
{ name: 'basic', path: jobPathBasic, timeout: 0, interval: 0 }
{
name: 'basic',
path: jobPathBasic,
timeout: 0,
interval: 0,
runAs: 'worker'
}
);
test(
@@ -39,7 +52,8 @@ test(
name: 'basic.js',
path: jobPathBasic,
timeout: 0,
interval: 0
interval: 0,
runAs: 'worker'
}
);
@@ -59,7 +73,8 @@ test(
path: `(${basic.toString()})()`,
worker: { eval: true },
timeout: 0,
interval: 0
interval: 0,
runAs: 'worker'
}
);
@@ -71,7 +86,8 @@ test(
{
path: `(${basic.toString()})()`,
worker: { eval: true, test: 1 },
timeout: 0
timeout: 0,
runAs: 'worker'
}
);
@@ -80,7 +96,7 @@ test(
job,
{ name: 'basic', path: '' },
{},
{ name: 'basic', path: jobPathBasic, timeout: 0 }
{ name: 'basic', path: jobPathBasic, timeout: 0, runAs: 'worker' }
);
test(
@@ -88,7 +104,7 @@ test(
job,
{ name: 'basic.js', path: '' },
{},
{ name: 'basic.js', path: jobPathBasic, timeout: 0 }
{ name: 'basic.js', path: jobPathBasic, timeout: 0, runAs: 'worker' }
);
test(
@@ -96,7 +112,7 @@ test(
job,
{ path: jobPathBasic },
{},
{ path: jobPathBasic, timeout: 0 }
{ path: jobPathBasic, timeout: 0, runAs: 'worker' }
);
test(
@@ -104,7 +120,7 @@ test(
job,
{ path: '*.js', worker: { test: 1 } },
{},
{ path: '*.js', timeout: 0, worker: { eval: true, test: 1 } }
{ path: '*.js', timeout: 0, worker: { eval: true, test: 1 }, runAs: 'worker' }
);
test(
@@ -112,7 +128,7 @@ test(
job,
{ path: jobPathBasic, timeout: 10 },
{},
{ path: jobPathBasic, timeout: 10 }
{ path: jobPathBasic, timeout: 10, runAs: 'worker' }
);
test(
@@ -120,7 +136,7 @@ test(
job,
{ path: jobPathBasic, interval: 10 },
{},
{ path: jobPathBasic, interval: 10 }
{ path: jobPathBasic, interval: 10, runAs: 'worker' }
);
test(
@@ -131,7 +147,8 @@ test(
{
path: jobPathBasic,
cron: '* * * * *',
interval: later.parse.cron('* * * * *')
interval: later.parse.cron('* * * * *'),
runAs: 'worker'
}
);
@@ -144,7 +161,8 @@ test(
path: jobPathBasic,
cron: '* * * * *',
interval: later.parse.cron('* * * * *'),
hasSeconds: false
hasSeconds: false,
runAs: 'worker'
}
);
@@ -156,7 +174,8 @@ test(
{
path: jobPathBasic,
cron: later.parse.cron('* * * * *'),
interval: later.parse.cron('* * * * *')
interval: later.parse.cron('* * * * *'),
runAs: 'worker'
}
);
@@ -165,7 +184,13 @@ test(
job,
{ name: 'basic', interval: undefined },
{ interval: 10 },
{ name: 'basic', path: jobPathBasic, timeout: 0, interval: 10 }
{
name: 'basic',
path: jobPathBasic,
timeout: 0,
interval: 10,
runAs: 'worker'
}
);
test(
@@ -178,7 +203,8 @@ test(
name: 'basic',
path: jobPathBasic,
timeout: 0,
interval: 0
interval: 0,
runAs: 'worker'
}
);
@@ -193,6 +219,7 @@ test(
path: `(${basic.toString()})()`,
timeout: 0,
interval: 0,
runAs: 'worker',
worker: { eval: true }
}
);
@@ -206,7 +233,8 @@ test(
timezone: 'local',
name: 'basic',
path: jobPathBasic,
timeout: 0
timeout: 0,
runAs: 'worker'
}
);
@@ -219,6 +247,46 @@ test(
timezone: 'America/New_York',
name: 'basic',
path: jobPathBasic,
timeout: 0
timeout: 0,
runAs: 'worker'
}
);
test('successfully applies job runAs', (t) => {
t.plan(3);
const base_job = jobBuilder({ ...baseJobConfig }, { ...baseConfig });
const proc_job = jobBuilder(
{ ...baseJobConfig, runAs: 'process' },
{ ...baseConfig }
);
const worker_job = jobBuilder(
{ ...baseJobConfig, runAs: 'worker' },
{ ...baseConfig }
);
t.true(base_job.runAs === 'worker');
t.true(proc_job.runAs === 'process');
t.true(worker_job.runAs === 'worker');
});
test('successfully overwrites job runAs with breeConfig', (t) => {
t.plan(4);
const job1 = jobBuilder(
{ ...baseJobConfig, runAs: 'worker' },
{ ...baseConfig, runJobsAs: 'process' }
); // inherit process
const job2 = jobBuilder(
{ ...baseJobConfig, runAs: 'process' },
{ ...baseConfig, runJobsAs: 'worker' }
); // inherit worker
const job3 = jobBuilder(
{ ...baseJobConfig },
{ ...baseConfig, runJobsAs: 'process' }
); // inherit process
const job4 = jobBuilder({ ...baseJobConfig }, { ...baseConfig }); // base config
t.true(job1.runAs === 'process');
t.true(job2.runAs === 'worker');
t.true(job3.runAs === 'process');
t.true(job4.runAs === 'worker');
});