move catalog collator to use cursor paged streaming

Signed-off-by: Fredrik Adelöw <freben@gmail.com>
This commit is contained in:
Fredrik Adelöw
2024-12-18 10:02:41 +01:00
parent 189e96695f
commit 1e09b06ee9
3 changed files with 39 additions and 64 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-search-backend-module-catalog': patch
---
Internal refactor to use cursor based pagination
@@ -14,17 +14,13 @@
* limitations under the License.
*/
import { registerMswTestHooks } from '@backstage/backend-test-utils';
import { mockServices } from '@backstage/backend-test-utils';
import { Entity } from '@backstage/catalog-model';
import { ConfigReader } from '@backstage/config';
import { catalogServiceMock } from '@backstage/plugin-catalog-node/testUtils';
import { TestPipeline } from '@backstage/plugin-search-backend-node';
import { rest } from 'msw';
import { setupServer } from 'msw/node';
import { Readable } from 'stream';
import { DefaultCatalogCollatorFactory } from './DefaultCatalogCollatorFactory';
import { DiscoveryService } from '@backstage/backend-plugin-api';
const server = setupServer();
const expectedEntities: Entity[] = [
{
@@ -57,44 +53,21 @@ const expectedEntities: Entity[] = [
];
describe('DefaultCatalogCollatorFactory', () => {
const config = new ConfigReader({});
const mockDiscoveryApi: jest.Mocked<DiscoveryService> = {
getBaseUrl: jest.fn().mockResolvedValue('http://localhost:7007'),
getExternalBaseUrl: jest.fn(),
};
const options = {
discovery: mockDiscoveryApi,
};
registerMswTestHooks(server);
beforeEach(() => {
server.use(
rest.get('http://localhost:7007/entities', (req, res, ctx) => {
if (req.url.searchParams.has('filter')) {
const filter = req.url.searchParams.get('filter');
if (filter === 'kind=Foo,kind=Bar') {
// When filtering on the 'Foo,Bar' kinds we simply return no items, to simulate a filter
return res(ctx.json([]));
}
throw new Error('Unexpected filter parameter');
}
// Imitate offset/limit pagination.
const offset = parseInt(req.url.searchParams.get('offset') || '0', 10);
const limit = parseInt(req.url.searchParams.get('limit') || '500', 10);
return res(ctx.json(expectedEntities.slice(offset, limit + offset)));
}),
);
const config = mockServices.rootConfig();
const discovery = mockServices.discovery.mock({
getBaseUrl: async () => 'http://localhost:7007',
});
const catalog = catalogServiceMock({ entities: expectedEntities });
describe('getCollator', () => {
let factory: DefaultCatalogCollatorFactory;
let collator: Readable;
beforeEach(async () => {
factory = DefaultCatalogCollatorFactory.fromConfig(config, options);
factory = DefaultCatalogCollatorFactory.fromConfig(config, {
discovery,
catalogClient: catalog,
});
collator = await factory.getCollator();
});
@@ -105,7 +78,6 @@ describe('DefaultCatalogCollatorFactory', () => {
it('fetches from the configured catalog service', async () => {
const pipeline = TestPipeline.fromCollator(collator);
const { documents } = await pipeline.execute();
expect(mockDiscoveryApi.getBaseUrl).toHaveBeenCalledWith('catalog');
expect(documents).toHaveLength(expectedEntities.length);
});
@@ -145,7 +117,8 @@ describe('DefaultCatalogCollatorFactory', () => {
it('maps a returned entity to an expected CatalogEntityDocument with custom transformer', async () => {
const customFactory = DefaultCatalogCollatorFactory.fromConfig(config, {
...options,
discovery,
catalogClient: catalog,
entityTransformer: entity => ({
title: `custom-title-${
entity.metadata.title ?? entity.metadata.name
@@ -201,7 +174,8 @@ describe('DefaultCatalogCollatorFactory', () => {
it('maps a returned entity with a custom locationTemplate', async () => {
// Provide an alternate location template.
factory = DefaultCatalogCollatorFactory.fromConfig(new ConfigReader({}), {
discovery: mockDiscoveryApi,
discovery: discovery,
catalogClient: catalog,
locationTemplate: '/software/:name',
});
collator = await factory.getCollator();
@@ -216,7 +190,8 @@ describe('DefaultCatalogCollatorFactory', () => {
it('allows filtering of the retrieved catalog entities', async () => {
// Provide a custom filter.
factory = DefaultCatalogCollatorFactory.fromConfig(new ConfigReader({}), {
discovery: mockDiscoveryApi,
discovery: discovery,
catalogClient: catalog,
filter: {
kind: ['Foo', 'Bar'],
},
@@ -232,7 +207,8 @@ describe('DefaultCatalogCollatorFactory', () => {
it('paginates through catalog entities using batchSize', async () => {
factory = DefaultCatalogCollatorFactory.fromConfig(config, {
...options,
discovery,
catalogClient: catalog,
batchSize: 1,
});
collator = await factory.getCollator();
@@ -18,6 +18,7 @@ import {
TokenManager,
createLegacyAuthAdapters,
} from '@backstage/backend-common';
import { AuthService, DiscoveryService } from '@backstage/backend-plugin-api';
import {
CatalogApi,
CatalogClient,
@@ -33,7 +34,6 @@ import { Readable } from 'stream';
import { CatalogCollatorEntityTransformer } from './CatalogCollatorEntityTransformer';
import { readCollatorConfigOptions } from './config';
import { defaultCatalogCollatorEntityTransformer } from './defaultCatalogCollatorEntityTransformer';
import { AuthService, DiscoveryService } from '@backstage/backend-plugin-api';
/**
* @public
@@ -55,6 +55,7 @@ export type DefaultCatalogCollatorFactoryOptions = {
* @deprecated Use the config key `search.collators.catalog.batchSize` instead.
*/
batchSize?: number;
// TODO(freben): Change to required CatalogService instead when fully migrated to the new backend system.
catalogClient?: CatalogApi;
/**
* Allows you to customize how entities are shaped into documents.
@@ -137,32 +138,25 @@ export class DefaultCatalogCollatorFactory implements DocumentCollatorFactory {
private async *execute(): AsyncGenerator<CatalogEntityDocument> {
let entitiesRetrieved = 0;
let moreEntitiesToGet = true;
let cursor: string | undefined = undefined;
// Offset/limit pagination is used on the Catalog Client in order to
// limit (and allow some control over) memory used by the search backend
// at index-time.
while (moreEntitiesToGet) {
do {
const { token } = await this.auth.getPluginRequestToken({
onBehalfOf: await this.auth.getOwnServiceCredentials(),
targetPluginId: 'catalog',
});
const entities = (
await this.catalogClient.getEntities(
{
filter: this.filter,
limit: this.batchSize,
offset: entitiesRetrieved,
},
{ token },
)
).items;
const response = await this.catalogClient.queryEntities(
{
filter: this.filter,
limit: this.batchSize,
...(cursor ? { cursor } : {}),
},
{ token },
);
cursor = response.pageInfo.nextCursor;
entitiesRetrieved += response.items.length;
// Control looping through entity batches.
moreEntitiesToGet = entities.length === this.batchSize;
entitiesRetrieved += entities.length;
for (const entity of entities) {
for (const entity of response.items) {
yield {
...this.entityTransformer(entity),
authorization: {
@@ -175,7 +169,7 @@ export class DefaultCatalogCollatorFactory implements DocumentCollatorFactory {
}),
};
}
}
} while (cursor);
}
private applyArgsToFormat(