feat: allow streaming catalog entities

Signed-off-by: Hellgren Heikki <heikki.hellgren@op.fi>
This commit is contained in:
Hellgren Heikki
2025-06-05 15:41:59 +03:00
parent b1bd24a0b2
commit 0e9ec444b7
15 changed files with 222 additions and 4 deletions
+21
View File
@@ -0,0 +1,21 @@
---
'@backstage/catalog-client': minor
'@backstage/plugin-catalog-react': minor
'@backstage/plugin-catalog-node': minor
---
Introduced new `streamEntities` async generator method for the catalog.
Catalog API and Catalog Service now includes a `streamEntities` method that allows for streaming entities from the catalog.
This method is designed to handle large datasets efficiently by processing entities in a stream rather than loading them
all into memory at once. This is useful when you need to fetch a large number of entities but do not want to use pagination
or fetch all entities at once.
Example usage:
```ts
const stream = catalogClient.streamEntities({}, { token });
for await (const entity of stream) {
// Handle entity
}
```
@@ -22,6 +22,7 @@ import { GetLocationsResponse } from '@backstage/catalog-client';
import { Location as Location_2 } from '@backstage/catalog-client';
import { QueryEntitiesRequest } from '@backstage/catalog-client';
import { QueryEntitiesResponse } from '@backstage/catalog-client';
import { StreamEntitiesRequest } from '@backstage/catalog-client';
import { ValidateEntityResponse } from '@backstage/catalog-client';
// @public
@@ -70,6 +71,8 @@ export class InMemoryCatalogClient implements CatalogApi {
// (undocumented)
removeLocationById(_id: string): Promise<void>;
// (undocumented)
streamEntities(request?: StreamEntitiesRequest): AsyncIterable<Entity>;
// (undocumented)
validateEntity(
_entity: Entity,
_locationRef: string,
+14
View File
@@ -88,6 +88,10 @@ export interface CatalogApi {
id: string,
options?: CatalogRequestOptions,
): Promise<void>;
streamEntities(
request?: StreamEntitiesRequest,
options?: CatalogRequestOptions,
): AsyncIterable<Entity>;
validateEntity(
entity: Entity,
locationRef: string,
@@ -170,6 +174,10 @@ export class CatalogClient implements CatalogApi {
id: string,
options?: CatalogRequestOptions,
): Promise<void>;
streamEntities(
request?: StreamEntitiesRequest,
options?: CatalogRequestOptions,
): AsyncIterable<Entity>;
validateEntity(
entity: Entity,
locationRef: string,
@@ -317,6 +325,12 @@ export type QueryEntitiesResponse = {
};
};
// @public
export type StreamEntitiesRequest = Omit<
QueryEntitiesRequest,
'limit' | 'offset'
>;
// @public
export type ValidateEntityResponse =
| {
@@ -540,6 +540,71 @@ describe('CatalogClient', () => {
});
});
describe('streamEntities', () => {
const defaultResponse: QueryEntitiesResponse = {
items: [
{
apiVersion: '1',
kind: 'Component',
metadata: {
name: 'Test2',
namespace: 'test1',
},
},
{
apiVersion: '1',
kind: 'Component',
metadata: {
name: 'Test1',
namespace: 'test1',
},
},
],
pageInfo: {
nextCursor: 'next',
prevCursor: 'prev',
},
totalItems: 10,
};
beforeEach(() => {
server.use(
rest.get(`${mockBaseUrl}/entities/by-query`, (req, res, ctx) => {
const cursor = req.url.searchParams.get('cursor');
if (cursor === 'next') {
return res(ctx.json({ items: [], pageInfo: {}, totalItems: 0 }));
}
return res(ctx.json(defaultResponse));
}),
);
});
it('should stream entities', async () => {
const stream = client.streamEntities({}, { token });
const results: Entity[] = [];
for await (const entity of stream) {
results.push(entity);
}
expect(results).toEqual(defaultResponse.items);
});
it('should handle errors', async () => {
const mockedEndpoint = jest
.fn()
.mockImplementation((_req, res, ctx) => res(ctx.status(401)));
server.use(rest.get(`${mockBaseUrl}/entities/by-query`, mockedEndpoint));
const stream = client.streamEntities({}, { token });
await expect(async () => {
const results: Entity[] = [];
for await (const entity of stream) {
results.push(entity);
}
}).rejects.toThrow(/Request failed with 401 Unauthorized/);
});
});
describe('getEntityByRef', () => {
const existingEntity: Entity = {
apiVersion: 'v1',
@@ -40,6 +40,7 @@ import {
Location,
QueryEntitiesRequest,
QueryEntitiesResponse,
StreamEntitiesRequest,
ValidateEntityResponse,
} from './types/api';
import { isQueryEntitiesInitialRequest, splitRefsIntoChunks } from './utils';
@@ -49,6 +50,9 @@ import type {
AnalyzeLocationResponse,
} from '@backstage/plugin-catalog-common';
// Number of entities to return in a single streamEntities request
const STREAM_ENTITIES_LIMIT = 500;
/**
* A frontend and backend compatible client for communicating with the Backstage
* software catalog.
@@ -456,6 +460,29 @@ export class CatalogClient implements CatalogApi {
return response.json() as Promise<AnalyzeLocationResponse>;
}
/**
* {@inheritdoc CatalogApi.streamEntities}
*/
async *streamEntities(
request?: StreamEntitiesRequest,
options?: CatalogRequestOptions,
): AsyncIterable<Entity> {
let cursor: string | undefined = undefined;
do {
const res = await this.queryEntities(
cursor
? { ...request, cursor, limit: STREAM_ENTITIES_LIMIT }
: { ...request, limit: STREAM_ENTITIES_LIMIT },
options,
);
for (const entity of res.items) {
yield entity;
}
cursor = res.pageInfo.nextCursor;
} while (cursor);
}
//
// Private methods
//
@@ -123,6 +123,16 @@ describe('InMemoryCatalogClient', () => {
});
});
it('streamEntities', async () => {
const client = new InMemoryCatalogClient({ entities });
const stream = client.streamEntities();
const results: Entity[] = [];
for await (const entity of stream) {
results.push(entity);
}
expect(results).toEqual(entities);
});
it('getEntityAncestors', async () => {
const client = new InMemoryCatalogClient({ entities });
await expect(
@@ -32,6 +32,7 @@ import {
Location,
QueryEntitiesRequest,
QueryEntitiesResponse,
StreamEntitiesRequest,
ValidateEntityResponse,
} from '@backstage/catalog-client';
import {
@@ -279,6 +280,22 @@ export class InMemoryCatalogClient implements CatalogApi {
throw new NotImplementedError('Method not implemented.');
}
async *streamEntities(
request?: StreamEntitiesRequest,
): AsyncIterable<Entity> {
let cursor: string | undefined = undefined;
do {
const res = await this.queryEntities(
cursor ? { ...request, cursor } : request,
);
for (const entity of res.items) {
yield entity;
}
cursor = res.pageInfo.nextCursor;
} while (cursor);
}
#createEntityRefMap() {
return new Map(this.#entities.map(e => [stringifyEntityRef(e), e]));
}
+24
View File
@@ -465,6 +465,16 @@ export type QueryEntitiesResponse = {
};
};
/**
* Stream entities request for {@link CatalogClient.streamEntities}.
*
* @public
*/
export type StreamEntitiesRequest = Omit<
QueryEntitiesRequest,
'limit' | 'offset'
>;
/**
* A client for interacting with the Backstage software catalog through its API.
*
@@ -692,4 +702,18 @@ export interface CatalogApi {
location: AnalyzeLocationRequest,
options?: CatalogRequestOptions,
): Promise<AnalyzeLocationResponse>;
/**
* Asynchronously streams entities from the catalog. Uses `queryEntities`
* to fetch entities in batches, and yields them one by one.
*
* @public
*
* @param request - Request parameters
* @param options - Additional options
*/
streamEntities(
request?: StreamEntitiesRequest,
options?: CatalogRequestOptions,
): AsyncIterable<Entity>;
}
@@ -38,5 +38,6 @@ export type {
QueryEntitiesInitialRequest,
QueryEntitiesRequest,
QueryEntitiesResponse,
StreamEntitiesRequest,
} from './api';
export { ENTITY_STATUS_CATALOG_PROCESSING_TYPE } from './status';
@@ -27,6 +27,7 @@ import { QueryEntitiesRequest } from '@backstage/catalog-client';
import { QueryEntitiesResponse } from '@backstage/catalog-client';
import { ServiceFactory } from '@backstage/backend-plugin-api';
import { ServiceMock } from '@backstage/backend-test-utils';
import { StreamEntitiesRequest } from '@backstage/catalog-client';
import { ValidateEntityResponse } from '@backstage/catalog-client';
// @public
@@ -107,6 +108,11 @@ export interface CatalogServiceMock extends CatalogService, CatalogApi {
options?: CatalogServiceRequestOptions | CatalogRequestOptions,
): Promise<void>;
// (undocumented)
streamEntities(
request?: StreamEntitiesRequest,
options?: CatalogServiceRequestOptions | CatalogRequestOptions,
): AsyncIterable<Entity>;
// (undocumented)
validateEntity(
entity: Entity,
locationRef: string,
+6
View File
@@ -27,6 +27,7 @@ import { LocationSpec as LocationSpec_2 } from '@backstage/plugin-catalog-common
import { QueryEntitiesRequest } from '@backstage/catalog-client';
import { QueryEntitiesResponse } from '@backstage/catalog-client';
import { ServiceRef } from '@backstage/backend-plugin-api';
import { StreamEntitiesRequest } from '@backstage/catalog-client';
import { ValidateEntityResponse } from '@backstage/catalog-client';
// @public (undocumented)
@@ -195,6 +196,11 @@ export interface CatalogService {
options: CatalogServiceRequestOptions,
): Promise<void>;
// (undocumented)
streamEntities(
request: StreamEntitiesRequest | undefined,
options: CatalogServiceRequestOptions,
): AsyncIterable<Entity>;
// (undocumented)
validateEntity(
entity: Entity,
locationRef: string,
+19 -3
View File
@@ -15,11 +15,11 @@
*/
import {
AuthService,
BackstageCredentials,
coreServices,
createServiceFactory,
createServiceRef,
coreServices,
BackstageCredentials,
AuthService,
} from '@backstage/backend-plugin-api';
import {
AddLocationRequest,
@@ -39,6 +39,7 @@ import {
Location,
QueryEntitiesRequest,
QueryEntitiesResponse,
StreamEntitiesRequest,
ValidateEntityResponse,
} from '@backstage/catalog-client';
import { CompoundEntityRef, Entity } from '@backstage/catalog-model';
@@ -141,6 +142,11 @@ export interface CatalogService {
location: AnalyzeLocationRequest,
options: CatalogServiceRequestOptions,
): Promise<AnalyzeLocationResponse>;
streamEntities(
request: StreamEntitiesRequest | undefined,
options: CatalogServiceRequestOptions,
): AsyncIterable<Entity>;
}
class DefaultCatalogService implements CatalogService {
@@ -320,6 +326,16 @@ class DefaultCatalogService implements CatalogService {
);
}
async *streamEntities(
request: StreamEntitiesRequest | undefined,
options: CatalogServiceRequestOptions,
): AsyncIterable<Entity> {
yield* this.#catalogApi.streamEntities(
request,
await this.#getOptions(options),
);
}
async #getOptions(
options: CatalogServiceRequestOptions,
): Promise<CatalogRequestOptions> {
@@ -15,9 +15,9 @@
*/
import {
createServiceFactory,
ServiceFactory,
ServiceRef,
createServiceFactory,
} from '@backstage/backend-plugin-api';
import { InMemoryCatalogClient } from '@backstage/catalog-client/testUtils';
import { Entity } from '@backstage/catalog-model';
@@ -103,5 +103,6 @@ export namespace catalogServiceMock {
getLocationByEntity: jest.fn(),
validateEntity: jest.fn(),
analyzeLocation: jest.fn(),
streamEntities: jest.fn(),
}));
}
@@ -31,6 +31,7 @@ import {
Location,
QueryEntitiesRequest,
QueryEntitiesResponse,
StreamEntitiesRequest,
ValidateEntityResponse,
} from '@backstage/catalog-client';
import { CompoundEntityRef, Entity } from '@backstage/catalog-model';
@@ -135,4 +136,9 @@ export interface CatalogServiceMock extends CatalogService, CatalogApi {
location: AnalyzeLocationRequest,
options?: CatalogServiceRequestOptions | CatalogRequestOptions,
): Promise<AnalyzeLocationResponse>;
streamEntities(
request?: StreamEntitiesRequest,
options?: CatalogServiceRequestOptions | CatalogRequestOptions,
): AsyncIterable<Entity>;
}
@@ -102,5 +102,6 @@ export namespace catalogApiMock {
getLocationByEntity: jest.fn(),
validateEntity: jest.fn(),
analyzeLocation: jest.fn(),
streamEntities: jest.fn(),
}));
}