Initial implementation of a generic ndjson collator.

Signed-off-by: Eric Peterson <ericpeterson@spotify.com>
This commit is contained in:
Eric Peterson
2022-04-29 18:00:55 +02:00
parent 0d760d25ad
commit 3bb25a9acc
10 changed files with 393 additions and 5 deletions
+7
View File
@@ -0,0 +1,7 @@
---
'@backstage/plugin-search-backend-node': patch
---
Introducing a `NewlineDelimitedJsonCollatorFactory`, which can be used to create search indices from newline delimited JSON files stored in external storage readable via a configured `UrlReader` instance.
This is useful if you have an independent process periodically generating `*.ndjson` files consisting of `IndexableDocument` objects and want to be able to generate a fresh index based on the latest version of such a file.
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-search-backend-node': patch
---
Fixed a bug that prevented `TestPipeline.withSubject` from identifying valid `Readable` subjects that were technically transform streams.
+28
View File
@@ -5,6 +5,7 @@
```ts
/// <reference types="node" />
import { Config } from '@backstage/config';
import { DocumentCollatorFactory } from '@backstage/plugin-search-common';
import { DocumentDecoratorFactory } from '@backstage/plugin-search-common';
import { DocumentTypeInfo } from '@backstage/plugin-search-common';
@@ -12,6 +13,7 @@ import { IndexableDocument } from '@backstage/plugin-search-common';
import { IndexableResultSet } from '@backstage/plugin-search-common';
import { Logger } from 'winston';
import { default as lunr_2 } from 'lunr';
import { Permission } from '@backstage/plugin-permission-common';
import { QueryTranslator } from '@backstage/plugin-search-common';
import { Readable } from 'stream';
import { SearchEngine } from '@backstage/plugin-search-common';
@@ -19,6 +21,7 @@ import { SearchQuery } from '@backstage/plugin-search-common';
import { TaskFunction } from '@backstage/backend-tasks';
import { TaskRunner } from '@backstage/backend-tasks';
import { Transform } from 'stream';
import { UrlReader } from '@backstage/backend-common';
import { Writable } from 'stream';
// @beta
@@ -112,6 +115,31 @@ export class LunrSearchEngineIndexer extends BatchSearchEngineIndexer {
initialize(): Promise<void>;
}
// @beta
export class NewlineDelimitedJsonCollatorFactory
implements DocumentCollatorFactory
{
static fromConfig(
_config: Config,
options: NewlineDelimitedJsonCollatorFactoryOptions,
): NewlineDelimitedJsonCollatorFactory;
// (undocumented)
getCollator(): Promise<Readable>;
// (undocumented)
readonly type: string;
// (undocumented)
readonly visibilityPermission: Permission | undefined;
}
// @beta (undocumented)
export type NewlineDelimitedJsonCollatorFactoryOptions = {
type: string;
searchPattern: string;
reader: UrlReader;
logger: Logger;
visibilityPermission?: Permission;
};
// @beta
export interface RegisterCollatorParameters {
factory: DocumentCollatorFactory;
+6 -1
View File
@@ -23,19 +23,24 @@
"clean": "backstage-cli package clean"
},
"dependencies": {
"@backstage/backend-common": "^0.13.6-next.0",
"@backstage/backend-tasks": "^0.3.2-next.0",
"@backstage/config": "^1.0.1",
"@backstage/errors": "^1.0.0",
"@backstage/plugin-permission-common": "^0.6.1",
"@backstage/plugin-search-common": "^0.3.4",
"@types/lunr": "^2.3.3",
"lodash": "^4.17.21",
"lunr": "^2.3.9",
"ndjson": "^2.0.0",
"node-abort-controller": "^3.0.1",
"uuid": "^8.3.2",
"winston": "^3.2.1"
},
"devDependencies": {
"@backstage/backend-common": "^0.13.6-next.0",
"@backstage/cli": "^0.17.2-next.0"
"@backstage/cli": "^0.17.2-next.0",
"@types/ndjson": "^2.0.1"
},
"files": [
"dist"
@@ -0,0 +1,157 @@
/*
* Copyright 2022 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
getVoidLogger,
ReadUrlResponse,
UrlReader,
UrlReaders,
} from '@backstage/backend-common';
import { ConfigReader } from '@backstage/config';
import { Readable } from 'stream';
import { NewlineDelimitedJsonCollatorFactory } from './NewlineDelimitedJsonCollatorFactory';
import { TestPipeline } from '../test-utils';
describe('DefaultCatalogCollatorFactory', () => {
const config = new ConfigReader({});
const logger = getVoidLogger();
it('has expected type', () => {
const factory = NewlineDelimitedJsonCollatorFactory.fromConfig(config, {
type: 'expected-type',
searchPattern: 'test://folder/prefix-*',
logger,
reader: UrlReaders.default({ logger, config }),
});
expect(factory.type).toBe('expected-type');
});
describe('getCollator', () => {
let readable: Readable;
let reader: jest.Mocked<
UrlReader & { readUrl: jest.Mock<Promise<ReadUrlResponse>> }
>;
let factory: NewlineDelimitedJsonCollatorFactory;
beforeEach(async () => {
jest.clearAllMocks();
readable = new Readable();
readable._read = () => {};
reader = {
search: jest.fn(),
read: jest.fn(),
readTree: jest.fn(),
readUrl: jest.fn(),
};
factory = NewlineDelimitedJsonCollatorFactory.fromConfig(config, {
type: 'expected-type',
searchPattern: 'test://folder/prefix-*',
logger,
reader: UrlReaders.create({
logger,
config,
factories: [() => [{ predicate: () => true, reader }]],
}),
});
});
it('throws if url reader throws an error during search', async () => {
reader.search.mockRejectedValue(new Error('Expected error'));
await expect(() => factory.getCollator()).rejects.toThrowError(
'Expected error',
);
});
it('throws if no matching files are found', async () => {
reader.search.mockResolvedValue({ files: [], etag: '' });
await expect(() => factory.getCollator()).rejects.toThrowError(
'Could not find an .ndjson file matching',
);
});
it('throws if matching file is not .ndjson', async () => {
reader.search.mockResolvedValue({
files: [{ url: 'test://folder/prefix-1.avro', content: jest.fn() }],
etag: '',
});
reader.readUrl.mockResolvedValue({
buffer: jest.fn(),
stream: jest.fn().mockReturnValue(readable),
});
await expect(() => factory.getCollator()).rejects.toThrowError(
'Could not find an .ndjson file matching',
);
});
it('gets stream using latest matched url', async () => {
reader.search.mockResolvedValue({
files: [
{ url: 'test://folder/prefix-1.ndjson', content: jest.fn() },
{ url: 'test://folder/prefix-2.ndjson', content: jest.fn() },
],
etag: '',
});
reader.readUrl.mockResolvedValue({
buffer: jest.fn(),
stream: jest.fn().mockReturnValue(readable),
});
await factory.getCollator();
expect(reader.search).toHaveBeenCalledWith(
'test://folder/prefix-*',
undefined,
);
expect(reader.readUrl).toHaveBeenCalledWith(
'test://folder/prefix-2.ndjson',
undefined,
);
});
it('transforms newline delimited json into readable stream of documents', async () => {
reader.search.mockResolvedValue({
files: [{ url: 'test://folder/prefix-1.ndjson', content: jest.fn() }],
etag: '',
});
reader.readUrl.mockResolvedValue({
buffer: jest.fn(),
stream: jest
.fn()
.mockReturnValue(
Readable.from(
'{"title": "Title 1", "location": "/title-1", "text": "text 1"}\n{"title": "Title 2", "location": "/title-2", "text": "text 2"}',
),
),
});
const collator = await factory.getCollator();
const pipeline = TestPipeline.withSubject(collator);
const { documents } = await pipeline.execute();
expect(documents).toHaveLength(2);
expect(documents[0].title).toBe('Title 1');
expect(documents[0].location).toBe('/title-1');
expect(documents[0].text).toBe('text 1');
expect(documents[1].title).toBe('Title 2');
expect(documents[1].location).toBe('/title-2');
expect(documents[1].text).toBe('text 2');
});
});
});
@@ -0,0 +1,147 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { UrlReader } from '@backstage/backend-common';
import { Config } from '@backstage/config';
import { Permission } from '@backstage/plugin-permission-common';
import { DocumentCollatorFactory } from '@backstage/plugin-search-common';
import { parse as parseNdjson } from 'ndjson';
import { Readable } from 'stream';
import { Logger } from 'winston';
/**
* @beta
*/
export type NewlineDelimitedJsonCollatorFactoryOptions = {
type: string;
searchPattern: string;
reader: UrlReader;
logger: Logger;
visibilityPermission?: Permission;
};
/**
* Factory class producing a collator that can be used to index documents
* sourced from the latest newline delimited JSON file matching a given search
* pattern. "Latest" is determined by the name of the file (last alphabetically
* is considered latest).
*
* @remarks
* The reader provided must implement the `search()` method as well as the
* `readUrl` method whose response includes the `stream()` method. Naturally,
* the reader must also be configured to understand the given search pattern.
*
* @example
* Here's an example configuration using Google Cloud Storage, which would
* return the latest file under the `bucket` GCS bucket with files like
* `xyz-2021.ndjson` or `xyz-2022.ndjson`.
* ```ts
* indexBuilder.addCollator({
* schedule,
* factory: NewlineDelimitedJsonCollatorFactory.fromConfig(env.config, {
* type: 'techdocs',
* searchPattern: 'https://storage.cloud.google.com/bucket/xyz-*',
* reader: env.reader,
* logger: env.logger,
* })
* });
* ```
*
* @beta
*/
export class NewlineDelimitedJsonCollatorFactory
implements DocumentCollatorFactory
{
/** {@inheritDoc @backstage/plugin-search-common#DocumentCollatorFactory."type"} */
readonly type: string;
/** {@inheritDoc @backstage/plugin-search-common#DocumentCollatorFactory.visibilityPermission} */
public readonly visibilityPermission: Permission | undefined;
private constructor(
type: string,
private readonly searchPattern: string,
private readonly reader: UrlReader,
private readonly logger: Logger,
visibilityPermission: Permission | undefined,
) {
this.type = type;
this.visibilityPermission = visibilityPermission;
}
/**
* Returns a NewlineDelimitedJsonCollatorFactory instance from configuration
* and a set of options.
*/
static fromConfig(
_config: Config,
options: NewlineDelimitedJsonCollatorFactoryOptions,
): NewlineDelimitedJsonCollatorFactory {
return new NewlineDelimitedJsonCollatorFactory(
options.type,
options.searchPattern,
options.reader,
options.logger,
options.visibilityPermission,
);
}
/**
* Returns the "latest" URL for the given search pattern (e.g. the one at the
* end of the list, sorted alphabetically).
*/
private async lastUrl(): Promise<string | undefined> {
try {
// Search for files matching the given pattern, then sort/reverse. The
// first item in the list will be the "latest" file.
this.logger.info(
`Attempting to find latest .ndjson matching ${this.searchPattern}`,
);
const { files } = await this.reader.search(this.searchPattern);
const candidates = files
.filter(file => file.url.endsWith('.ndjson'))
.sort((a, b) => a.url.localeCompare(b.url))
.reverse();
return candidates[0]?.url;
} catch (e) {
this.logger.error(`Could not search for ${this.searchPattern}`, e);
throw e;
}
}
/** {@inheritDoc @backstage/plugin-search-common#DocumentCollatorFactory.getCollator} */
async getCollator(): Promise<Readable> {
// Search for files matching the given pattern.
const lastUrl = await this.lastUrl();
// Abort if no such file could be found.
if (!lastUrl) {
const noMatchingFile = `Could not find an .ndjson file matching ${this.searchPattern}`;
this.logger.error(noMatchingFile);
throw new Error(noMatchingFile);
} else {
this.logger.info(`Using latest .ndjson file ${lastUrl}`);
}
// Use the UrlReader to try and stream the file.
const readerResponse = await this.reader.readUrl!(lastUrl);
const stream = readerResponse.stream!();
// Use ndjson's parser to turn the raw file into an object-mode stream.
return stream.pipe(parseNdjson());
}
}
@@ -0,0 +1,19 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export type { NewlineDelimitedJsonCollatorFactoryOptions } from './NewlineDelimitedJsonCollatorFactory';
export { NewlineDelimitedJsonCollatorFactory } from './NewlineDelimitedJsonCollatorFactory';
+1
View File
@@ -22,6 +22,7 @@
export { IndexBuilder } from './IndexBuilder';
export { Scheduler } from './Scheduler';
export * from './collators';
export { LunrSearchEngine } from './engines';
export type {
ConcreteLunrQuery,
@@ -66,14 +66,14 @@ export class TestPipeline {
return new TestPipeline({ decorator: subject });
}
if (subject instanceof Readable) {
return new TestPipeline({ collator: subject });
}
if (subject instanceof Writable) {
return new TestPipeline({ indexer: subject });
}
if (subject.readable || subject instanceof Readable) {
return new TestPipeline({ collator: subject });
}
throw new Error(
'Unknown test subject: are you passing a readable, writable, or transform stream?',
);
+19
View File
@@ -6224,6 +6224,14 @@
resolved "https://registry.npmjs.org/@types/ms/-/ms-0.7.31.tgz#31b7ca6407128a3d2bbc27fe2d21b345397f6197"
integrity sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==
"@types/ndjson@^2.0.1":
version "2.0.1"
resolved "https://registry.npmjs.org/@types/ndjson/-/ndjson-2.0.1.tgz#0279bc20949bfb861d69ac3de5292775b169a2d0"
integrity sha512-xSRLa/CtPjEo0plSQj+nMKjVBkYh5MeMwOXa1y//jFELdmy9AmVQgWKWQgZ+/XrNlAYxXtmKR8OHaizPgEpUEw==
dependencies:
"@types/node" "*"
"@types/through" "*"
"@types/node-fetch@^2.5.0", "@types/node-fetch@^2.5.12", "@types/node-fetch@^2.5.7":
version "2.6.1"
resolved "https://registry.npmjs.org/@types/node-fetch/-/node-fetch-2.6.1.tgz#8f127c50481db65886800ef496f20bbf15518975"
@@ -18367,6 +18375,17 @@ natural-compare@^1.4.0:
resolved "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz#4abebfeed7541f2c27acfb29bdbbd15c8d5ba4f7"
integrity sha1-Sr6/7tdUHywnrPspvbvRXI1bpPc=
ndjson@^2.0.0:
version "2.0.0"
resolved "https://registry.npmjs.org/ndjson/-/ndjson-2.0.0.tgz#320ac86f6fe53f5681897349b86ac6f43bfa3a19"
integrity sha512-nGl7LRGrzugTtaFcJMhLbpzJM6XdivmbkdlaGcrk/LXg2KL/YBC6z1g70xh0/al+oFuVFP8N8kiWRucmeEH/qQ==
dependencies:
json-stringify-safe "^5.0.1"
minimist "^1.2.5"
readable-stream "^3.6.0"
split2 "^3.0.0"
through2 "^4.0.0"
negotiator@0.6.3, negotiator@^0.6.2, negotiator@^0.6.3:
version "0.6.3"
resolved "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz#58e323a72fedc0d6f9cd4d31fe49f51479590ccd"