feat: allow streaming catalog entities
Signed-off-by: Hellgren Heikki <heikki.hellgren@op.fi>
This commit is contained in:
@@ -0,0 +1,21 @@
|
||||
---
|
||||
'@backstage/catalog-client': minor
|
||||
'@backstage/plugin-catalog-react': minor
|
||||
'@backstage/plugin-catalog-node': minor
|
||||
---
|
||||
|
||||
Introduced new `streamEntities` async generator method for the catalog.
|
||||
|
||||
Catalog API and Catalog Service now includes a `streamEntities` method that allows for streaming entities from the catalog.
|
||||
This method is designed to handle large datasets efficiently by processing entities in a stream rather than loading them
|
||||
all into memory at once. This is useful when you need to fetch a large number of entities but do not want to use pagination
|
||||
or fetch all entities at once.
|
||||
|
||||
Example usage:
|
||||
|
||||
```ts
|
||||
const stream = catalogClient.streamEntities({}, { token });
|
||||
for await (const entity of stream) {
|
||||
// Handle entity
|
||||
}
|
||||
```
|
||||
@@ -22,6 +22,7 @@ import { GetLocationsResponse } from '@backstage/catalog-client';
|
||||
import { Location as Location_2 } from '@backstage/catalog-client';
|
||||
import { QueryEntitiesRequest } from '@backstage/catalog-client';
|
||||
import { QueryEntitiesResponse } from '@backstage/catalog-client';
|
||||
import { StreamEntitiesRequest } from '@backstage/catalog-client';
|
||||
import { ValidateEntityResponse } from '@backstage/catalog-client';
|
||||
|
||||
// @public
|
||||
@@ -70,6 +71,8 @@ export class InMemoryCatalogClient implements CatalogApi {
|
||||
// (undocumented)
|
||||
removeLocationById(_id: string): Promise<void>;
|
||||
// (undocumented)
|
||||
streamEntities(request?: StreamEntitiesRequest): AsyncIterable<Entity>;
|
||||
// (undocumented)
|
||||
validateEntity(
|
||||
_entity: Entity,
|
||||
_locationRef: string,
|
||||
|
||||
@@ -88,6 +88,10 @@ export interface CatalogApi {
|
||||
id: string,
|
||||
options?: CatalogRequestOptions,
|
||||
): Promise<void>;
|
||||
streamEntities(
|
||||
request?: StreamEntitiesRequest,
|
||||
options?: CatalogRequestOptions,
|
||||
): AsyncIterable<Entity>;
|
||||
validateEntity(
|
||||
entity: Entity,
|
||||
locationRef: string,
|
||||
@@ -170,6 +174,10 @@ export class CatalogClient implements CatalogApi {
|
||||
id: string,
|
||||
options?: CatalogRequestOptions,
|
||||
): Promise<void>;
|
||||
streamEntities(
|
||||
request?: StreamEntitiesRequest,
|
||||
options?: CatalogRequestOptions,
|
||||
): AsyncIterable<Entity>;
|
||||
validateEntity(
|
||||
entity: Entity,
|
||||
locationRef: string,
|
||||
@@ -317,6 +325,12 @@ export type QueryEntitiesResponse = {
|
||||
};
|
||||
};
|
||||
|
||||
// @public
|
||||
export type StreamEntitiesRequest = Omit<
|
||||
QueryEntitiesRequest,
|
||||
'limit' | 'offset'
|
||||
>;
|
||||
|
||||
// @public
|
||||
export type ValidateEntityResponse =
|
||||
| {
|
||||
|
||||
@@ -540,6 +540,71 @@ describe('CatalogClient', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('streamEntities', () => {
|
||||
const defaultResponse: QueryEntitiesResponse = {
|
||||
items: [
|
||||
{
|
||||
apiVersion: '1',
|
||||
kind: 'Component',
|
||||
metadata: {
|
||||
name: 'Test2',
|
||||
namespace: 'test1',
|
||||
},
|
||||
},
|
||||
{
|
||||
apiVersion: '1',
|
||||
kind: 'Component',
|
||||
metadata: {
|
||||
name: 'Test1',
|
||||
namespace: 'test1',
|
||||
},
|
||||
},
|
||||
],
|
||||
pageInfo: {
|
||||
nextCursor: 'next',
|
||||
prevCursor: 'prev',
|
||||
},
|
||||
totalItems: 10,
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
server.use(
|
||||
rest.get(`${mockBaseUrl}/entities/by-query`, (req, res, ctx) => {
|
||||
const cursor = req.url.searchParams.get('cursor');
|
||||
if (cursor === 'next') {
|
||||
return res(ctx.json({ items: [], pageInfo: {}, totalItems: 0 }));
|
||||
}
|
||||
return res(ctx.json(defaultResponse));
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should stream entities', async () => {
|
||||
const stream = client.streamEntities({}, { token });
|
||||
const results: Entity[] = [];
|
||||
for await (const entity of stream) {
|
||||
results.push(entity);
|
||||
}
|
||||
expect(results).toEqual(defaultResponse.items);
|
||||
});
|
||||
|
||||
it('should handle errors', async () => {
|
||||
const mockedEndpoint = jest
|
||||
.fn()
|
||||
.mockImplementation((_req, res, ctx) => res(ctx.status(401)));
|
||||
|
||||
server.use(rest.get(`${mockBaseUrl}/entities/by-query`, mockedEndpoint));
|
||||
|
||||
const stream = client.streamEntities({}, { token });
|
||||
await expect(async () => {
|
||||
const results: Entity[] = [];
|
||||
for await (const entity of stream) {
|
||||
results.push(entity);
|
||||
}
|
||||
}).rejects.toThrow(/Request failed with 401 Unauthorized/);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getEntityByRef', () => {
|
||||
const existingEntity: Entity = {
|
||||
apiVersion: 'v1',
|
||||
|
||||
@@ -40,6 +40,7 @@ import {
|
||||
Location,
|
||||
QueryEntitiesRequest,
|
||||
QueryEntitiesResponse,
|
||||
StreamEntitiesRequest,
|
||||
ValidateEntityResponse,
|
||||
} from './types/api';
|
||||
import { isQueryEntitiesInitialRequest, splitRefsIntoChunks } from './utils';
|
||||
@@ -49,6 +50,9 @@ import type {
|
||||
AnalyzeLocationResponse,
|
||||
} from '@backstage/plugin-catalog-common';
|
||||
|
||||
// Number of entities to return in a single streamEntities request
|
||||
const STREAM_ENTITIES_LIMIT = 500;
|
||||
|
||||
/**
|
||||
* A frontend and backend compatible client for communicating with the Backstage
|
||||
* software catalog.
|
||||
@@ -456,6 +460,29 @@ export class CatalogClient implements CatalogApi {
|
||||
return response.json() as Promise<AnalyzeLocationResponse>;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc CatalogApi.streamEntities}
|
||||
*/
|
||||
async *streamEntities(
|
||||
request?: StreamEntitiesRequest,
|
||||
options?: CatalogRequestOptions,
|
||||
): AsyncIterable<Entity> {
|
||||
let cursor: string | undefined = undefined;
|
||||
do {
|
||||
const res = await this.queryEntities(
|
||||
cursor
|
||||
? { ...request, cursor, limit: STREAM_ENTITIES_LIMIT }
|
||||
: { ...request, limit: STREAM_ENTITIES_LIMIT },
|
||||
options,
|
||||
);
|
||||
for (const entity of res.items) {
|
||||
yield entity;
|
||||
}
|
||||
|
||||
cursor = res.pageInfo.nextCursor;
|
||||
} while (cursor);
|
||||
}
|
||||
|
||||
//
|
||||
// Private methods
|
||||
//
|
||||
|
||||
@@ -123,6 +123,16 @@ describe('InMemoryCatalogClient', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('streamEntities', async () => {
|
||||
const client = new InMemoryCatalogClient({ entities });
|
||||
const stream = client.streamEntities();
|
||||
const results: Entity[] = [];
|
||||
for await (const entity of stream) {
|
||||
results.push(entity);
|
||||
}
|
||||
expect(results).toEqual(entities);
|
||||
});
|
||||
|
||||
it('getEntityAncestors', async () => {
|
||||
const client = new InMemoryCatalogClient({ entities });
|
||||
await expect(
|
||||
|
||||
@@ -32,6 +32,7 @@ import {
|
||||
Location,
|
||||
QueryEntitiesRequest,
|
||||
QueryEntitiesResponse,
|
||||
StreamEntitiesRequest,
|
||||
ValidateEntityResponse,
|
||||
} from '@backstage/catalog-client';
|
||||
import {
|
||||
@@ -279,6 +280,22 @@ export class InMemoryCatalogClient implements CatalogApi {
|
||||
throw new NotImplementedError('Method not implemented.');
|
||||
}
|
||||
|
||||
async *streamEntities(
|
||||
request?: StreamEntitiesRequest,
|
||||
): AsyncIterable<Entity> {
|
||||
let cursor: string | undefined = undefined;
|
||||
do {
|
||||
const res = await this.queryEntities(
|
||||
cursor ? { ...request, cursor } : request,
|
||||
);
|
||||
for (const entity of res.items) {
|
||||
yield entity;
|
||||
}
|
||||
|
||||
cursor = res.pageInfo.nextCursor;
|
||||
} while (cursor);
|
||||
}
|
||||
|
||||
#createEntityRefMap() {
|
||||
return new Map(this.#entities.map(e => [stringifyEntityRef(e), e]));
|
||||
}
|
||||
|
||||
@@ -465,6 +465,16 @@ export type QueryEntitiesResponse = {
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* Stream entities request for {@link CatalogClient.streamEntities}.
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export type StreamEntitiesRequest = Omit<
|
||||
QueryEntitiesRequest,
|
||||
'limit' | 'offset'
|
||||
>;
|
||||
|
||||
/**
|
||||
* A client for interacting with the Backstage software catalog through its API.
|
||||
*
|
||||
@@ -692,4 +702,18 @@ export interface CatalogApi {
|
||||
location: AnalyzeLocationRequest,
|
||||
options?: CatalogRequestOptions,
|
||||
): Promise<AnalyzeLocationResponse>;
|
||||
|
||||
/**
|
||||
* Asynchronously streams entities from the catalog. Uses `queryEntities`
|
||||
* to fetch entities in batches, and yields them one by one.
|
||||
*
|
||||
* @public
|
||||
*
|
||||
* @param request - Request parameters
|
||||
* @param options - Additional options
|
||||
*/
|
||||
streamEntities(
|
||||
request?: StreamEntitiesRequest,
|
||||
options?: CatalogRequestOptions,
|
||||
): AsyncIterable<Entity>;
|
||||
}
|
||||
|
||||
@@ -38,5 +38,6 @@ export type {
|
||||
QueryEntitiesInitialRequest,
|
||||
QueryEntitiesRequest,
|
||||
QueryEntitiesResponse,
|
||||
StreamEntitiesRequest,
|
||||
} from './api';
|
||||
export { ENTITY_STATUS_CATALOG_PROCESSING_TYPE } from './status';
|
||||
|
||||
@@ -27,6 +27,7 @@ import { QueryEntitiesRequest } from '@backstage/catalog-client';
|
||||
import { QueryEntitiesResponse } from '@backstage/catalog-client';
|
||||
import { ServiceFactory } from '@backstage/backend-plugin-api';
|
||||
import { ServiceMock } from '@backstage/backend-test-utils';
|
||||
import { StreamEntitiesRequest } from '@backstage/catalog-client';
|
||||
import { ValidateEntityResponse } from '@backstage/catalog-client';
|
||||
|
||||
// @public
|
||||
@@ -107,6 +108,11 @@ export interface CatalogServiceMock extends CatalogService, CatalogApi {
|
||||
options?: CatalogServiceRequestOptions | CatalogRequestOptions,
|
||||
): Promise<void>;
|
||||
// (undocumented)
|
||||
streamEntities(
|
||||
request?: StreamEntitiesRequest,
|
||||
options?: CatalogServiceRequestOptions | CatalogRequestOptions,
|
||||
): AsyncIterable<Entity>;
|
||||
// (undocumented)
|
||||
validateEntity(
|
||||
entity: Entity,
|
||||
locationRef: string,
|
||||
|
||||
@@ -27,6 +27,7 @@ import { LocationSpec as LocationSpec_2 } from '@backstage/plugin-catalog-common
|
||||
import { QueryEntitiesRequest } from '@backstage/catalog-client';
|
||||
import { QueryEntitiesResponse } from '@backstage/catalog-client';
|
||||
import { ServiceRef } from '@backstage/backend-plugin-api';
|
||||
import { StreamEntitiesRequest } from '@backstage/catalog-client';
|
||||
import { ValidateEntityResponse } from '@backstage/catalog-client';
|
||||
|
||||
// @public (undocumented)
|
||||
@@ -195,6 +196,11 @@ export interface CatalogService {
|
||||
options: CatalogServiceRequestOptions,
|
||||
): Promise<void>;
|
||||
// (undocumented)
|
||||
streamEntities(
|
||||
request: StreamEntitiesRequest | undefined,
|
||||
options: CatalogServiceRequestOptions,
|
||||
): AsyncIterable<Entity>;
|
||||
// (undocumented)
|
||||
validateEntity(
|
||||
entity: Entity,
|
||||
locationRef: string,
|
||||
|
||||
@@ -15,11 +15,11 @@
|
||||
*/
|
||||
|
||||
import {
|
||||
AuthService,
|
||||
BackstageCredentials,
|
||||
coreServices,
|
||||
createServiceFactory,
|
||||
createServiceRef,
|
||||
coreServices,
|
||||
BackstageCredentials,
|
||||
AuthService,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import {
|
||||
AddLocationRequest,
|
||||
@@ -39,6 +39,7 @@ import {
|
||||
Location,
|
||||
QueryEntitiesRequest,
|
||||
QueryEntitiesResponse,
|
||||
StreamEntitiesRequest,
|
||||
ValidateEntityResponse,
|
||||
} from '@backstage/catalog-client';
|
||||
import { CompoundEntityRef, Entity } from '@backstage/catalog-model';
|
||||
@@ -141,6 +142,11 @@ export interface CatalogService {
|
||||
location: AnalyzeLocationRequest,
|
||||
options: CatalogServiceRequestOptions,
|
||||
): Promise<AnalyzeLocationResponse>;
|
||||
|
||||
streamEntities(
|
||||
request: StreamEntitiesRequest | undefined,
|
||||
options: CatalogServiceRequestOptions,
|
||||
): AsyncIterable<Entity>;
|
||||
}
|
||||
|
||||
class DefaultCatalogService implements CatalogService {
|
||||
@@ -320,6 +326,16 @@ class DefaultCatalogService implements CatalogService {
|
||||
);
|
||||
}
|
||||
|
||||
async *streamEntities(
|
||||
request: StreamEntitiesRequest | undefined,
|
||||
options: CatalogServiceRequestOptions,
|
||||
): AsyncIterable<Entity> {
|
||||
yield* this.#catalogApi.streamEntities(
|
||||
request,
|
||||
await this.#getOptions(options),
|
||||
);
|
||||
}
|
||||
|
||||
async #getOptions(
|
||||
options: CatalogServiceRequestOptions,
|
||||
): Promise<CatalogRequestOptions> {
|
||||
|
||||
@@ -15,9 +15,9 @@
|
||||
*/
|
||||
|
||||
import {
|
||||
createServiceFactory,
|
||||
ServiceFactory,
|
||||
ServiceRef,
|
||||
createServiceFactory,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { InMemoryCatalogClient } from '@backstage/catalog-client/testUtils';
|
||||
import { Entity } from '@backstage/catalog-model';
|
||||
@@ -103,5 +103,6 @@ export namespace catalogServiceMock {
|
||||
getLocationByEntity: jest.fn(),
|
||||
validateEntity: jest.fn(),
|
||||
analyzeLocation: jest.fn(),
|
||||
streamEntities: jest.fn(),
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ import {
|
||||
Location,
|
||||
QueryEntitiesRequest,
|
||||
QueryEntitiesResponse,
|
||||
StreamEntitiesRequest,
|
||||
ValidateEntityResponse,
|
||||
} from '@backstage/catalog-client';
|
||||
import { CompoundEntityRef, Entity } from '@backstage/catalog-model';
|
||||
@@ -135,4 +136,9 @@ export interface CatalogServiceMock extends CatalogService, CatalogApi {
|
||||
location: AnalyzeLocationRequest,
|
||||
options?: CatalogServiceRequestOptions | CatalogRequestOptions,
|
||||
): Promise<AnalyzeLocationResponse>;
|
||||
|
||||
streamEntities(
|
||||
request?: StreamEntitiesRequest,
|
||||
options?: CatalogServiceRequestOptions | CatalogRequestOptions,
|
||||
): AsyncIterable<Entity>;
|
||||
}
|
||||
|
||||
@@ -102,5 +102,6 @@ export namespace catalogApiMock {
|
||||
getLocationByEntity: jest.fn(),
|
||||
validateEntity: jest.fn(),
|
||||
analyzeLocation: jest.fn(),
|
||||
streamEntities: jest.fn(),
|
||||
}));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user