Move parallel worker utilities to @backstage/cli-node

Moves `runParallelWorkers`, `runWorkerQueueThreads`, `runWorkerThreads`,
`parseParallelismOption`, and `getEnvironmentParallelism` from the CLI
internal lib to the shared `@backstage/cli-node` package.

This is part of the ongoing effort to make CLI modules independent of
each other and the shared lib code.

Signed-off-by: Patrik Oldsberg <poldsberg@gmail.com>
This commit is contained in:
Patrik Oldsberg
2026-02-22 16:19:49 +01:00
parent 35e8d6cb34
commit 06c2015e5b
13 changed files with 142 additions and 11 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/cli-node': minor
---
Added parallel worker utilities: `runParallelWorkers`, `runWorkerQueueThreads`, `runWorkerThreads`, `parseParallelismOption`, and `getEnvironmentParallelism`. These were moved from the `@backstage/cli` internal code.
+55
View File
@@ -86,6 +86,9 @@ export interface BackstagePackageJson {
version: string;
}
// @public
export function getEnvironmentParallelism(): number;
// @public
export class GitUtils {
static listChangedFiles(ref: string): Promise<string[]>;
@@ -200,4 +203,56 @@ export class PackageRoles {
static getRoleFromPackage(pkgJson: unknown): PackageRole | undefined;
static getRoleInfo(role: string): PackageRoleInfo;
}
// @public
export type ParallelismOption = boolean | string | number | null | undefined;
// @public
export type ParallelWorkerOptions<TItem> = {
parallelismFactor?: number;
parallelismSetting?: ParallelismOption;
items: Iterable<TItem>;
worker: (item: TItem) => Promise<void>;
};
// @public
export function parseParallelismOption(parallel: ParallelismOption): number;
// @public
export function runParallelWorkers<TItem>(
options: ParallelWorkerOptions<TItem>,
): Promise<void[]>;
// @public
export function runWorkerQueueThreads<TItem, TResult, TData>(
options: WorkerQueueThreadsOptions<TItem, TResult, TData>,
): Promise<TResult[]>;
// @public
export function runWorkerThreads<TResult, TData, TMessage>(
options: WorkerThreadsOptions<TResult, TData, TMessage>,
): Promise<TResult[]>;
// @public
export type WorkerQueueThreadsOptions<TItem, TResult, TData> = {
items: Iterable<TItem>;
workerFactory: (
data: TData,
) =>
| ((item: TItem) => Promise<TResult>)
| Promise<(item: TItem) => Promise<TResult>>;
workerData?: TData;
threadCount?: number;
};
// @public
export type WorkerThreadsOptions<TResult, TData, TMessage> = {
worker: (
data: TData,
sendMessage: (message: TMessage) => void,
) => Promise<TResult>;
workerData?: TData;
threadCount?: number;
onMessage?: (message: TMessage) => void;
};
```
+1
View File
@@ -22,4 +22,5 @@
export * from './git';
export * from './monorepo';
export * from './parallel';
export * from './roles';
+28
View File
@@ -0,0 +1,28 @@
/*
* Copyright 2020 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export type { ParallelismOption, ParallelWorkerOptions } from './parallel';
export {
parseParallelismOption,
getEnvironmentParallelism,
runParallelWorkers,
runWorkerQueueThreads,
runWorkerThreads,
} from './parallel';
export type {
WorkerQueueThreadsOptions,
WorkerThreadsOptions,
} from './parallel';
@@ -22,8 +22,21 @@ const defaultParallelism = Math.ceil(os.cpus().length / 2);
const PARALLEL_ENV_VAR = 'BACKSTAGE_CLI_BUILD_PARALLEL';
/**
* Options for configuring parallelism. Can be a boolean, string, number, null, or undefined.
* - Boolean: true uses default parallelism (half of CPUs), false uses 1
* - Number: explicit worker count
* - String: parsed as boolean or integer (e.g. "true", "4")
*
* @public
*/
export type ParallelismOption = boolean | string | number | null | undefined;
/**
* Parses a parallelism option value into a concrete worker count.
*
* @public
*/
export function parseParallelismOption(parallel: ParallelismOption): number {
if (parallel === undefined || parallel === null) {
return defaultParallelism;
@@ -51,11 +64,21 @@ export function parseParallelismOption(parallel: ParallelismOption): number {
);
}
/**
* Returns the parallelism value from the BACKSTAGE_CLI_BUILD_PARALLEL environment variable.
*
* @public
*/
export function getEnvironmentParallelism() {
return parseParallelismOption(process.env[PARALLEL_ENV_VAR]);
}
type ParallelWorkerOptions<TItem> = {
/**
* Options for runParallelWorkers.
*
* @public
*/
export type ParallelWorkerOptions<TItem> = {
/**
* Decides the number of parallel workers by multiplying
* this with the configured parallelism, which defaults to 4.
@@ -68,6 +91,11 @@ type ParallelWorkerOptions<TItem> = {
worker: (item: TItem) => Promise<void>;
};
/**
* Runs items through a worker function in parallel across multiple async workers.
*
* @public
*/
export async function runParallelWorkers<TItem>(
options: ParallelWorkerOptions<TItem>,
) {
@@ -119,6 +147,11 @@ type WorkerThreadMessage =
message: unknown;
};
/**
* Options for runWorkerQueueThreads.
*
* @public
*/
export type WorkerQueueThreadsOptions<TItem, TResult, TData> = {
/** The items to process */
items: Iterable<TItem>;
@@ -149,6 +182,8 @@ export type WorkerQueueThreadsOptions<TItem, TResult, TData> = {
/**
* Spawns one or more worker threads using the `worker_threads` module.
* Each thread processes one item at a time from the provided `options.items`.
*
* @public
*/
export async function runWorkerQueueThreads<TItem, TResult, TData>(
options: WorkerQueueThreadsOptions<TItem, TResult, TData>,
@@ -250,6 +285,11 @@ function workerQueueThread(
);
}
/**
* Options for runWorkerThreads.
*
* @public
*/
export type WorkerThreadsOptions<TResult, TData, TMessage> = {
/**
* A function that is called by each worker thread to produce a result.
@@ -277,6 +317,8 @@ export type WorkerThreadsOptions<TResult, TData, TMessage> = {
/**
* Spawns one or more worker threads using the `worker_threads` module.
*
* @public
*/
export async function runWorkerThreads<TResult, TData, TMessage>(
options: WorkerThreadsOptions<TResult, TData, TMessage>,
@@ -23,8 +23,8 @@ import {
BackstagePackage,
PackageGraph,
PackageRoles,
runParallelWorkers,
} from '@backstage/cli-node';
import { runParallelWorkers } from '../../../../lib/parallel';
import { buildFrontend } from '../../lib/buildFrontend';
import { buildBackend } from '../../lib/buildBackend';
import { createScriptOptionsParser } from '../../../../lib/optionsParser';
@@ -19,9 +19,8 @@ import fs from 'fs-extra';
import { resolve as resolvePath } from 'node:path';
import * as tar from 'tar';
import { createDistWorkspace } from './packager';
import { getEnvironmentParallelism } from '../../../lib/parallel';
import { buildPackage, Output } from './builder';
import { PackageGraph } from '@backstage/cli-node';
import { PackageGraph, getEnvironmentParallelism } from '@backstage/cli-node';
const BUNDLE_FILE = 'bundle.tar.gz';
const SKELETON_FILE = 'skeleton.tar.gz';
@@ -17,9 +17,11 @@
import fs from 'fs-extra';
import { resolve as resolvePath } from 'node:path';
import { buildBundle, getModuleFederationRemoteOptions } from './bundler';
import { getEnvironmentParallelism } from '../../../lib/parallel';
import {
BackstagePackageJson,
getEnvironmentParallelism,
} from '@backstage/cli-node';
import { loadCliConfig } from '../../config/lib/config';
import { BackstagePackageJson } from '@backstage/cli-node';
interface BuildAppOptions {
targetDir: string;
@@ -21,8 +21,7 @@ import { relative as relativePath, resolve as resolvePath } from 'node:path';
import { paths } from '../../../../lib/paths';
import { makeRollupConfigs } from './config';
import { BuildOptions, Output } from './types';
import { PackageRoles } from '@backstage/cli-node';
import { runParallelWorkers } from '../../../../lib/parallel';
import { PackageRoles, runParallelWorkers } from '@backstage/cli-node';
export function formatErrorMessage(error: any) {
let msg = '';
@@ -41,8 +41,8 @@ import {
PackageRoles,
PackageGraph,
PackageGraphNode,
runParallelWorkers,
} from '@backstage/cli-node';
import { runParallelWorkers } from '../../../../lib/parallel';
import { createTypeDistProject } from '../../../../lib/typeDistProject';
// These packages aren't safe to pack in parallel since the CLI depends on them
@@ -23,9 +23,9 @@ import {
PackageGraph,
BackstagePackageJson,
Lockfile,
runWorkerQueueThreads,
} from '@backstage/cli-node';
import { paths } from '../../../../lib/paths';
import { runWorkerQueueThreads } from '../../../../lib/parallel';
import { createScriptOptionsParser } from '../../../../lib/optionsParser';
import { SuccessCache } from '../../../../lib/cache/SuccessCache';
@@ -33,7 +33,7 @@ import {
mapDependencies,
YarnInfoInspectData,
} from '../../../../lib/versioning';
import { runParallelWorkers } from '../../../../lib/parallel';
import { runParallelWorkers } from '@backstage/cli-node';
import {
getManifestByReleaseLine,
getManifestByVersion,