Initial implementation of a generic ndjson collator.
Signed-off-by: Eric Peterson <ericpeterson@spotify.com>
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
---
|
||||
'@backstage/plugin-search-backend-node': patch
|
||||
---
|
||||
|
||||
Introducing a `NewlineDelimitedJsonCollatorFactory`, which can be used to create search indices from newline delimited JSON files stored in external storage readable via a configured `UrlReader` instance.
|
||||
|
||||
This is useful if you have an independent process periodically generating `*.ndjson` files consisting of `IndexableDocument` objects and want to be able to generate a fresh index based on the latest version of such a file.
|
||||
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-search-backend-node': patch
|
||||
---
|
||||
|
||||
Fixed a bug that prevented `TestPipeline.withSubject` from identifying valid `Readable` subjects that were technically transform streams.
|
||||
@@ -5,6 +5,7 @@
|
||||
```ts
|
||||
/// <reference types="node" />
|
||||
|
||||
import { Config } from '@backstage/config';
|
||||
import { DocumentCollatorFactory } from '@backstage/plugin-search-common';
|
||||
import { DocumentDecoratorFactory } from '@backstage/plugin-search-common';
|
||||
import { DocumentTypeInfo } from '@backstage/plugin-search-common';
|
||||
@@ -12,6 +13,7 @@ import { IndexableDocument } from '@backstage/plugin-search-common';
|
||||
import { IndexableResultSet } from '@backstage/plugin-search-common';
|
||||
import { Logger } from 'winston';
|
||||
import { default as lunr_2 } from 'lunr';
|
||||
import { Permission } from '@backstage/plugin-permission-common';
|
||||
import { QueryTranslator } from '@backstage/plugin-search-common';
|
||||
import { Readable } from 'stream';
|
||||
import { SearchEngine } from '@backstage/plugin-search-common';
|
||||
@@ -19,6 +21,7 @@ import { SearchQuery } from '@backstage/plugin-search-common';
|
||||
import { TaskFunction } from '@backstage/backend-tasks';
|
||||
import { TaskRunner } from '@backstage/backend-tasks';
|
||||
import { Transform } from 'stream';
|
||||
import { UrlReader } from '@backstage/backend-common';
|
||||
import { Writable } from 'stream';
|
||||
|
||||
// @beta
|
||||
@@ -112,6 +115,31 @@ export class LunrSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
initialize(): Promise<void>;
|
||||
}
|
||||
|
||||
// @beta
|
||||
export class NewlineDelimitedJsonCollatorFactory
|
||||
implements DocumentCollatorFactory
|
||||
{
|
||||
static fromConfig(
|
||||
_config: Config,
|
||||
options: NewlineDelimitedJsonCollatorFactoryOptions,
|
||||
): NewlineDelimitedJsonCollatorFactory;
|
||||
// (undocumented)
|
||||
getCollator(): Promise<Readable>;
|
||||
// (undocumented)
|
||||
readonly type: string;
|
||||
// (undocumented)
|
||||
readonly visibilityPermission: Permission | undefined;
|
||||
}
|
||||
|
||||
// @beta (undocumented)
|
||||
export type NewlineDelimitedJsonCollatorFactoryOptions = {
|
||||
type: string;
|
||||
searchPattern: string;
|
||||
reader: UrlReader;
|
||||
logger: Logger;
|
||||
visibilityPermission?: Permission;
|
||||
};
|
||||
|
||||
// @beta
|
||||
export interface RegisterCollatorParameters {
|
||||
factory: DocumentCollatorFactory;
|
||||
|
||||
@@ -23,19 +23,24 @@
|
||||
"clean": "backstage-cli package clean"
|
||||
},
|
||||
"dependencies": {
|
||||
"@backstage/backend-common": "^0.13.6-next.0",
|
||||
"@backstage/backend-tasks": "^0.3.2-next.0",
|
||||
"@backstage/config": "^1.0.1",
|
||||
"@backstage/errors": "^1.0.0",
|
||||
"@backstage/plugin-permission-common": "^0.6.1",
|
||||
"@backstage/plugin-search-common": "^0.3.4",
|
||||
"@types/lunr": "^2.3.3",
|
||||
"lodash": "^4.17.21",
|
||||
"lunr": "^2.3.9",
|
||||
"ndjson": "^2.0.0",
|
||||
"node-abort-controller": "^3.0.1",
|
||||
"uuid": "^8.3.2",
|
||||
"winston": "^3.2.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@backstage/backend-common": "^0.13.6-next.0",
|
||||
"@backstage/cli": "^0.17.2-next.0"
|
||||
"@backstage/cli": "^0.17.2-next.0",
|
||||
"@types/ndjson": "^2.0.1"
|
||||
},
|
||||
"files": [
|
||||
"dist"
|
||||
|
||||
+157
@@ -0,0 +1,157 @@
|
||||
/*
|
||||
* Copyright 2022 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import {
|
||||
getVoidLogger,
|
||||
ReadUrlResponse,
|
||||
UrlReader,
|
||||
UrlReaders,
|
||||
} from '@backstage/backend-common';
|
||||
import { ConfigReader } from '@backstage/config';
|
||||
import { Readable } from 'stream';
|
||||
import { NewlineDelimitedJsonCollatorFactory } from './NewlineDelimitedJsonCollatorFactory';
|
||||
import { TestPipeline } from '../test-utils';
|
||||
|
||||
// Unit tests for NewlineDelimitedJsonCollatorFactory. The UrlReader is fully
// mocked, so no network or storage access happens in this suite.
describe('NewlineDelimitedJsonCollatorFactory', () => {
  const config = new ConfigReader({});
  const logger = getVoidLogger();

  it('has expected type', () => {
    const factory = NewlineDelimitedJsonCollatorFactory.fromConfig(config, {
      type: 'expected-type',
      searchPattern: 'test://folder/prefix-*',
      logger,
      reader: UrlReaders.default({ logger, config }),
    });
    expect(factory.type).toBe('expected-type');
  });

  describe('getCollator', () => {
    let readable: Readable;
    let reader: jest.Mocked<
      UrlReader & { readUrl: jest.Mock<Promise<ReadUrlResponse>> }
    >;
    let factory: NewlineDelimitedJsonCollatorFactory;

    beforeEach(async () => {
      jest.clearAllMocks();

      // A readable that never produces data on its own; individual tests
      // only need it as a stand-in return value for `stream()`.
      readable = new Readable();
      readable._read = () => {};
      reader = {
        search: jest.fn(),
        read: jest.fn(),
        readTree: jest.fn(),
        readUrl: jest.fn(),
      };
      // Route every URL to the mocked reader via a catch-all predicate.
      factory = NewlineDelimitedJsonCollatorFactory.fromConfig(config, {
        type: 'expected-type',
        searchPattern: 'test://folder/prefix-*',
        logger,
        reader: UrlReaders.create({
          logger,
          config,
          factories: [() => [{ predicate: () => true, reader }]],
        }),
      });
    });

    it('throws if url reader throws an error during search', async () => {
      reader.search.mockRejectedValue(new Error('Expected error'));

      await expect(() => factory.getCollator()).rejects.toThrowError(
        'Expected error',
      );
    });

    it('throws if no matching files are found', async () => {
      reader.search.mockResolvedValue({ files: [], etag: '' });

      await expect(() => factory.getCollator()).rejects.toThrowError(
        'Could not find an .ndjson file matching',
      );
    });

    it('throws if matching file is not .ndjson', async () => {
      reader.search.mockResolvedValue({
        files: [{ url: 'test://folder/prefix-1.avro', content: jest.fn() }],
        etag: '',
      });
      reader.readUrl.mockResolvedValue({
        buffer: jest.fn(),
        stream: jest.fn().mockReturnValue(readable),
      });

      await expect(() => factory.getCollator()).rejects.toThrowError(
        'Could not find an .ndjson file matching',
      );
    });

    it('gets stream using latest matched url', async () => {
      reader.search.mockResolvedValue({
        files: [
          { url: 'test://folder/prefix-1.ndjson', content: jest.fn() },
          { url: 'test://folder/prefix-2.ndjson', content: jest.fn() },
        ],
        etag: '',
      });
      reader.readUrl.mockResolvedValue({
        buffer: jest.fn(),
        stream: jest.fn().mockReturnValue(readable),
      });

      await factory.getCollator();

      expect(reader.search).toHaveBeenCalledWith(
        'test://folder/prefix-*',
        undefined,
      );
      // "prefix-2" sorts after "prefix-1", so it is the one considered latest.
      expect(reader.readUrl).toHaveBeenCalledWith(
        'test://folder/prefix-2.ndjson',
        undefined,
      );
    });

    it('transforms newline delimited json into readable stream of documents', async () => {
      reader.search.mockResolvedValue({
        files: [{ url: 'test://folder/prefix-1.ndjson', content: jest.fn() }],
        etag: '',
      });
      reader.readUrl.mockResolvedValue({
        buffer: jest.fn(),
        stream: jest
          .fn()
          .mockReturnValue(
            Readable.from(
              '{"title": "Title 1", "location": "/title-1", "text": "text 1"}\n{"title": "Title 2", "location": "/title-2", "text": "text 2"}',
            ),
          ),
      });

      const collator = await factory.getCollator();
      const pipeline = TestPipeline.withSubject(collator);
      const { documents } = await pipeline.execute();

      expect(documents).toHaveLength(2);
      expect(documents[0].title).toBe('Title 1');
      expect(documents[0].location).toBe('/title-1');
      expect(documents[0].text).toBe('text 1');
      expect(documents[1].title).toBe('Title 2');
      expect(documents[1].location).toBe('/title-2');
      expect(documents[1].text).toBe('text 2');
    });
  });
});
|
||||
@@ -0,0 +1,147 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { UrlReader } from '@backstage/backend-common';
|
||||
import { Config } from '@backstage/config';
|
||||
import { Permission } from '@backstage/plugin-permission-common';
|
||||
import { DocumentCollatorFactory } from '@backstage/plugin-search-common';
|
||||
import { parse as parseNdjson } from 'ndjson';
|
||||
import { Readable } from 'stream';
|
||||
import { Logger } from 'winston';
|
||||
|
||||
/**
|
||||
* @beta
|
||||
*/
|
||||
export type NewlineDelimitedJsonCollatorFactoryOptions = {
|
||||
/** Value exposed as the `type` property of the created factory. */
type: string;
|
||||
/**
 * Pattern handed to the reader's `search()` method. Only results whose URL
 * ends in `.ndjson` are considered, and the one sorting last by URL is used.
 */
searchPattern: string;
|
||||
/**
 * Reader used to locate and stream the file. The factory calls `search()`,
 * then `readUrl()`, then `stream()` on the response.
 */
reader: UrlReader;
|
||||
/** Logger receiving info messages about file selection and error messages on failure. */
logger: Logger;
|
||||
/** Optional permission exposed as the factory's `visibilityPermission` property. */
visibilityPermission?: Permission;
|
||||
};
|
||||
|
||||
/**
|
||||
* Factory class producing a collator that can be used to index documents
|
||||
* sourced from the latest newline delimited JSON file matching a given search
|
||||
* pattern. "Latest" is determined by the name of the file (last alphabetically
|
||||
* is considered latest).
|
||||
*
|
||||
* @remarks
|
||||
* The reader provided must implement the `search()` method as well as the
|
||||
* `readUrl` method whose response includes the `stream()` method. Naturally,
|
||||
* the reader must also be configured to understand the given search pattern.
|
||||
*
|
||||
* @example
|
||||
* Here's an example configuration using Google Cloud Storage, which would
|
||||
* return the latest file under the `bucket` GCS bucket with files like
|
||||
* `xyz-2021.ndjson` or `xyz-2022.ndjson`.
|
||||
* ```ts
|
||||
* indexBuilder.addCollator({
|
||||
* schedule,
|
||||
* factory: NewlineDelimitedJsonCollatorFactory.fromConfig(env.config, {
|
||||
* type: 'techdocs',
|
||||
* searchPattern: 'https://storage.cloud.google.com/bucket/xyz-*',
|
||||
* reader: env.reader,
|
||||
* logger: env.logger,
|
||||
* })
|
||||
* });
|
||||
* ```
|
||||
*
|
||||
* @beta
|
||||
*/
|
||||
export class NewlineDelimitedJsonCollatorFactory
  implements DocumentCollatorFactory
{
  /** {@inheritDoc @backstage/plugin-search-common#DocumentCollatorFactory."type"} */
  readonly type: string;

  /** {@inheritDoc @backstage/plugin-search-common#DocumentCollatorFactory.visibilityPermission} */
  public readonly visibilityPermission: Permission | undefined;

  private constructor(
    type: string,
    private readonly searchPattern: string,
    private readonly reader: UrlReader,
    private readonly logger: Logger,
    visibilityPermission: Permission | undefined,
  ) {
    this.type = type;
    this.visibilityPermission = visibilityPermission;
  }

  /**
   * Returns a NewlineDelimitedJsonCollatorFactory instance from configuration
   * and a set of options.
   *
   * @param _config - Currently unused; accepted so the signature matches
   *   other `fromConfig` style factories.
   * @param options - Type, search pattern, reader, logger and optional
   *   visibility permission for the created factory.
   */
  static fromConfig(
    _config: Config,
    options: NewlineDelimitedJsonCollatorFactoryOptions,
  ): NewlineDelimitedJsonCollatorFactory {
    return new NewlineDelimitedJsonCollatorFactory(
      options.type,
      options.searchPattern,
      options.reader,
      options.logger,
      options.visibilityPermission,
    );
  }

  /**
   * Returns the "latest" URL for the given search pattern (e.g. the one at the
   * end of the list, sorted alphabetically).
   *
   * @returns The URL of the last matching `.ndjson` file, or `undefined` when
   *   the search yields no `.ndjson` file.
   * @throws Rethrows any error raised by the reader's `search()` call after
   *   logging it.
   */
  private async lastUrl(): Promise<string | undefined> {
    try {
      // Search for files matching the given pattern, then sort/reverse. The
      // first item in the list will be the "latest" file.
      this.logger.info(
        `Attempting to find latest .ndjson matching ${this.searchPattern}`,
      );
      const { files } = await this.reader.search(this.searchPattern);
      const candidates = files
        .filter(file => file.url.endsWith('.ndjson'))
        .sort((a, b) => a.url.localeCompare(b.url))
        .reverse();

      return candidates[0]?.url;
    } catch (e) {
      this.logger.error(`Could not search for ${this.searchPattern}`, e);
      throw e;
    }
  }

  /**
   * {@inheritDoc @backstage/plugin-search-common#DocumentCollatorFactory.getCollator}
   *
   * @throws If no `.ndjson` file matches the search pattern, or if the
   *   configured reader cannot stream the file (no `readUrl()` method, or a
   *   response without `stream()`).
   */
  async getCollator(): Promise<Readable> {
    // Search for files matching the given pattern.
    const lastUrl = await this.lastUrl();

    // Abort if no such file could be found.
    if (!lastUrl) {
      const noMatchingFile = `Could not find an .ndjson file matching ${this.searchPattern}`;
      this.logger.error(noMatchingFile);
      throw new Error(noMatchingFile);
    }
    this.logger.info(`Using latest .ndjson file ${lastUrl}`);

    // Both readUrl() and stream() are optional on the reader interfaces, so
    // fail with an actionable message instead of an opaque TypeError.
    if (!this.reader.readUrl) {
      throw new Error(
        `Could not read ${lastUrl}: the configured UrlReader does not implement readUrl()`,
      );
    }
    const readerResponse = await this.reader.readUrl(lastUrl);
    if (!readerResponse.stream) {
      throw new Error(
        `Could not read ${lastUrl}: the UrlReader response does not implement stream()`,
      );
    }
    const stream = readerResponse.stream();

    // Use ndjson's parser to turn the raw file into an object-mode stream.
    const parser = parseNdjson();

    // pipe() does not forward errors from the source. Without this, a failed
    // download would leave consumers of the returned stream waiting forever.
    stream.once('error', error => parser.destroy(error));

    return stream.pipe(parser);
  }
}
|
||||
@@ -0,0 +1,19 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export type { NewlineDelimitedJsonCollatorFactoryOptions } from './NewlineDelimitedJsonCollatorFactory';
|
||||
|
||||
export { NewlineDelimitedJsonCollatorFactory } from './NewlineDelimitedJsonCollatorFactory';
|
||||
@@ -22,6 +22,7 @@
|
||||
|
||||
export { IndexBuilder } from './IndexBuilder';
|
||||
export { Scheduler } from './Scheduler';
|
||||
export * from './collators';
|
||||
export { LunrSearchEngine } from './engines';
|
||||
export type {
|
||||
ConcreteLunrQuery,
|
||||
|
||||
@@ -66,14 +66,14 @@ export class TestPipeline {
|
||||
return new TestPipeline({ decorator: subject });
|
||||
}
|
||||
|
||||
if (subject instanceof Readable) {
|
||||
return new TestPipeline({ collator: subject });
|
||||
}
|
||||
|
||||
if (subject instanceof Writable) {
|
||||
return new TestPipeline({ indexer: subject });
|
||||
}
|
||||
|
||||
if (subject.readable || subject instanceof Readable) {
|
||||
return new TestPipeline({ collator: subject });
|
||||
}
|
||||
|
||||
throw new Error(
|
||||
'Unknown test subject: are you passing a readable, writable, or transform stream?',
|
||||
);
|
||||
|
||||
@@ -6224,6 +6224,14 @@
|
||||
resolved "https://registry.npmjs.org/@types/ms/-/ms-0.7.31.tgz#31b7ca6407128a3d2bbc27fe2d21b345397f6197"
|
||||
integrity sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==
|
||||
|
||||
"@types/ndjson@^2.0.1":
|
||||
version "2.0.1"
|
||||
resolved "https://registry.npmjs.org/@types/ndjson/-/ndjson-2.0.1.tgz#0279bc20949bfb861d69ac3de5292775b169a2d0"
|
||||
integrity sha512-xSRLa/CtPjEo0plSQj+nMKjVBkYh5MeMwOXa1y//jFELdmy9AmVQgWKWQgZ+/XrNlAYxXtmKR8OHaizPgEpUEw==
|
||||
dependencies:
|
||||
"@types/node" "*"
|
||||
"@types/through" "*"
|
||||
|
||||
"@types/node-fetch@^2.5.0", "@types/node-fetch@^2.5.12", "@types/node-fetch@^2.5.7":
|
||||
version "2.6.1"
|
||||
resolved "https://registry.npmjs.org/@types/node-fetch/-/node-fetch-2.6.1.tgz#8f127c50481db65886800ef496f20bbf15518975"
|
||||
@@ -18367,6 +18375,17 @@ natural-compare@^1.4.0:
|
||||
resolved "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz#4abebfeed7541f2c27acfb29bdbbd15c8d5ba4f7"
|
||||
integrity sha1-Sr6/7tdUHywnrPspvbvRXI1bpPc=
|
||||
|
||||
ndjson@^2.0.0:
|
||||
version "2.0.0"
|
||||
resolved "https://registry.npmjs.org/ndjson/-/ndjson-2.0.0.tgz#320ac86f6fe53f5681897349b86ac6f43bfa3a19"
|
||||
integrity sha512-nGl7LRGrzugTtaFcJMhLbpzJM6XdivmbkdlaGcrk/LXg2KL/YBC6z1g70xh0/al+oFuVFP8N8kiWRucmeEH/qQ==
|
||||
dependencies:
|
||||
json-stringify-safe "^5.0.1"
|
||||
minimist "^1.2.5"
|
||||
readable-stream "^3.6.0"
|
||||
split2 "^3.0.0"
|
||||
through2 "^4.0.0"
|
||||
|
||||
negotiator@0.6.3, negotiator@^0.6.2, negotiator@^0.6.3:
|
||||
version "0.6.3"
|
||||
resolved "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz#58e323a72fedc0d6f9cd4d31fe49f51479590ccd"
|
||||
|
||||
Reference in New Issue
Block a user