feat(search): handle search indexing coordination among nodes

Signed-off-by: Phil Kuang <pkuang@factset.com>
This commit is contained in:
Phil Kuang
2022-02-23 16:11:50 -05:00
parent db21d78179
commit 0a63e99a26
17 changed files with 438 additions and 269 deletions
+84
View File
@@ -0,0 +1,84 @@
---
'@backstage/plugin-search-backend-node': minor
'@backstage/create-app': patch
---
**BREAKING**: `IndexBuilder.addCollator()` now requires a `schedule` parameter (replacing `defaultRefreshIntervalSeconds`) which is expected to be a `TaskRunner` that is configured with the desired search indexing schedule for the given collator.
`Scheduler.addToSchedule()` now takes a new parameter object (`ScheduleTaskParameters`) with two new options `id` and `scheduledRunner` in addition to the migrated `task` argument.
NOTE: The search backend plugin now creates a dedicated database for coordinating indexing tasks.
To make this change to an existing app, make the following changes to `packages/backend/src/plugins/search.ts`:
```diff
+import { Duration } from 'luxon';
/* ... */
+ const schedule = env.scheduler.createScheduledTaskRunner({
+ frequency: Duration.fromObject({ seconds: 600 }),
+ timeout: Duration.fromObject({ seconds: 900 }),
+ initialDelay: Duration.fromObject({ seconds: 3 }),
+ });
indexBuilder.addCollator({
- defaultRefreshIntervalSeconds: 600,
+ schedule,
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
discovery: env.discovery,
tokenManager: env.tokenManager,
}),
});
indexBuilder.addCollator({
- defaultRefreshIntervalSeconds: 600,
+ schedule,
factory: DefaultTechDocsCollatorFactory.fromConfig(env.config, {
discovery: env.discovery,
tokenManager: env.tokenManager,
}),
});
const { scheduler } = await indexBuilder.build();
- setTimeout(() => scheduler.start(), 3000);
+ scheduler.start();
/* ... */
```
NOTE: For scenarios where the `lunr` search engine is used in a multi-node configuration, a non-distributed `TaskRunner` like the following should be implemented to ensure consistency across nodes (alternatively, you can configure
the search plugin to use a non-distributed DB such as [SQLite](https://backstage.io/docs/tutorials/configuring-plugin-databases#postgresql-and-sqlite-3)):
```diff
+import { TaskInvocationDefinition, TaskRunner } from '@backstage/backend-tasks';
/* ... */
+ const schedule: TaskRunner = {
+ run: async (task: TaskInvocationDefinition) => {
+ const startRefresh = async () => {
+ while (!task.signal?.aborted) {
+ try {
+ await task.fn(task.signal);
+ } catch {
+ // ignore intentionally
+ }
+
+ await new Promise(resolve => setTimeout(resolve, 600 * 1000));
+ }
+ };
+ startRefresh();
+ },
+ };
indexBuilder.addCollator({
- defaultRefreshIntervalSeconds: 600,
+ schedule,
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
discovery: env.discovery,
tokenManager: env.tokenManager,
}),
});
/* ... */
```
+3 -1
View File
@@ -84,7 +84,9 @@ index-time.
There are many ways a search index could be built and maintained, but Backstage
Search chooses to completely rebuild indices on a schedule. Different collators
can be configured to refresh at different intervals, depending on how often the
source information is updated.
source information is updated. When search indexing is distributed among multiple
backend nodes, coordination to prevent clashes is typically handled by a
distributed `TaskRunner`.
### The Search Page
+72 -10
View File
@@ -149,6 +149,7 @@ import {
import { PluginEnvironment } from '../types';
import { DefaultCatalogCollator } from '@backstage/plugin-catalog-backend';
import { Router } from 'express';
import { Duration } from 'luxon';
export default async function createPlugin(
env: PluginEnvironment,
@@ -161,9 +162,15 @@ export default async function createPlugin(
searchEngine,
});
const every10MinutesSchedule = env.scheduler.createScheduledTaskRunner({
frequency: Duration.fromObject({ seconds: 600 }),
timeout: Duration.fromObject({ seconds: 900 }),
initialDelay: Duration.fromObject({ seconds: 3 }),
});
indexBuilder.addCollator({
defaultRefreshIntervalSeconds: 600,
collator: new DefaultCatalogCollator({
schedule: every10MinutesSchedule,
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
discovery: env.discovery,
tokenManager: env.tokenManager,
}),
@@ -287,32 +294,87 @@ which are responsible for providing documents
number of collators with the `IndexBuilder` like this:
```typescript
import { Duration } from 'luxon';
const indexBuilder = new IndexBuilder({ logger: env.logger, searchEngine });
const every10MinutesSchedule = env.scheduler.createScheduledTaskRunner({
frequency: Duration.fromObject({ seconds: 600 }),
timeout: Duration.fromObject({ seconds: 900 }),
initialDelay: Duration.fromObject({ seconds: 3 }),
});
const everyHourSchedule = env.scheduler.createScheduledTaskRunner({
frequency: Duration.fromObject({ seconds: 3600 }),
timeout: Duration.fromObject({ seconds: 5400 }),
initialDelay: Duration.fromObject({ seconds: 3 }),
});
indexBuilder.addCollator({
defaultRefreshIntervalSeconds: 600,
collator: new DefaultCatalogCollator({
schedule: every10MinutesSchedule,
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
discovery: env.discovery,
tokenManager: env.tokenManager,
}),
});
indexBuilder.addCollator({
defaultRefreshIntervalSeconds: 3600,
collator: new MyCustomCollator(),
schedule: everyHourSchedule,
factory: new MyCustomCollatorFactory(),
});
```
Backstage Search builds and maintains its index
[on a schedule](./concepts.md#the-scheduler). You can change how often the
indexes are rebuilt for a given type of document. You may want to do this if
your documents are updated more or less frequently. You can do so by modifying
its `defaultRefreshIntervalSeconds` value, like this:
your documents are updated more or less frequently. You can do so by configuring
a scheduled `TaskRunner` to pass into the `schedule` value, like this:
```typescript {3}
const every10MinutesSchedule = env.scheduler.createScheduledTaskRunner({
frequency: Duration.fromObject({ seconds: 600 }),
timeout: Duration.fromObject({ seconds: 900 }),
initialDelay: Duration.fromObject({ seconds: 3 }),
});
indexBuilder.addCollator({
defaultRefreshIntervalSeconds: 600,
collator: new DefaultCatalogCollator({
schedule: every10MinutesSchedule,
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
discovery: env.discovery,
tokenManager: env.tokenManager,
}),
});
```
Note: if you are using the in-memory Lunr search engine, you probably want to
implement a non-distributed `TaskRunner` like the following to ensure consistency
if you're running multiple search backend nodes (alternatively, you can configure
the search plugin to use a non-distributed database such as
[SQLite](../../tutorials/configuring-plugin-databases.md#postgresql-and-sqlite-3)):
```typescript
import { TaskInvocationDefinition, TaskRunner } from '@backstage/backend-tasks';
const schedule: TaskRunner = {
run: async (task: TaskInvocationDefinition) => {
const startRefresh = async () => {
while (!task.signal?.aborted) {
try {
await task.fn(task.signal);
} catch {
// ignore intentionally
}
await new Promise(resolve => setTimeout(resolve, 600 * 1000));
}
};
startRefresh();
},
};
indexBuilder.addCollator({
schedule,
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
discovery: env.discovery,
tokenManager: env.tokenManager,
}),
+3 -1
View File
@@ -68,6 +68,7 @@
"express": "^4.17.1",
"express-promise-router": "^4.1.0",
"express-prom-bundle": "^6.3.6",
"luxon": "^2.0.2",
"pg": "^8.3.0",
"pg-connection-string": "^2.3.0",
"prom-client": "^14.0.1",
@@ -77,7 +78,8 @@
"@backstage/cli": "^0.17.0-next.1",
"@types/dockerode": "^3.3.0",
"@types/express": "^4.17.6",
"@types/express-serve-static-core": "^4.17.5"
"@types/express-serve-static-core": "^4.17.5",
"@types/luxon": "^2.0.4"
},
"files": [
"dist"
+12 -5
View File
@@ -26,6 +26,7 @@ import {
} from '@backstage/plugin-search-backend-node';
import { DefaultTechDocsCollatorFactory } from '@backstage/plugin-techdocs-backend';
import { Router } from 'express';
import { Duration } from 'luxon';
import { PluginEnvironment } from '../types';
async function createSearchEngine(
@@ -55,10 +56,18 @@ export default async function createPlugin(
searchEngine,
});
const schedule = env.scheduler.createScheduledTaskRunner({
frequency: Duration.fromObject({ seconds: 600 }),
timeout: Duration.fromObject({ seconds: 900 }),
// A 3 second delay gives the backend server a chance to initialize before
// any collators are executed, which may attempt requests against the API.
initialDelay: Duration.fromObject({ seconds: 3 }),
});
// Collators are responsible for gathering documents known to plugins. This
// particular collator gathers entities from the software catalog.
indexBuilder.addCollator({
defaultRefreshIntervalSeconds: 600,
schedule,
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
discovery: env.discovery,
tokenManager: env.tokenManager,
@@ -66,7 +75,7 @@ export default async function createPlugin(
});
indexBuilder.addCollator({
defaultRefreshIntervalSeconds: 600,
schedule,
factory: DefaultTechDocsCollatorFactory.fromConfig(env.config, {
discovery: env.discovery,
logger: env.logger,
@@ -77,10 +86,8 @@ export default async function createPlugin(
// The scheduler controls when documents are gathered from collators and sent
// to the search engine for indexing.
const { scheduler } = await indexBuilder.build();
scheduler.start();
// A 3 second delay gives the backend server a chance to initialize before
// any collators are executed, which may attempt requests against the API.
setTimeout(() => scheduler.start(), 3000);
useHotCleanup(module, () => scheduler.stop());
return await createRouter({
@@ -40,6 +40,7 @@
"dockerode": "^3.3.1",
"express": "^4.17.1",
"express-promise-router": "^4.1.0",
"luxon": "^2.0.2",
{{#if dbTypePG}}
"pg": "^8.3.0",
{{/if}}
@@ -52,7 +53,8 @@
"@backstage/cli": "^{{version '@backstage/cli'}}",
"@types/dockerode": "^3.3.0",
"@types/express": "^4.17.6",
"@types/express-serve-static-core": "^4.17.5"
"@types/express-serve-static-core": "^4.17.5",
"@types/luxon": "^2.0.4"
},
"files": [
"dist"
@@ -11,6 +11,7 @@ import { PluginEnvironment } from '../types';
import { DefaultCatalogCollatorFactory } from '@backstage/plugin-catalog-backend';
import { DefaultTechDocsCollatorFactory } from '@backstage/plugin-techdocs-backend';
import { Router } from 'express';
import { Duration } from 'luxon';
export default async function createPlugin(
env: PluginEnvironment,
@@ -31,10 +32,18 @@ export default async function createPlugin(
searchEngine,
});
const schedule = env.scheduler.createScheduledTaskRunner({
frequency: Duration.fromObject({ seconds: 600 }),
timeout: Duration.fromObject({ seconds: 900 }),
// A 3 second delay gives the backend server a chance to initialize before
// any collators are executed, which may attempt requests against the API.
initialDelay: Duration.fromObject({ seconds: 3 }),
});
// Collators are responsible for gathering documents known to plugins. This
// collator gathers entities from the software catalog.
indexBuilder.addCollator({
defaultRefreshIntervalSeconds: 600,
schedule,
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
discovery: env.discovery,
tokenManager: env.tokenManager,
@@ -43,7 +52,7 @@ export default async function createPlugin(
// collator gathers entities from techdocs.
indexBuilder.addCollator({
defaultRefreshIntervalSeconds: 600,
schedule,
factory: DefaultTechDocsCollatorFactory.fromConfig(env.config, {
discovery: env.discovery,
logger: env.logger,
@@ -54,10 +63,8 @@ export default async function createPlugin(
// The scheduler controls when documents are gathered from collators and sent
// to the search engine for indexing.
const { scheduler } = await indexBuilder.build();
scheduler.start();
// A 3 second delay gives the backend server a chance to initialize before
// any collators are executed, which may attempt requests against the API.
setTimeout(() => scheduler.start(), 3000);
useHotCleanup(module, () => scheduler.stop());
return await createRouter({
+15 -6
View File
@@ -16,6 +16,8 @@ import { QueryTranslator } from '@backstage/plugin-search-common';
import { Readable } from 'stream';
import { SearchEngine } from '@backstage/plugin-search-common';
import { SearchQuery } from '@backstage/plugin-search-common';
import { TaskFunction } from '@backstage/backend-tasks';
import { TaskRunner } from '@backstage/backend-tasks';
import { Transform } from 'stream';
import { Writable } from 'stream';
@@ -52,10 +54,7 @@ export abstract class DecoratorBase extends Transform {
// @beta (undocumented)
export class IndexBuilder {
constructor({ logger, searchEngine }: IndexBuilderOptions);
addCollator({
factory,
defaultRefreshIntervalSeconds,
}: RegisterCollatorParameters): void;
addCollator({ factory, schedule }: RegisterCollatorParameters): void;
addDecorator({ factory }: RegisterDecoratorParameters): void;
build(): Promise<{
scheduler: Scheduler;
@@ -111,8 +110,8 @@ export class LunrSearchEngineIndexer extends BatchSearchEngineIndexer {
// @beta
export interface RegisterCollatorParameters {
defaultRefreshIntervalSeconds: number;
factory: DocumentCollatorFactory;
schedule: TaskRunner;
}
// @beta
@@ -123,11 +122,21 @@ export interface RegisterDecoratorParameters {
// @beta (undocumented)
export class Scheduler {
constructor({ logger }: { logger: Logger });
addToSchedule(task: Function, interval: number): void;
addToSchedule({ id, task, scheduledRunner }: ScheduleTaskParameters): void;
start(): void;
stop(): void;
}
// @public
export interface ScheduleTaskParameters {
// (undocumented)
id: string;
// (undocumented)
scheduledRunner: TaskRunner;
// (undocumented)
task: TaskFunction;
}
export { SearchEngine };
// @beta
+2
View File
@@ -23,11 +23,13 @@
"clean": "backstage-cli package clean"
},
"dependencies": {
"@backstage/backend-tasks": "^0.3.0-next.2",
"@backstage/errors": "^1.0.0",
"@backstage/plugin-search-common": "^0.3.3-next.1",
"@types/lunr": "^2.3.3",
"lodash": "^4.17.21",
"lunr": "^2.3.9",
"node-abort-controller": "^3.0.1",
"winston": "^3.2.1"
},
"devDependencies": {
@@ -15,6 +15,7 @@
*/
import { getVoidLogger } from '@backstage/backend-common';
import { TaskInvocationDefinition, TaskRunner } from '@backstage/backend-tasks';
import {
DocumentCollatorFactory,
DocumentDecoratorFactory,
@@ -53,9 +54,15 @@ class DifferentlyTypedDocumentDecoratorFactory extends TestDocumentDecoratorFact
describe('IndexBuilder', () => {
let testSearchEngine: SearchEngine;
let testIndexBuilder: IndexBuilder;
let testScheduledTaskRunner: TaskRunner;
beforeEach(() => {
const logger = getVoidLogger();
testScheduledTaskRunner = {
run: async (task: TaskInvocationDefinition & { fn: () => void }) => {
task.fn();
},
};
testSearchEngine = new LunrSearchEngine({ logger });
testIndexBuilder = new IndexBuilder({
logger,
@@ -65,35 +72,32 @@ describe('IndexBuilder', () => {
describe('addCollator', () => {
it('adds a collator', async () => {
jest.useFakeTimers();
const testCollatorFactory = new TestDocumentCollatorFactory();
const collatorSpy = jest.spyOn(testCollatorFactory, 'getCollator');
// Add a collator.
testIndexBuilder.addCollator({
defaultRefreshIntervalSeconds: 6,
factory: testCollatorFactory,
schedule: testScheduledTaskRunner,
});
// Build the index and ensure the collator was invoked.
const { scheduler } = await testIndexBuilder.build();
scheduler.start();
jest.advanceTimersByTime(6000);
expect(collatorSpy).toHaveBeenCalled();
});
});
describe('addDecorator', () => {
it('adds a decorator', async () => {
jest.useFakeTimers();
const testCollatorFactory = new TestDocumentCollatorFactory();
const testDecoratorFactory = new TestDocumentDecoratorFactory();
const decoratorSpy = jest.spyOn(testDecoratorFactory, 'getDecorator');
// Add a collator.
testIndexBuilder.addCollator({
defaultRefreshIntervalSeconds: 6,
factory: testCollatorFactory,
schedule: testScheduledTaskRunner,
});
// Add a decorator.
@@ -104,14 +108,12 @@ describe('IndexBuilder', () => {
// Build the index and ensure the decorator was invoked.
const { scheduler } = await testIndexBuilder.build();
scheduler.start();
jest.advanceTimersByTime(6000);
// wait for async decorator execution
await Promise.resolve();
expect(decoratorSpy).toHaveBeenCalled();
});
it('adds a type-specific decorator', async () => {
jest.useFakeTimers();
const testCollatorFactory = new TypedDocumentCollatorFactory();
const testDecoratorFactory = new TypedDocumentDecoratorFactory();
jest.spyOn(testCollatorFactory, 'getCollator');
@@ -119,8 +121,8 @@ describe('IndexBuilder', () => {
// Add a collator.
testIndexBuilder.addCollator({
defaultRefreshIntervalSeconds: 6,
factory: testCollatorFactory,
schedule: testScheduledTaskRunner,
});
// Add a decorator for the same type.
@@ -131,7 +133,6 @@ describe('IndexBuilder', () => {
// Build the index and ensure the decorator was invoked.
const { scheduler } = await testIndexBuilder.build();
scheduler.start();
jest.advanceTimersByTime(6000);
// wait for async decorator execution
await Promise.resolve();
expect(decoratorSpy).toHaveBeenCalled();
@@ -146,8 +147,8 @@ describe('IndexBuilder', () => {
// Add a collator.
testIndexBuilder.addCollator({
defaultRefreshIntervalSeconds: 6,
factory: testCollatorFactory,
schedule: testScheduledTaskRunner,
});
// Add a decorator for a different type.
@@ -158,7 +159,6 @@ describe('IndexBuilder', () => {
// Build the index and ensure the decorator was not invoked.
const { scheduler } = await testIndexBuilder.build();
scheduler.start();
jest.advanceTimersByTime(6000);
expect(collatorSpy).toHaveBeenCalled();
expect(decoratorSpy).not.toHaveBeenCalled();
});
+52 -56
View File
@@ -13,32 +13,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
DocumentCollatorFactory,
DocumentDecoratorFactory,
DocumentTypeInfo,
SearchEngine,
} from '@backstage/plugin-search-common';
import { Transform, pipeline } from 'stream';
import { Logger } from 'winston';
import { Scheduler } from './index';
import { Scheduler } from './Scheduler';
import {
IndexBuilderOptions,
RegisterCollatorParameters,
RegisterDecoratorParameters,
} from './types';
interface CollatorEnvelope {
factory: DocumentCollatorFactory;
refreshInterval: number;
}
/**
* @beta
*/
export class IndexBuilder {
private collators: Record<string, CollatorEnvelope>;
private collators: Record<string, RegisterCollatorParameters>;
private decorators: Record<string, DocumentDecoratorFactory[]>;
private documentTypes: Record<string, DocumentTypeInfo>;
private searchEngine: SearchEngine;
@@ -64,16 +57,13 @@ export class IndexBuilder {
* Makes the index builder aware of a collator that should be executed at the
* given refresh interval.
*/
addCollator({
factory,
defaultRefreshIntervalSeconds,
}: RegisterCollatorParameters): void {
addCollator({ factory, schedule }: RegisterCollatorParameters): void {
this.logger.info(
`Added ${factory.constructor.name} collator factory for type ${factory.type}`,
);
this.collators[factory.type] = {
refreshInterval: defaultRefreshIntervalSeconds,
factory,
schedule,
};
this.documentTypes[factory.type] = {
visibilityPermission: factory.visibilityPermission,
@@ -106,51 +96,57 @@ export class IndexBuilder {
* scheduler returned to the caller.
*/
async build(): Promise<{ scheduler: Scheduler }> {
const scheduler = new Scheduler({ logger: this.logger });
const scheduler = new Scheduler({
logger: this.logger,
});
Object.keys(this.collators).forEach(type => {
scheduler.addToSchedule(async () => {
// Instantiate the collator.
const collator = await this.collators[type].factory.getCollator();
this.logger.info(
`Collating documents for ${type} via ${this.collators[type].factory.constructor.name}`,
);
// Instantiate all relevant decorators.
const decorators: Transform[] = await Promise.all(
(this.decorators['*'] || [])
.concat(this.decorators[type] || [])
.map(async factory => {
const decorator = await factory.getDecorator();
this.logger.info(
`Attached decorator via ${factory.constructor.name} to ${type} index pipeline.`,
);
return decorator;
}),
);
// Instantiate the indexer.
const indexer = await this.searchEngine.getIndexer(type);
// Compose collator/decorators/indexer into a pipeline
return new Promise<void>(done => {
pipeline(
[collator, ...decorators, indexer],
(error: NodeJS.ErrnoException | null) => {
if (error) {
this.logger.error(
`Collating documents for ${type} failed: ${error}`,
);
} else {
this.logger.info(`Collating documents for ${type} succeeded`);
}
// Signal index pipeline completion!
done();
},
scheduler.addToSchedule({
id: `search_index_${type.replace('-', '_').toLocaleLowerCase('en-US')}`,
scheduledRunner: this.collators[type].schedule,
task: async () => {
// Instantiate the collator.
const collator = await this.collators[type].factory.getCollator();
this.logger.info(
`Collating documents for ${type} via ${this.collators[type].factory.constructor.name}`,
);
});
}, this.collators[type].refreshInterval * 1000);
// Instantiate all relevant decorators.
const decorators: Transform[] = await Promise.all(
(this.decorators['*'] || [])
.concat(this.decorators[type] || [])
.map(async factory => {
const decorator = await factory.getDecorator();
this.logger.info(
`Attached decorator via ${factory.constructor.name} to ${type} index pipeline.`,
);
return decorator;
}),
);
// Instantiate the indexer.
const indexer = await this.searchEngine.getIndexer(type);
// Compose collator/decorators/indexer into a pipeline
return new Promise<void>(done => {
pipeline(
[collator, ...decorators, indexer],
(error: NodeJS.ErrnoException | null) => {
if (error) {
this.logger.error(
`Collating documents for ${type} failed: ${error}`,
);
} else {
this.logger.info(`Collating documents for ${type} succeeded`);
}
// Signal index pipeline completion!
done();
},
);
});
},
});
});
return {
+129 -16
View File
@@ -29,31 +29,107 @@ describe('Scheduler', () => {
describe('addToSchedule', () => {
it('should not add a task and interval to schedule, if already started', async () => {
jest.useFakeTimers();
const mockTask1 = jest.fn();
const mockTask2 = jest.fn();
const mockScheduledTaskRunner1 = {
run: jest.fn(),
};
const mockScheduledTaskRunner2 = {
run: jest.fn(),
};
// Add a task and interval to schedule
testScheduler.addToSchedule(mockTask1, 2);
testScheduler.addToSchedule({
id: 'id1',
task: mockTask1,
scheduledRunner: mockScheduledTaskRunner1,
});
// Starts scheduling process
testScheduler.start();
// Throws Error if task and interval is added to a already started schedule
expect(() => testScheduler.addToSchedule(mockTask2, 2)).toThrowError();
expect(() =>
testScheduler.addToSchedule({
id: 'id2',
task: mockTask2,
scheduledRunner: mockScheduledTaskRunner2,
}),
).toThrowError();
jest.runOnlyPendingTimers();
expect(mockTask1).toHaveBeenCalled();
expect(mockTask2).not.toHaveBeenCalled();
expect(mockScheduledTaskRunner1.run).toHaveBeenCalledWith(
expect.objectContaining({
id: 'id1',
fn: mockTask1,
}),
);
expect(mockScheduledTaskRunner2.run).not.toHaveBeenCalledWith(
expect.objectContaining({
id: 'id2',
fn: mockTask2,
}),
);
});
it('should not add a task to schedule, if it already exists', async () => {
const mockTask1 = jest.fn();
const mockTask2 = jest.fn();
const mockScheduledTaskRunner1 = {
run: jest.fn(),
};
const mockScheduledTaskRunner2 = {
run: jest.fn(),
};
// Add a task and interval to schedule
testScheduler.addToSchedule({
id: 'id1',
task: mockTask1,
scheduledRunner: mockScheduledTaskRunner1,
});
// Throws Error if task and interval is added to a already started schedule
expect(() =>
testScheduler.addToSchedule({
id: 'id1',
task: mockTask2,
scheduledRunner: mockScheduledTaskRunner2,
}),
).toThrowError();
// Starts scheduling process
testScheduler.start();
expect(mockScheduledTaskRunner1.run).toHaveBeenCalledWith(
expect.objectContaining({
id: 'id1',
fn: mockTask1,
}),
);
expect(mockScheduledTaskRunner2.run).not.toHaveBeenCalledWith(
expect.objectContaining({
id: 'id2',
fn: mockTask2,
}),
);
});
it('should be possible to add a task and interval to schedule, if already started, but stopped in between', async () => {
jest.useFakeTimers();
const mockTask1 = jest.fn();
const mockTask2 = jest.fn();
const mockScheduledTaskRunner1 = {
run: jest.fn(),
};
const mockScheduledTaskRunner2 = {
run: jest.fn(),
};
// Add a task and interval to schedule
testScheduler.addToSchedule(mockTask1, 2);
testScheduler.addToSchedule({
id: 'id1',
task: mockTask1,
scheduledRunner: mockScheduledTaskRunner1,
});
// Starts scheduling process
testScheduler.start();
@@ -63,15 +139,28 @@ describe('Scheduler', () => {
// Shouldn't throw error, as it is stopped.
expect(() =>
testScheduler.addToSchedule(mockTask2, 4),
testScheduler.addToSchedule({
id: 'id2',
task: mockTask2,
scheduledRunner: mockScheduledTaskRunner2,
}),
).not.toThrowError();
// Starts scheduling process
testScheduler.start();
jest.runOnlyPendingTimers();
expect(mockTask1).toHaveBeenCalled();
expect(mockTask2).toHaveBeenCalled();
expect(mockScheduledTaskRunner1.run).toHaveBeenCalledWith(
expect.objectContaining({
id: 'id1',
fn: mockTask1,
}),
);
expect(mockScheduledTaskRunner2.run).toHaveBeenCalledWith(
expect.objectContaining({
id: 'id2',
fn: mockTask2,
}),
);
});
});
@@ -79,16 +168,40 @@ describe('Scheduler', () => {
it('should execute tasks on start', () => {
const mockTask1 = jest.fn();
const mockTask2 = jest.fn();
const mockScheduledTaskRunner1 = {
run: jest.fn(),
};
const mockScheduledTaskRunner2 = {
run: jest.fn(),
};
// Add tasks and interval to schedule
testScheduler.addToSchedule(mockTask1, 2);
testScheduler.addToSchedule(mockTask2, 2);
testScheduler.addToSchedule({
id: 'id1',
task: mockTask1,
scheduledRunner: mockScheduledTaskRunner1,
});
testScheduler.addToSchedule({
id: 'id2',
task: mockTask2,
scheduledRunner: mockScheduledTaskRunner2,
});
// Starts scheduling process
testScheduler.start();
expect(mockTask1).toHaveBeenCalled();
expect(mockTask2).toHaveBeenCalled();
expect(mockScheduledTaskRunner1.run).toHaveBeenCalledWith(
expect.objectContaining({
id: 'id1',
fn: mockTask1,
}),
);
expect(mockScheduledTaskRunner2.run).toHaveBeenCalledWith(
expect.objectContaining({
id: 'id2',
fn: mockTask2,
}),
);
});
});
});
+34 -16
View File
@@ -14,29 +14,38 @@
* limitations under the License.
*/
import { AbortController } from 'node-abort-controller';
import { Logger } from 'winston';
import { runPeriodically } from './runPeriodically';
import { TaskFunction, TaskRunner } from '@backstage/backend-tasks';
type TaskEnvelope = {
task: Function;
interval: number;
task: TaskFunction;
scheduledRunner: TaskRunner;
};
/**
* TODO: coordination, error handling
* @public ScheduleTaskParameters
*/
export interface ScheduleTaskParameters {
id: string;
task: TaskFunction;
scheduledRunner: TaskRunner;
}
/**
* @beta
*/
export class Scheduler {
private logger: Logger;
private schedule: TaskEnvelope[];
private runningTasks: Function[] = [];
private schedule: { [id: string]: TaskEnvelope };
private abortController: AbortController;
private isRunning: boolean;
constructor({ logger }: { logger: Logger }) {
this.logger = logger;
this.schedule = [];
this.schedule = {};
this.abortController = new AbortController();
this.isRunning = false;
}
/**
@@ -44,13 +53,18 @@ export class Scheduler {
* When running the tasks, the scheduler waits at least for the time specified
* in the interval once the task was completed, before running it again.
*/
addToSchedule(task: Function, interval: number) {
if (this.runningTasks.length) {
addToSchedule({ id, task, scheduledRunner }: ScheduleTaskParameters) {
if (this.isRunning) {
throw new Error(
'Cannot add task to schedule that has already been started.',
);
}
this.schedule.push({ task, interval });
if (this.schedule[id]) {
throw new Error(`Task with id ${id} already exists.`);
}
this.schedule[id] = { task, scheduledRunner };
}
/**
@@ -58,8 +72,14 @@ export class Scheduler {
*/
start() {
this.logger.info('Starting all scheduled search tasks.');
this.schedule.forEach(({ task, interval }) => {
this.runningTasks.push(runPeriodically(() => task(), interval));
this.isRunning = true;
Object.keys(this.schedule).forEach(id => {
const { task, scheduledRunner } = this.schedule[id];
scheduledRunner.run({
id,
fn: task,
signal: this.abortController.signal,
});
});
}
@@ -68,9 +88,7 @@ export class Scheduler {
*/
stop() {
this.logger.info('Stopping all scheduled search tasks.');
this.runningTasks.forEach(cancel => {
cancel();
});
this.runningTasks = [];
this.abortController.abort();
this.isRunning = false;
}
}
+2
View File
@@ -36,6 +36,8 @@ export type {
export * from './indexing';
export * from './test-utils';
export type { ScheduleTaskParameters } from './Scheduler';
/**
* @deprecated Import from @backstage/plugin-search-common instead
*/
@@ -1,84 +0,0 @@
/*
* Copyright 2020 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { runPeriodically } from './runPeriodically';
jest.useFakeTimers();
describe('runPeriodically', () => {
const flushPromises = async () => {
const promise = new Promise(resolve => process.nextTick(resolve));
jest.runAllTicks();
await promise;
};
const advanceTimersByTime = async (time: number) => {
jest.advanceTimersByTime(time);
// Advancing the time with jest doesn't run all promises, but only sync code
await flushPromises();
};
it('runs task initially', async () => {
const task = jest.fn(async () => {});
const cancel = runPeriodically(task, 1000);
expect(task).toHaveBeenCalledTimes(1);
cancel();
});
it('runs at requested interval', async () => {
const task = jest.fn(async () => {});
const cancel = runPeriodically(task, 1000);
await flushPromises();
await advanceTimersByTime(1000);
await advanceTimersByTime(1000);
expect(task).toHaveBeenCalledTimes(3);
cancel();
});
it('stops after being canceled', async () => {
const task = jest.fn(async () => {});
const cancel = runPeriodically(task, 1000);
await flushPromises();
cancel();
await advanceTimersByTime(1000);
await advanceTimersByTime(1000);
expect(task).toHaveBeenCalledTimes(1);
});
it('continues running after failures', async () => {
const task = jest.fn(async () => {
throw new Error();
});
const cancel = runPeriodically(task, 1000);
await flushPromises();
await advanceTimersByTime(1000);
await advanceTimersByTime(1000);
expect(task).toHaveBeenCalledTimes(3);
cancel();
});
it('waits till a long running task is completed', async () => {
const task = jest.fn(
() => new Promise(resolve => setTimeout(resolve, 10000)),
);
const cancel = runPeriodically(task, 1000);
await flushPromises();
await advanceTimersByTime(1000);
expect(task).toHaveBeenCalledTimes(1);
await advanceTimersByTime(9000);
await advanceTimersByTime(1000);
expect(task).toHaveBeenCalledTimes(2);
cancel();
});
});
@@ -1,54 +0,0 @@
/*
* Copyright 2020 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Runs a function repeatedly, with a fixed wait between invocations.
*
* Supports async functions, and silently ignores exceptions and rejections.
*
* @param fn - The function to run. May return a Promise.
* @param delayMs - The delay between a completed function invocation and the
* next.
* @returns A function that, when called, stops the invocation loop.
*/
export function runPeriodically(fn: () => any, delayMs: number): () => void {
let cancel: () => void;
let cancelled = false;
const cancellationPromise = new Promise<void>(resolve => {
cancel = () => {
resolve();
cancelled = true;
};
});
const startRefresh = async () => {
while (!cancelled) {
try {
await fn();
} catch {
// ignore intentionally
}
await Promise.race([
new Promise(resolve => setTimeout(resolve, delayMs)),
cancellationPromise,
]);
}
};
startRefresh();
return cancel!;
}
+4 -3
View File
@@ -14,6 +14,7 @@
* limitations under the License.
*/
import { TaskRunner } from '@backstage/backend-tasks';
import {
DocumentCollatorFactory,
DocumentDecoratorFactory,
@@ -35,10 +36,10 @@ export type IndexBuilderOptions = {
*/
export interface RegisterCollatorParameters {
/**
* The default interval (in seconds) that the provided collator will be called (can be overridden in config).
* The schedule for which the provided collator will be called, commonly the result of
* {@link @backstage/backend-tasks#PluginTaskScheduler.createScheduledTaskRunner}
*/
defaultRefreshIntervalSeconds: number;
schedule: TaskRunner;
/**
* The class responsible for returning the document collator of the given type.
*/