move catalog collator to use cursor paged streaming
Signed-off-by: Fredrik Adelöw <freben@gmail.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-search-backend-module-catalog': patch
|
||||
---
|
||||
|
||||
Internal refactor to use cursor based pagination
|
||||
+18
-42
@@ -14,17 +14,13 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { registerMswTestHooks } from '@backstage/backend-test-utils';
|
||||
import { mockServices } from '@backstage/backend-test-utils';
|
||||
import { Entity } from '@backstage/catalog-model';
|
||||
import { ConfigReader } from '@backstage/config';
|
||||
import { catalogServiceMock } from '@backstage/plugin-catalog-node/testUtils';
|
||||
import { TestPipeline } from '@backstage/plugin-search-backend-node';
|
||||
import { rest } from 'msw';
|
||||
import { setupServer } from 'msw/node';
|
||||
import { Readable } from 'stream';
|
||||
import { DefaultCatalogCollatorFactory } from './DefaultCatalogCollatorFactory';
|
||||
import { DiscoveryService } from '@backstage/backend-plugin-api';
|
||||
|
||||
const server = setupServer();
|
||||
|
||||
const expectedEntities: Entity[] = [
|
||||
{
|
||||
@@ -57,44 +53,21 @@ const expectedEntities: Entity[] = [
|
||||
];
|
||||
|
||||
describe('DefaultCatalogCollatorFactory', () => {
|
||||
const config = new ConfigReader({});
|
||||
const mockDiscoveryApi: jest.Mocked<DiscoveryService> = {
|
||||
getBaseUrl: jest.fn().mockResolvedValue('http://localhost:7007'),
|
||||
getExternalBaseUrl: jest.fn(),
|
||||
};
|
||||
|
||||
const options = {
|
||||
discovery: mockDiscoveryApi,
|
||||
};
|
||||
|
||||
registerMswTestHooks(server);
|
||||
|
||||
beforeEach(() => {
|
||||
server.use(
|
||||
rest.get('http://localhost:7007/entities', (req, res, ctx) => {
|
||||
if (req.url.searchParams.has('filter')) {
|
||||
const filter = req.url.searchParams.get('filter');
|
||||
if (filter === 'kind=Foo,kind=Bar') {
|
||||
// When filtering on the 'Foo,Bar' kinds we simply return no items, to simulate a filter
|
||||
return res(ctx.json([]));
|
||||
}
|
||||
throw new Error('Unexpected filter parameter');
|
||||
}
|
||||
|
||||
// Imitate offset/limit pagination.
|
||||
const offset = parseInt(req.url.searchParams.get('offset') || '0', 10);
|
||||
const limit = parseInt(req.url.searchParams.get('limit') || '500', 10);
|
||||
return res(ctx.json(expectedEntities.slice(offset, limit + offset)));
|
||||
}),
|
||||
);
|
||||
const config = mockServices.rootConfig();
|
||||
const discovery = mockServices.discovery.mock({
|
||||
getBaseUrl: async () => 'http://localhost:7007',
|
||||
});
|
||||
const catalog = catalogServiceMock({ entities: expectedEntities });
|
||||
|
||||
describe('getCollator', () => {
|
||||
let factory: DefaultCatalogCollatorFactory;
|
||||
let collator: Readable;
|
||||
|
||||
beforeEach(async () => {
|
||||
factory = DefaultCatalogCollatorFactory.fromConfig(config, options);
|
||||
factory = DefaultCatalogCollatorFactory.fromConfig(config, {
|
||||
discovery,
|
||||
catalogClient: catalog,
|
||||
});
|
||||
collator = await factory.getCollator();
|
||||
});
|
||||
|
||||
@@ -105,7 +78,6 @@ describe('DefaultCatalogCollatorFactory', () => {
|
||||
it('fetches from the configured catalog service', async () => {
|
||||
const pipeline = TestPipeline.fromCollator(collator);
|
||||
const { documents } = await pipeline.execute();
|
||||
expect(mockDiscoveryApi.getBaseUrl).toHaveBeenCalledWith('catalog');
|
||||
expect(documents).toHaveLength(expectedEntities.length);
|
||||
});
|
||||
|
||||
@@ -145,7 +117,8 @@ describe('DefaultCatalogCollatorFactory', () => {
|
||||
|
||||
it('maps a returned entity to an expected CatalogEntityDocument with custom transformer', async () => {
|
||||
const customFactory = DefaultCatalogCollatorFactory.fromConfig(config, {
|
||||
...options,
|
||||
discovery,
|
||||
catalogClient: catalog,
|
||||
entityTransformer: entity => ({
|
||||
title: `custom-title-${
|
||||
entity.metadata.title ?? entity.metadata.name
|
||||
@@ -201,7 +174,8 @@ describe('DefaultCatalogCollatorFactory', () => {
|
||||
it('maps a returned entity with a custom locationTemplate', async () => {
|
||||
// Provide an alternate location template.
|
||||
factory = DefaultCatalogCollatorFactory.fromConfig(new ConfigReader({}), {
|
||||
discovery: mockDiscoveryApi,
|
||||
discovery: discovery,
|
||||
catalogClient: catalog,
|
||||
locationTemplate: '/software/:name',
|
||||
});
|
||||
collator = await factory.getCollator();
|
||||
@@ -216,7 +190,8 @@ describe('DefaultCatalogCollatorFactory', () => {
|
||||
it('allows filtering of the retrieved catalog entities', async () => {
|
||||
// Provide a custom filter.
|
||||
factory = DefaultCatalogCollatorFactory.fromConfig(new ConfigReader({}), {
|
||||
discovery: mockDiscoveryApi,
|
||||
discovery: discovery,
|
||||
catalogClient: catalog,
|
||||
filter: {
|
||||
kind: ['Foo', 'Bar'],
|
||||
},
|
||||
@@ -232,7 +207,8 @@ describe('DefaultCatalogCollatorFactory', () => {
|
||||
|
||||
it('paginates through catalog entities using batchSize', async () => {
|
||||
factory = DefaultCatalogCollatorFactory.fromConfig(config, {
|
||||
...options,
|
||||
discovery,
|
||||
catalogClient: catalog,
|
||||
batchSize: 1,
|
||||
});
|
||||
collator = await factory.getCollator();
|
||||
|
||||
+16
-22
@@ -18,6 +18,7 @@ import {
|
||||
TokenManager,
|
||||
createLegacyAuthAdapters,
|
||||
} from '@backstage/backend-common';
|
||||
import { AuthService, DiscoveryService } from '@backstage/backend-plugin-api';
|
||||
import {
|
||||
CatalogApi,
|
||||
CatalogClient,
|
||||
@@ -33,7 +34,6 @@ import { Readable } from 'stream';
|
||||
import { CatalogCollatorEntityTransformer } from './CatalogCollatorEntityTransformer';
|
||||
import { readCollatorConfigOptions } from './config';
|
||||
import { defaultCatalogCollatorEntityTransformer } from './defaultCatalogCollatorEntityTransformer';
|
||||
import { AuthService, DiscoveryService } from '@backstage/backend-plugin-api';
|
||||
|
||||
/**
|
||||
* @public
|
||||
@@ -55,6 +55,7 @@ export type DefaultCatalogCollatorFactoryOptions = {
|
||||
* @deprecated Use the config key `search.collators.catalog.batchSize` instead.
|
||||
*/
|
||||
batchSize?: number;
|
||||
// TODO(freben): Change to required CatalogService instead when fully migrated to the new backend system.
|
||||
catalogClient?: CatalogApi;
|
||||
/**
|
||||
* Allows you to customize how entities are shaped into documents.
|
||||
@@ -137,32 +138,25 @@ export class DefaultCatalogCollatorFactory implements DocumentCollatorFactory {
|
||||
|
||||
private async *execute(): AsyncGenerator<CatalogEntityDocument> {
|
||||
let entitiesRetrieved = 0;
|
||||
let moreEntitiesToGet = true;
|
||||
let cursor: string | undefined = undefined;
|
||||
|
||||
// Offset/limit pagination is used on the Catalog Client in order to
|
||||
// limit (and allow some control over) memory used by the search backend
|
||||
// at index-time.
|
||||
while (moreEntitiesToGet) {
|
||||
do {
|
||||
const { token } = await this.auth.getPluginRequestToken({
|
||||
onBehalfOf: await this.auth.getOwnServiceCredentials(),
|
||||
targetPluginId: 'catalog',
|
||||
});
|
||||
const entities = (
|
||||
await this.catalogClient.getEntities(
|
||||
{
|
||||
filter: this.filter,
|
||||
limit: this.batchSize,
|
||||
offset: entitiesRetrieved,
|
||||
},
|
||||
{ token },
|
||||
)
|
||||
).items;
|
||||
const response = await this.catalogClient.queryEntities(
|
||||
{
|
||||
filter: this.filter,
|
||||
limit: this.batchSize,
|
||||
...(cursor ? { cursor } : {}),
|
||||
},
|
||||
{ token },
|
||||
);
|
||||
cursor = response.pageInfo.nextCursor;
|
||||
entitiesRetrieved += response.items.length;
|
||||
|
||||
// Control looping through entity batches.
|
||||
moreEntitiesToGet = entities.length === this.batchSize;
|
||||
entitiesRetrieved += entities.length;
|
||||
|
||||
for (const entity of entities) {
|
||||
for (const entity of response.items) {
|
||||
yield {
|
||||
...this.entityTransformer(entity),
|
||||
authorization: {
|
||||
@@ -175,7 +169,7 @@ export class DefaultCatalogCollatorFactory implements DocumentCollatorFactory {
|
||||
}),
|
||||
};
|
||||
}
|
||||
}
|
||||
} while (cursor);
|
||||
}
|
||||
|
||||
private applyArgsToFormat(
|
||||
|
||||
Reference in New Issue
Block a user