feat(search): handle search indexing coordination among nodes
Signed-off-by: Phil Kuang <pkuang@factset.com>
This commit is contained in:
@@ -0,0 +1,84 @@
|
||||
---
|
||||
'@backstage/plugin-search-backend-node': minor
|
||||
'@backstage/create-app': patch
|
||||
---
|
||||
|
||||
**BREAKING**: `IndexBuilder.addCollator()` now requires a `schedule` parameter (replacing `defaultRefreshIntervalSeconds`) which is expected to be a `TaskRunner` that is configured with the desired search indexing schedule for the given collator.
|
||||
|
||||
`Scheduler.addToSchedule()` now takes a new parameter object (`ScheduleTaskParameters`) with two new options `id` and `scheduledRunner` in addition to the migrated `task` argument.
|
||||
|
||||
NOTE: The search backend plugin now creates a dedicated database for coordinating indexing tasks.
|
||||
|
||||
To make this change to an existing app, make the following changes to `packages/backend/src/plugins/search.ts`:
|
||||
|
||||
```diff
|
||||
+import { Duration } from 'luxon';
|
||||
|
||||
/* ... */
|
||||
|
||||
+ const schedule = env.scheduler.createScheduledTaskRunner({
|
||||
+ frequency: Duration.fromObject({ seconds: 600 }),
|
||||
+ timeout: Duration.fromObject({ seconds: 900 }),
|
||||
+ initialDelay: Duration.fromObject({ seconds: 3 }),
|
||||
+ });
|
||||
|
||||
indexBuilder.addCollator({
|
||||
- defaultRefreshIntervalSeconds: 600,
|
||||
+ schedule,
|
||||
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
|
||||
discovery: env.discovery,
|
||||
tokenManager: env.tokenManager,
|
||||
}),
|
||||
});
|
||||
|
||||
indexBuilder.addCollator({
|
||||
- defaultRefreshIntervalSeconds: 600,
|
||||
+ schedule,
|
||||
factory: DefaultTechDocsCollatorFactory.fromConfig(env.config, {
|
||||
discovery: env.discovery,
|
||||
tokenManager: env.tokenManager,
|
||||
}),
|
||||
});
|
||||
|
||||
const { scheduler } = await indexBuilder.build();
|
||||
- setTimeout(() => scheduler.start(), 3000);
|
||||
+ scheduler.start();
|
||||
/* ... */
|
||||
```
|
||||
|
||||
NOTE: For scenarios where the `lunr` search engine is used in a multi-node configuration, a non-distributed `TaskRunner` like the following should be implemented to ensure consistency across nodes (alternatively, you can configure
|
||||
the search plugin to use a non-distributed DB such as [SQLite](https://backstage.io/docs/tutorials/configuring-plugin-databases#postgresql-and-sqlite-3)):
|
||||
|
||||
```diff
|
||||
+import { TaskInvocationDefinition, TaskRunner } from '@backstage/backend-tasks';
|
||||
|
||||
/* ... */
|
||||
|
||||
+ const schedule: TaskRunner = {
|
||||
+ run: async (task: TaskInvocationDefinition) => {
|
||||
+ const startRefresh = async () => {
|
||||
+ while (!task.signal?.aborted) {
|
||||
+ try {
|
||||
+ await task.fn(task.signal);
|
||||
+ } catch {
|
||||
+ // ignore intentionally
|
||||
+ }
|
||||
+
|
||||
+ await new Promise(resolve => setTimeout(resolve, 600 * 1000));
|
||||
+ }
|
||||
+ };
|
||||
+ startRefresh();
|
||||
+ },
|
||||
+ };
|
||||
|
||||
indexBuilder.addCollator({
|
||||
- defaultRefreshIntervalSeconds: 600,
|
||||
+ schedule,
|
||||
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
|
||||
discovery: env.discovery,
|
||||
tokenManager: env.tokenManager,
|
||||
}),
|
||||
});
|
||||
|
||||
/* ... */
|
||||
```
|
||||
@@ -84,7 +84,9 @@ index-time.
|
||||
There are many ways a search index could be built and maintained, but Backstage
|
||||
Search chooses to completely rebuild indices on a schedule. Different collators
|
||||
can be configured to refresh at different intervals, depending on how often the
|
||||
source information is updated.
|
||||
source information is updated. When search indexing is distributed among multiple
|
||||
backend nodes, coordination to prevent clashes is typically handled by a
|
||||
distributed `TaskRunner`.
|
||||
|
||||
### The Search Page
|
||||
|
||||
|
||||
@@ -149,6 +149,7 @@ import {
|
||||
import { PluginEnvironment } from '../types';
|
||||
import { DefaultCatalogCollator } from '@backstage/plugin-catalog-backend';
|
||||
import { Router } from 'express';
|
||||
import { Duration } from 'luxon';
|
||||
|
||||
export default async function createPlugin(
|
||||
env: PluginEnvironment,
|
||||
@@ -161,9 +162,15 @@ export default async function createPlugin(
|
||||
searchEngine,
|
||||
});
|
||||
|
||||
const every10MinutesSchedule = env.scheduler.createScheduledTaskRunner({
|
||||
frequency: Duration.fromObject({ seconds: 600 }),
|
||||
timeout: Duration.fromObject({ seconds: 900 }),
|
||||
initialDelay: Duration.fromObject({ seconds: 3 }),
|
||||
});
|
||||
|
||||
indexBuilder.addCollator({
|
||||
defaultRefreshIntervalSeconds: 600,
|
||||
collator: new DefaultCatalogCollator({
|
||||
schedule: every10MinutesSchedule,
|
||||
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
|
||||
discovery: env.discovery,
|
||||
tokenManager: env.tokenManager,
|
||||
}),
|
||||
@@ -287,32 +294,87 @@ which are responsible for providing documents
|
||||
number of collators with the `IndexBuilder` like this:
|
||||
|
||||
```typescript
|
||||
import { Duration } from 'luxon';
|
||||
|
||||
const indexBuilder = new IndexBuilder({ logger: env.logger, searchEngine });
|
||||
|
||||
const every10MinutesSchedule = env.scheduler.createScheduledTaskRunner({
|
||||
frequency: Duration.fromObject({ seconds: 600 }),
|
||||
timeout: Duration.fromObject({ seconds: 900 }),
|
||||
initialDelay: Duration.fromObject({ seconds: 3 }),
|
||||
});
|
||||
|
||||
const everyHourSchedule = env.scheduler.createScheduledTaskRunner({
|
||||
frequency: Duration.fromObject({ seconds: 3600 }),
|
||||
timeout: Duration.fromObject({ seconds: 5400 }),
|
||||
initialDelay: Duration.fromObject({ seconds: 3 }),
|
||||
});
|
||||
|
||||
indexBuilder.addCollator({
|
||||
defaultRefreshIntervalSeconds: 600,
|
||||
collator: new DefaultCatalogCollator({
|
||||
schedule: every10MinutesSchedule,
|
||||
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
|
||||
discovery: env.discovery,
|
||||
tokenManager: env.tokenManager,
|
||||
}),
|
||||
});
|
||||
|
||||
indexBuilder.addCollator({
|
||||
defaultRefreshIntervalSeconds: 3600,
|
||||
collator: new MyCustomCollator(),
|
||||
schedule: everyHourSchedule,
|
||||
factory: new MyCustomCollatorFactory(),
|
||||
});
|
||||
```
|
||||
|
||||
Backstage Search builds and maintains its index
|
||||
[on a schedule](./concepts.md#the-scheduler). You can change how often the
|
||||
indexes are rebuilt for a given type of document. You may want to do this if
|
||||
your documents are updated more or less frequently. You can do so by modifying
|
||||
its `defaultRefreshIntervalSeconds` value, like this:
|
||||
your documents are updated more or less frequently. You can do so by configuring
|
||||
a scheduled `TaskRunner` to pass into the `schedule` value, like this:
|
||||
|
||||
```typescript {3}
|
||||
const every10MinutesSchedule = env.scheduler.createScheduledTaskRunner({
|
||||
frequency: Duration.fromObject({ seconds: 600 }),
|
||||
timeout: Duration.fromObject({ seconds: 900 }),
|
||||
initialDelay: Duration.fromObject({ seconds: 3 }),
|
||||
});
|
||||
|
||||
indexBuilder.addCollator({
|
||||
defaultRefreshIntervalSeconds: 600,
|
||||
collator: new DefaultCatalogCollator({
|
||||
schedule: every10MinutesSchedule,
|
||||
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
|
||||
discovery: env.discovery,
|
||||
tokenManager: env.tokenManager,
|
||||
}),
|
||||
});
|
||||
```
|
||||
|
||||
Note: if you are using the in-memory Lunr search engine, you probably want to
|
||||
implement a non-distributed `TaskRunner` like the following to ensure consistency
|
||||
if you're running multiple search backend nodes (alternatively, you can configure
|
||||
the search plugin to use a non-distributed database such as
|
||||
[SQLite](../../tutorials/configuring-plugin-databases.md#postgresql-and-sqlite-3)):
|
||||
|
||||
```typescript
|
||||
import { TaskInvocationDefinition, TaskRunner } from '@backstage/backend-tasks';
|
||||
|
||||
const schedule: TaskRunner = {
|
||||
run: async (task: TaskInvocationDefinition) => {
|
||||
const startRefresh = async () => {
|
||||
while (!task.signal?.aborted) {
|
||||
try {
|
||||
await task.fn(task.signal);
|
||||
} catch {
|
||||
// ignore intentionally
|
||||
}
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, 600 * 1000));
|
||||
}
|
||||
};
|
||||
startRefresh();
|
||||
},
|
||||
};
|
||||
|
||||
indexBuilder.addCollator({
|
||||
schedule,
|
||||
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
|
||||
discovery: env.discovery,
|
||||
tokenManager: env.tokenManager,
|
||||
}),
|
||||
|
||||
@@ -68,6 +68,7 @@
|
||||
"express": "^4.17.1",
|
||||
"express-promise-router": "^4.1.0",
|
||||
"express-prom-bundle": "^6.3.6",
|
||||
"luxon": "^2.0.2",
|
||||
"pg": "^8.3.0",
|
||||
"pg-connection-string": "^2.3.0",
|
||||
"prom-client": "^14.0.1",
|
||||
@@ -77,7 +78,8 @@
|
||||
"@backstage/cli": "^0.17.0-next.1",
|
||||
"@types/dockerode": "^3.3.0",
|
||||
"@types/express": "^4.17.6",
|
||||
"@types/express-serve-static-core": "^4.17.5"
|
||||
"@types/express-serve-static-core": "^4.17.5",
|
||||
"@types/luxon": "^2.0.4"
|
||||
},
|
||||
"files": [
|
||||
"dist"
|
||||
|
||||
@@ -26,6 +26,7 @@ import {
|
||||
} from '@backstage/plugin-search-backend-node';
|
||||
import { DefaultTechDocsCollatorFactory } from '@backstage/plugin-techdocs-backend';
|
||||
import { Router } from 'express';
|
||||
import { Duration } from 'luxon';
|
||||
import { PluginEnvironment } from '../types';
|
||||
|
||||
async function createSearchEngine(
|
||||
@@ -55,10 +56,18 @@ export default async function createPlugin(
|
||||
searchEngine,
|
||||
});
|
||||
|
||||
const schedule = env.scheduler.createScheduledTaskRunner({
|
||||
frequency: Duration.fromObject({ seconds: 600 }),
|
||||
timeout: Duration.fromObject({ seconds: 900 }),
|
||||
// A 3 second delay gives the backend server a chance to initialize before
|
||||
// any collators are executed, which may attempt requests against the API.
|
||||
initialDelay: Duration.fromObject({ seconds: 3 }),
|
||||
});
|
||||
|
||||
// Collators are responsible for gathering documents known to plugins. This
|
||||
// particular collator gathers entities from the software catalog.
|
||||
indexBuilder.addCollator({
|
||||
defaultRefreshIntervalSeconds: 600,
|
||||
schedule,
|
||||
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
|
||||
discovery: env.discovery,
|
||||
tokenManager: env.tokenManager,
|
||||
@@ -66,7 +75,7 @@ export default async function createPlugin(
|
||||
});
|
||||
|
||||
indexBuilder.addCollator({
|
||||
defaultRefreshIntervalSeconds: 600,
|
||||
schedule,
|
||||
factory: DefaultTechDocsCollatorFactory.fromConfig(env.config, {
|
||||
discovery: env.discovery,
|
||||
logger: env.logger,
|
||||
@@ -77,10 +86,8 @@ export default async function createPlugin(
|
||||
// The scheduler controls when documents are gathered from collators and sent
|
||||
// to the search engine for indexing.
|
||||
const { scheduler } = await indexBuilder.build();
|
||||
scheduler.start();
|
||||
|
||||
// A 3 second delay gives the backend server a chance to initialize before
|
||||
// any collators are executed, which may attempt requests against the API.
|
||||
setTimeout(() => scheduler.start(), 3000);
|
||||
useHotCleanup(module, () => scheduler.stop());
|
||||
|
||||
return await createRouter({
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
"dockerode": "^3.3.1",
|
||||
"express": "^4.17.1",
|
||||
"express-promise-router": "^4.1.0",
|
||||
"luxon": "^2.0.2",
|
||||
{{#if dbTypePG}}
|
||||
"pg": "^8.3.0",
|
||||
{{/if}}
|
||||
@@ -52,7 +53,8 @@
|
||||
"@backstage/cli": "^{{version '@backstage/cli'}}",
|
||||
"@types/dockerode": "^3.3.0",
|
||||
"@types/express": "^4.17.6",
|
||||
"@types/express-serve-static-core": "^4.17.5"
|
||||
"@types/express-serve-static-core": "^4.17.5",
|
||||
"@types/luxon": "^2.0.4"
|
||||
},
|
||||
"files": [
|
||||
"dist"
|
||||
|
||||
+12
-5
@@ -11,6 +11,7 @@ import { PluginEnvironment } from '../types';
|
||||
import { DefaultCatalogCollatorFactory } from '@backstage/plugin-catalog-backend';
|
||||
import { DefaultTechDocsCollatorFactory } from '@backstage/plugin-techdocs-backend';
|
||||
import { Router } from 'express';
|
||||
import { Duration } from 'luxon';
|
||||
|
||||
export default async function createPlugin(
|
||||
env: PluginEnvironment,
|
||||
@@ -31,10 +32,18 @@ export default async function createPlugin(
|
||||
searchEngine,
|
||||
});
|
||||
|
||||
const schedule = env.scheduler.createScheduledTaskRunner({
|
||||
frequency: Duration.fromObject({ seconds: 600 }),
|
||||
timeout: Duration.fromObject({ seconds: 900 }),
|
||||
// A 3 second delay gives the backend server a chance to initialize before
|
||||
// any collators are executed, which may attempt requests against the API.
|
||||
initialDelay: Duration.fromObject({ seconds: 3 }),
|
||||
});
|
||||
|
||||
// Collators are responsible for gathering documents known to plugins. This
|
||||
// collator gathers entities from the software catalog.
|
||||
indexBuilder.addCollator({
|
||||
defaultRefreshIntervalSeconds: 600,
|
||||
schedule,
|
||||
factory: DefaultCatalogCollatorFactory.fromConfig(env.config, {
|
||||
discovery: env.discovery,
|
||||
tokenManager: env.tokenManager,
|
||||
@@ -43,7 +52,7 @@ export default async function createPlugin(
|
||||
|
||||
// collator gathers entities from techdocs.
|
||||
indexBuilder.addCollator({
|
||||
defaultRefreshIntervalSeconds: 600,
|
||||
schedule,
|
||||
factory: DefaultTechDocsCollatorFactory.fromConfig(env.config, {
|
||||
discovery: env.discovery,
|
||||
logger: env.logger,
|
||||
@@ -54,10 +63,8 @@ export default async function createPlugin(
|
||||
// The scheduler controls when documents are gathered from collators and sent
|
||||
// to the search engine for indexing.
|
||||
const { scheduler } = await indexBuilder.build();
|
||||
scheduler.start();
|
||||
|
||||
// A 3 second delay gives the backend server a chance to initialize before
|
||||
// any collators are executed, which may attempt requests against the API.
|
||||
setTimeout(() => scheduler.start(), 3000);
|
||||
useHotCleanup(module, () => scheduler.stop());
|
||||
|
||||
return await createRouter({
|
||||
|
||||
@@ -16,6 +16,8 @@ import { QueryTranslator } from '@backstage/plugin-search-common';
|
||||
import { Readable } from 'stream';
|
||||
import { SearchEngine } from '@backstage/plugin-search-common';
|
||||
import { SearchQuery } from '@backstage/plugin-search-common';
|
||||
import { TaskFunction } from '@backstage/backend-tasks';
|
||||
import { TaskRunner } from '@backstage/backend-tasks';
|
||||
import { Transform } from 'stream';
|
||||
import { Writable } from 'stream';
|
||||
|
||||
@@ -52,10 +54,7 @@ export abstract class DecoratorBase extends Transform {
|
||||
// @beta (undocumented)
|
||||
export class IndexBuilder {
|
||||
constructor({ logger, searchEngine }: IndexBuilderOptions);
|
||||
addCollator({
|
||||
factory,
|
||||
defaultRefreshIntervalSeconds,
|
||||
}: RegisterCollatorParameters): void;
|
||||
addCollator({ factory, schedule }: RegisterCollatorParameters): void;
|
||||
addDecorator({ factory }: RegisterDecoratorParameters): void;
|
||||
build(): Promise<{
|
||||
scheduler: Scheduler;
|
||||
@@ -111,8 +110,8 @@ export class LunrSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
|
||||
// @beta
|
||||
export interface RegisterCollatorParameters {
|
||||
defaultRefreshIntervalSeconds: number;
|
||||
factory: DocumentCollatorFactory;
|
||||
schedule: TaskRunner;
|
||||
}
|
||||
|
||||
// @beta
|
||||
@@ -123,11 +122,21 @@ export interface RegisterDecoratorParameters {
|
||||
// @beta (undocumented)
|
||||
export class Scheduler {
|
||||
constructor({ logger }: { logger: Logger });
|
||||
addToSchedule(task: Function, interval: number): void;
|
||||
addToSchedule({ id, task, scheduledRunner }: ScheduleTaskParameters): void;
|
||||
start(): void;
|
||||
stop(): void;
|
||||
}
|
||||
|
||||
// @public
|
||||
export interface ScheduleTaskParameters {
|
||||
// (undocumented)
|
||||
id: string;
|
||||
// (undocumented)
|
||||
scheduledRunner: TaskRunner;
|
||||
// (undocumented)
|
||||
task: TaskFunction;
|
||||
}
|
||||
|
||||
export { SearchEngine };
|
||||
|
||||
// @beta
|
||||
|
||||
@@ -23,11 +23,13 @@
|
||||
"clean": "backstage-cli package clean"
|
||||
},
|
||||
"dependencies": {
|
||||
"@backstage/backend-tasks": "^0.3.0-next.2",
|
||||
"@backstage/errors": "^1.0.0",
|
||||
"@backstage/plugin-search-common": "^0.3.3-next.1",
|
||||
"@types/lunr": "^2.3.3",
|
||||
"lodash": "^4.17.21",
|
||||
"lunr": "^2.3.9",
|
||||
"node-abort-controller": "^3.0.1",
|
||||
"winston": "^3.2.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
*/
|
||||
|
||||
import { getVoidLogger } from '@backstage/backend-common';
|
||||
import { TaskInvocationDefinition, TaskRunner } from '@backstage/backend-tasks';
|
||||
import {
|
||||
DocumentCollatorFactory,
|
||||
DocumentDecoratorFactory,
|
||||
@@ -53,9 +54,15 @@ class DifferentlyTypedDocumentDecoratorFactory extends TestDocumentDecoratorFact
|
||||
describe('IndexBuilder', () => {
|
||||
let testSearchEngine: SearchEngine;
|
||||
let testIndexBuilder: IndexBuilder;
|
||||
let testScheduledTaskRunner: TaskRunner;
|
||||
|
||||
beforeEach(() => {
|
||||
const logger = getVoidLogger();
|
||||
testScheduledTaskRunner = {
|
||||
run: async (task: TaskInvocationDefinition & { fn: () => void }) => {
|
||||
task.fn();
|
||||
},
|
||||
};
|
||||
testSearchEngine = new LunrSearchEngine({ logger });
|
||||
testIndexBuilder = new IndexBuilder({
|
||||
logger,
|
||||
@@ -65,35 +72,32 @@ describe('IndexBuilder', () => {
|
||||
|
||||
describe('addCollator', () => {
|
||||
it('adds a collator', async () => {
|
||||
jest.useFakeTimers();
|
||||
const testCollatorFactory = new TestDocumentCollatorFactory();
|
||||
const collatorSpy = jest.spyOn(testCollatorFactory, 'getCollator');
|
||||
|
||||
// Add a collator.
|
||||
testIndexBuilder.addCollator({
|
||||
defaultRefreshIntervalSeconds: 6,
|
||||
factory: testCollatorFactory,
|
||||
schedule: testScheduledTaskRunner,
|
||||
});
|
||||
|
||||
// Build the index and ensure the collator was invoked.
|
||||
const { scheduler } = await testIndexBuilder.build();
|
||||
scheduler.start();
|
||||
jest.advanceTimersByTime(6000);
|
||||
expect(collatorSpy).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('addDecorator', () => {
|
||||
it('adds a decorator', async () => {
|
||||
jest.useFakeTimers();
|
||||
const testCollatorFactory = new TestDocumentCollatorFactory();
|
||||
const testDecoratorFactory = new TestDocumentDecoratorFactory();
|
||||
const decoratorSpy = jest.spyOn(testDecoratorFactory, 'getDecorator');
|
||||
|
||||
// Add a collator.
|
||||
testIndexBuilder.addCollator({
|
||||
defaultRefreshIntervalSeconds: 6,
|
||||
factory: testCollatorFactory,
|
||||
schedule: testScheduledTaskRunner,
|
||||
});
|
||||
|
||||
// Add a decorator.
|
||||
@@ -104,14 +108,12 @@ describe('IndexBuilder', () => {
|
||||
// Build the index and ensure the decorator was invoked.
|
||||
const { scheduler } = await testIndexBuilder.build();
|
||||
scheduler.start();
|
||||
jest.advanceTimersByTime(6000);
|
||||
// wait for async decorator execution
|
||||
await Promise.resolve();
|
||||
expect(decoratorSpy).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('adds a type-specific decorator', async () => {
|
||||
jest.useFakeTimers();
|
||||
const testCollatorFactory = new TypedDocumentCollatorFactory();
|
||||
const testDecoratorFactory = new TypedDocumentDecoratorFactory();
|
||||
jest.spyOn(testCollatorFactory, 'getCollator');
|
||||
@@ -119,8 +121,8 @@ describe('IndexBuilder', () => {
|
||||
|
||||
// Add a collator.
|
||||
testIndexBuilder.addCollator({
|
||||
defaultRefreshIntervalSeconds: 6,
|
||||
factory: testCollatorFactory,
|
||||
schedule: testScheduledTaskRunner,
|
||||
});
|
||||
|
||||
// Add a decorator for the same type.
|
||||
@@ -131,7 +133,6 @@ describe('IndexBuilder', () => {
|
||||
// Build the index and ensure the decorator was invoked.
|
||||
const { scheduler } = await testIndexBuilder.build();
|
||||
scheduler.start();
|
||||
jest.advanceTimersByTime(6000);
|
||||
// wait for async decorator execution
|
||||
await Promise.resolve();
|
||||
expect(decoratorSpy).toHaveBeenCalled();
|
||||
@@ -146,8 +147,8 @@ describe('IndexBuilder', () => {
|
||||
|
||||
// Add a collator.
|
||||
testIndexBuilder.addCollator({
|
||||
defaultRefreshIntervalSeconds: 6,
|
||||
factory: testCollatorFactory,
|
||||
schedule: testScheduledTaskRunner,
|
||||
});
|
||||
|
||||
// Add a decorator for a different type.
|
||||
@@ -158,7 +159,6 @@ describe('IndexBuilder', () => {
|
||||
// Build the index and ensure the decorator was not invoked.
|
||||
const { scheduler } = await testIndexBuilder.build();
|
||||
scheduler.start();
|
||||
jest.advanceTimersByTime(6000);
|
||||
expect(collatorSpy).toHaveBeenCalled();
|
||||
expect(decoratorSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
@@ -13,32 +13,25 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import {
|
||||
DocumentCollatorFactory,
|
||||
DocumentDecoratorFactory,
|
||||
DocumentTypeInfo,
|
||||
SearchEngine,
|
||||
} from '@backstage/plugin-search-common';
|
||||
import { Transform, pipeline } from 'stream';
|
||||
import { Logger } from 'winston';
|
||||
import { Scheduler } from './index';
|
||||
import { Scheduler } from './Scheduler';
|
||||
import {
|
||||
IndexBuilderOptions,
|
||||
RegisterCollatorParameters,
|
||||
RegisterDecoratorParameters,
|
||||
} from './types';
|
||||
|
||||
interface CollatorEnvelope {
|
||||
factory: DocumentCollatorFactory;
|
||||
refreshInterval: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* @beta
|
||||
*/
|
||||
export class IndexBuilder {
|
||||
private collators: Record<string, CollatorEnvelope>;
|
||||
private collators: Record<string, RegisterCollatorParameters>;
|
||||
private decorators: Record<string, DocumentDecoratorFactory[]>;
|
||||
private documentTypes: Record<string, DocumentTypeInfo>;
|
||||
private searchEngine: SearchEngine;
|
||||
@@ -64,16 +57,13 @@ export class IndexBuilder {
|
||||
* Makes the index builder aware of a collator that should be executed at the
|
||||
* given refresh interval.
|
||||
*/
|
||||
addCollator({
|
||||
factory,
|
||||
defaultRefreshIntervalSeconds,
|
||||
}: RegisterCollatorParameters): void {
|
||||
addCollator({ factory, schedule }: RegisterCollatorParameters): void {
|
||||
this.logger.info(
|
||||
`Added ${factory.constructor.name} collator factory for type ${factory.type}`,
|
||||
);
|
||||
this.collators[factory.type] = {
|
||||
refreshInterval: defaultRefreshIntervalSeconds,
|
||||
factory,
|
||||
schedule,
|
||||
};
|
||||
this.documentTypes[factory.type] = {
|
||||
visibilityPermission: factory.visibilityPermission,
|
||||
@@ -106,51 +96,57 @@ export class IndexBuilder {
|
||||
* scheduler returned to the caller.
|
||||
*/
|
||||
async build(): Promise<{ scheduler: Scheduler }> {
|
||||
const scheduler = new Scheduler({ logger: this.logger });
|
||||
const scheduler = new Scheduler({
|
||||
logger: this.logger,
|
||||
});
|
||||
|
||||
Object.keys(this.collators).forEach(type => {
|
||||
scheduler.addToSchedule(async () => {
|
||||
// Instantiate the collator.
|
||||
const collator = await this.collators[type].factory.getCollator();
|
||||
this.logger.info(
|
||||
`Collating documents for ${type} via ${this.collators[type].factory.constructor.name}`,
|
||||
);
|
||||
|
||||
// Instantiate all relevant decorators.
|
||||
const decorators: Transform[] = await Promise.all(
|
||||
(this.decorators['*'] || [])
|
||||
.concat(this.decorators[type] || [])
|
||||
.map(async factory => {
|
||||
const decorator = await factory.getDecorator();
|
||||
this.logger.info(
|
||||
`Attached decorator via ${factory.constructor.name} to ${type} index pipeline.`,
|
||||
);
|
||||
return decorator;
|
||||
}),
|
||||
);
|
||||
|
||||
// Instantiate the indexer.
|
||||
const indexer = await this.searchEngine.getIndexer(type);
|
||||
|
||||
// Compose collator/decorators/indexer into a pipeline
|
||||
return new Promise<void>(done => {
|
||||
pipeline(
|
||||
[collator, ...decorators, indexer],
|
||||
(error: NodeJS.ErrnoException | null) => {
|
||||
if (error) {
|
||||
this.logger.error(
|
||||
`Collating documents for ${type} failed: ${error}`,
|
||||
);
|
||||
} else {
|
||||
this.logger.info(`Collating documents for ${type} succeeded`);
|
||||
}
|
||||
|
||||
// Signal index pipeline completion!
|
||||
done();
|
||||
},
|
||||
scheduler.addToSchedule({
|
||||
id: `search_index_${type.replace('-', '_').toLocaleLowerCase('en-US')}`,
|
||||
scheduledRunner: this.collators[type].schedule,
|
||||
task: async () => {
|
||||
// Instantiate the collator.
|
||||
const collator = await this.collators[type].factory.getCollator();
|
||||
this.logger.info(
|
||||
`Collating documents for ${type} via ${this.collators[type].factory.constructor.name}`,
|
||||
);
|
||||
});
|
||||
}, this.collators[type].refreshInterval * 1000);
|
||||
|
||||
// Instantiate all relevant decorators.
|
||||
const decorators: Transform[] = await Promise.all(
|
||||
(this.decorators['*'] || [])
|
||||
.concat(this.decorators[type] || [])
|
||||
.map(async factory => {
|
||||
const decorator = await factory.getDecorator();
|
||||
this.logger.info(
|
||||
`Attached decorator via ${factory.constructor.name} to ${type} index pipeline.`,
|
||||
);
|
||||
return decorator;
|
||||
}),
|
||||
);
|
||||
|
||||
// Instantiate the indexer.
|
||||
const indexer = await this.searchEngine.getIndexer(type);
|
||||
|
||||
// Compose collator/decorators/indexer into a pipeline
|
||||
return new Promise<void>(done => {
|
||||
pipeline(
|
||||
[collator, ...decorators, indexer],
|
||||
(error: NodeJS.ErrnoException | null) => {
|
||||
if (error) {
|
||||
this.logger.error(
|
||||
`Collating documents for ${type} failed: ${error}`,
|
||||
);
|
||||
} else {
|
||||
this.logger.info(`Collating documents for ${type} succeeded`);
|
||||
}
|
||||
|
||||
// Signal index pipeline completion!
|
||||
done();
|
||||
},
|
||||
);
|
||||
});
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
return {
|
||||
|
||||
@@ -29,31 +29,107 @@ describe('Scheduler', () => {
|
||||
|
||||
describe('addToSchedule', () => {
|
||||
it('should not add a task and interval to schedule, if already started', async () => {
|
||||
jest.useFakeTimers();
|
||||
const mockTask1 = jest.fn();
|
||||
const mockTask2 = jest.fn();
|
||||
const mockScheduledTaskRunner1 = {
|
||||
run: jest.fn(),
|
||||
};
|
||||
const mockScheduledTaskRunner2 = {
|
||||
run: jest.fn(),
|
||||
};
|
||||
|
||||
// Add a task and interval to schedule
|
||||
testScheduler.addToSchedule(mockTask1, 2);
|
||||
testScheduler.addToSchedule({
|
||||
id: 'id1',
|
||||
task: mockTask1,
|
||||
scheduledRunner: mockScheduledTaskRunner1,
|
||||
});
|
||||
|
||||
// Starts scheduling process
|
||||
testScheduler.start();
|
||||
|
||||
// Throws Error if task and interval is added to a already started schedule
|
||||
expect(() => testScheduler.addToSchedule(mockTask2, 2)).toThrowError();
|
||||
expect(() =>
|
||||
testScheduler.addToSchedule({
|
||||
id: 'id2',
|
||||
task: mockTask2,
|
||||
scheduledRunner: mockScheduledTaskRunner2,
|
||||
}),
|
||||
).toThrowError();
|
||||
|
||||
jest.runOnlyPendingTimers();
|
||||
expect(mockTask1).toHaveBeenCalled();
|
||||
expect(mockTask2).not.toHaveBeenCalled();
|
||||
expect(mockScheduledTaskRunner1.run).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
id: 'id1',
|
||||
fn: mockTask1,
|
||||
}),
|
||||
);
|
||||
expect(mockScheduledTaskRunner2.run).not.toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
id: 'id2',
|
||||
fn: mockTask2,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should not add a task to schedule, if it already exists', async () => {
|
||||
const mockTask1 = jest.fn();
|
||||
const mockTask2 = jest.fn();
|
||||
const mockScheduledTaskRunner1 = {
|
||||
run: jest.fn(),
|
||||
};
|
||||
const mockScheduledTaskRunner2 = {
|
||||
run: jest.fn(),
|
||||
};
|
||||
|
||||
// Add a task and interval to schedule
|
||||
testScheduler.addToSchedule({
|
||||
id: 'id1',
|
||||
task: mockTask1,
|
||||
scheduledRunner: mockScheduledTaskRunner1,
|
||||
});
|
||||
|
||||
// Throws Error if task and interval is added to a already started schedule
|
||||
expect(() =>
|
||||
testScheduler.addToSchedule({
|
||||
id: 'id1',
|
||||
task: mockTask2,
|
||||
scheduledRunner: mockScheduledTaskRunner2,
|
||||
}),
|
||||
).toThrowError();
|
||||
|
||||
// Starts scheduling process
|
||||
testScheduler.start();
|
||||
|
||||
expect(mockScheduledTaskRunner1.run).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
id: 'id1',
|
||||
fn: mockTask1,
|
||||
}),
|
||||
);
|
||||
expect(mockScheduledTaskRunner2.run).not.toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
id: 'id2',
|
||||
fn: mockTask2,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should be possible to add a task and interval to schedule, if already started, but stopped in between', async () => {
|
||||
jest.useFakeTimers();
|
||||
const mockTask1 = jest.fn();
|
||||
const mockTask2 = jest.fn();
|
||||
const mockScheduledTaskRunner1 = {
|
||||
run: jest.fn(),
|
||||
};
|
||||
const mockScheduledTaskRunner2 = {
|
||||
run: jest.fn(),
|
||||
};
|
||||
|
||||
// Add a task and interval to schedule
|
||||
testScheduler.addToSchedule(mockTask1, 2);
|
||||
testScheduler.addToSchedule({
|
||||
id: 'id1',
|
||||
task: mockTask1,
|
||||
scheduledRunner: mockScheduledTaskRunner1,
|
||||
});
|
||||
|
||||
// Starts scheduling process
|
||||
testScheduler.start();
|
||||
@@ -63,15 +139,28 @@ describe('Scheduler', () => {
|
||||
|
||||
// Shouldn't throw error, as it is stopped.
|
||||
expect(() =>
|
||||
testScheduler.addToSchedule(mockTask2, 4),
|
||||
testScheduler.addToSchedule({
|
||||
id: 'id2',
|
||||
task: mockTask2,
|
||||
scheduledRunner: mockScheduledTaskRunner2,
|
||||
}),
|
||||
).not.toThrowError();
|
||||
|
||||
// Starts scheduling process
|
||||
testScheduler.start();
|
||||
|
||||
jest.runOnlyPendingTimers();
|
||||
expect(mockTask1).toHaveBeenCalled();
|
||||
expect(mockTask2).toHaveBeenCalled();
|
||||
expect(mockScheduledTaskRunner1.run).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
id: 'id1',
|
||||
fn: mockTask1,
|
||||
}),
|
||||
);
|
||||
expect(mockScheduledTaskRunner2.run).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
id: 'id2',
|
||||
fn: mockTask2,
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -79,16 +168,40 @@ describe('Scheduler', () => {
|
||||
it('should execute tasks on start', () => {
|
||||
const mockTask1 = jest.fn();
|
||||
const mockTask2 = jest.fn();
|
||||
const mockScheduledTaskRunner1 = {
|
||||
run: jest.fn(),
|
||||
};
|
||||
const mockScheduledTaskRunner2 = {
|
||||
run: jest.fn(),
|
||||
};
|
||||
|
||||
// Add tasks and interval to schedule
|
||||
testScheduler.addToSchedule(mockTask1, 2);
|
||||
testScheduler.addToSchedule(mockTask2, 2);
|
||||
testScheduler.addToSchedule({
|
||||
id: 'id1',
|
||||
task: mockTask1,
|
||||
scheduledRunner: mockScheduledTaskRunner1,
|
||||
});
|
||||
testScheduler.addToSchedule({
|
||||
id: 'id2',
|
||||
task: mockTask2,
|
||||
scheduledRunner: mockScheduledTaskRunner2,
|
||||
});
|
||||
|
||||
// Starts scheduling process
|
||||
testScheduler.start();
|
||||
|
||||
expect(mockTask1).toHaveBeenCalled();
|
||||
expect(mockTask2).toHaveBeenCalled();
|
||||
expect(mockScheduledTaskRunner1.run).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
id: 'id1',
|
||||
fn: mockTask1,
|
||||
}),
|
||||
);
|
||||
expect(mockScheduledTaskRunner2.run).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
id: 'id2',
|
||||
fn: mockTask2,
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -14,29 +14,38 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { AbortController } from 'node-abort-controller';
|
||||
import { Logger } from 'winston';
|
||||
import { runPeriodically } from './runPeriodically';
|
||||
import { TaskFunction, TaskRunner } from '@backstage/backend-tasks';
|
||||
|
||||
type TaskEnvelope = {
|
||||
task: Function;
|
||||
interval: number;
|
||||
task: TaskFunction;
|
||||
scheduledRunner: TaskRunner;
|
||||
};
|
||||
|
||||
/**
|
||||
* TODO: coordination, error handling
|
||||
* @public ScheduleTaskParameters
|
||||
*/
|
||||
export interface ScheduleTaskParameters {
|
||||
id: string;
|
||||
task: TaskFunction;
|
||||
scheduledRunner: TaskRunner;
|
||||
}
|
||||
|
||||
/**
|
||||
* @beta
|
||||
*/
|
||||
export class Scheduler {
|
||||
private logger: Logger;
|
||||
private schedule: TaskEnvelope[];
|
||||
private runningTasks: Function[] = [];
|
||||
private schedule: { [id: string]: TaskEnvelope };
|
||||
private abortController: AbortController;
|
||||
private isRunning: boolean;
|
||||
|
||||
constructor({ logger }: { logger: Logger }) {
|
||||
this.logger = logger;
|
||||
this.schedule = [];
|
||||
this.schedule = {};
|
||||
this.abortController = new AbortController();
|
||||
this.isRunning = false;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -44,13 +53,18 @@ export class Scheduler {
|
||||
* When running the tasks, the scheduler waits at least for the time specified
|
||||
* in the interval once the task was completed, before running it again.
|
||||
*/
|
||||
addToSchedule(task: Function, interval: number) {
|
||||
if (this.runningTasks.length) {
|
||||
addToSchedule({ id, task, scheduledRunner }: ScheduleTaskParameters) {
|
||||
if (this.isRunning) {
|
||||
throw new Error(
|
||||
'Cannot add task to schedule that has already been started.',
|
||||
);
|
||||
}
|
||||
this.schedule.push({ task, interval });
|
||||
|
||||
if (this.schedule[id]) {
|
||||
throw new Error(`Task with id ${id} already exists.`);
|
||||
}
|
||||
|
||||
this.schedule[id] = { task, scheduledRunner };
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -58,8 +72,14 @@ export class Scheduler {
|
||||
*/
|
||||
start() {
|
||||
this.logger.info('Starting all scheduled search tasks.');
|
||||
this.schedule.forEach(({ task, interval }) => {
|
||||
this.runningTasks.push(runPeriodically(() => task(), interval));
|
||||
this.isRunning = true;
|
||||
Object.keys(this.schedule).forEach(id => {
|
||||
const { task, scheduledRunner } = this.schedule[id];
|
||||
scheduledRunner.run({
|
||||
id,
|
||||
fn: task,
|
||||
signal: this.abortController.signal,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -68,9 +88,7 @@ export class Scheduler {
|
||||
*/
|
||||
stop() {
|
||||
this.logger.info('Stopping all scheduled search tasks.');
|
||||
this.runningTasks.forEach(cancel => {
|
||||
cancel();
|
||||
});
|
||||
this.runningTasks = [];
|
||||
this.abortController.abort();
|
||||
this.isRunning = false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,6 +36,8 @@ export type {
|
||||
export * from './indexing';
|
||||
export * from './test-utils';
|
||||
|
||||
export type { ScheduleTaskParameters } from './Scheduler';
|
||||
|
||||
/**
|
||||
* @deprecated Import from @backstage/plugin-search-common instead
|
||||
*/
|
||||
|
||||
@@ -1,84 +0,0 @@
|
||||
/*
|
||||
* Copyright 2020 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import { runPeriodically } from './runPeriodically';
|
||||
|
||||
jest.useFakeTimers();
|
||||
|
||||
describe('runPeriodically', () => {
|
||||
const flushPromises = async () => {
|
||||
const promise = new Promise(resolve => process.nextTick(resolve));
|
||||
jest.runAllTicks();
|
||||
await promise;
|
||||
};
|
||||
const advanceTimersByTime = async (time: number) => {
|
||||
jest.advanceTimersByTime(time);
|
||||
// Advancing the time with jest doesn't run all promises, but only sync code
|
||||
await flushPromises();
|
||||
};
|
||||
|
||||
it('runs task initially', async () => {
|
||||
const task = jest.fn(async () => {});
|
||||
const cancel = runPeriodically(task, 1000);
|
||||
expect(task).toHaveBeenCalledTimes(1);
|
||||
cancel();
|
||||
});
|
||||
|
||||
it('runs at requested interval', async () => {
|
||||
const task = jest.fn(async () => {});
|
||||
const cancel = runPeriodically(task, 1000);
|
||||
await flushPromises();
|
||||
await advanceTimersByTime(1000);
|
||||
await advanceTimersByTime(1000);
|
||||
expect(task).toHaveBeenCalledTimes(3);
|
||||
cancel();
|
||||
});
|
||||
|
||||
it('stops after being canceled', async () => {
|
||||
const task = jest.fn(async () => {});
|
||||
const cancel = runPeriodically(task, 1000);
|
||||
await flushPromises();
|
||||
cancel();
|
||||
await advanceTimersByTime(1000);
|
||||
await advanceTimersByTime(1000);
|
||||
expect(task).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('continues running after failures', async () => {
|
||||
const task = jest.fn(async () => {
|
||||
throw new Error();
|
||||
});
|
||||
const cancel = runPeriodically(task, 1000);
|
||||
await flushPromises();
|
||||
await advanceTimersByTime(1000);
|
||||
await advanceTimersByTime(1000);
|
||||
expect(task).toHaveBeenCalledTimes(3);
|
||||
cancel();
|
||||
});
|
||||
|
||||
it('waits till a long running task is completed', async () => {
|
||||
const task = jest.fn(
|
||||
() => new Promise(resolve => setTimeout(resolve, 10000)),
|
||||
);
|
||||
const cancel = runPeriodically(task, 1000);
|
||||
await flushPromises();
|
||||
await advanceTimersByTime(1000);
|
||||
expect(task).toHaveBeenCalledTimes(1);
|
||||
await advanceTimersByTime(9000);
|
||||
await advanceTimersByTime(1000);
|
||||
expect(task).toHaveBeenCalledTimes(2);
|
||||
cancel();
|
||||
});
|
||||
});
|
||||
@@ -1,54 +0,0 @@
|
||||
/*
|
||||
* Copyright 2020 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Runs a function repeatedly, with a fixed wait between invocations.
|
||||
*
|
||||
* Supports async functions, and silently ignores exceptions and rejections.
|
||||
*
|
||||
* @param fn - The function to run. May return a Promise.
|
||||
* @param delayMs - The delay between a completed function invocation and the
|
||||
* next.
|
||||
* @returns A function that, when called, stops the invocation loop.
|
||||
*/
|
||||
export function runPeriodically(fn: () => any, delayMs: number): () => void {
|
||||
let cancel: () => void;
|
||||
let cancelled = false;
|
||||
const cancellationPromise = new Promise<void>(resolve => {
|
||||
cancel = () => {
|
||||
resolve();
|
||||
cancelled = true;
|
||||
};
|
||||
});
|
||||
|
||||
const startRefresh = async () => {
|
||||
while (!cancelled) {
|
||||
try {
|
||||
await fn();
|
||||
} catch {
|
||||
// ignore intentionally
|
||||
}
|
||||
|
||||
await Promise.race([
|
||||
new Promise(resolve => setTimeout(resolve, delayMs)),
|
||||
cancellationPromise,
|
||||
]);
|
||||
}
|
||||
};
|
||||
startRefresh();
|
||||
|
||||
return cancel!;
|
||||
}
|
||||
@@ -14,6 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { TaskRunner } from '@backstage/backend-tasks';
|
||||
import {
|
||||
DocumentCollatorFactory,
|
||||
DocumentDecoratorFactory,
|
||||
@@ -35,10 +36,10 @@ export type IndexBuilderOptions = {
|
||||
*/
|
||||
export interface RegisterCollatorParameters {
|
||||
/**
|
||||
* The default interval (in seconds) that the provided collator will be called (can be overridden in config).
|
||||
* The schedule for which the provided collator will be called, commonly the result of
|
||||
* {@link @backstage/backend-tasks#PluginTaskScheduler.createScheduledTaskRunner}
|
||||
*/
|
||||
defaultRefreshIntervalSeconds: number;
|
||||
|
||||
schedule: TaskRunner;
|
||||
/**
|
||||
* The class responsible for returning the document collator of the given type.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user