Improve uploading of large files to S3
Signed-off-by: Rudra Sharans <rudra099999@gmail.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-techdocs-node': patch
|
||||
---
|
||||
|
||||
Fixed an issue where uploading large files to AWS S3 could fail. S3 operations are now retried on transient errors, and files of 5 MB or more are uploaded using multipart upload.
|
||||
@@ -31,7 +31,7 @@ import {
|
||||
AwsCredentialProviderOptions,
|
||||
DefaultAwsCredentialsManager,
|
||||
} from '@backstage/integration-aws-node';
|
||||
import { mockClient, AwsClientStub } from 'aws-sdk-client-mock';
|
||||
import { mockClient } from 'aws-sdk-client-mock';
|
||||
import express from 'express';
|
||||
import request from 'supertest';
|
||||
import path from 'path';
|
||||
@@ -44,9 +44,10 @@ import {
|
||||
} from '@backstage/backend-test-utils';
|
||||
|
||||
const env = process.env;
|
||||
let s3Mock: AwsClientStub<S3Client>;
|
||||
let s3Mock: any;
|
||||
|
||||
const mockDir = createMockDirectory();
|
||||
// Create a new MockDirectory for each test to avoid Windows file locking issues
|
||||
let mockDir: ReturnType<typeof createMockDirectory>;
|
||||
|
||||
function getMockCredentialProvider(): Promise<AwsCredentialProvider> {
|
||||
return Promise.resolve({
|
||||
@@ -155,7 +156,7 @@ describe('AwsS3Publish', () => {
|
||||
build_timestamp: 612741599,
|
||||
};
|
||||
|
||||
const directory = getEntityRootDir(entity);
|
||||
let directory: string;
|
||||
|
||||
const files = {
|
||||
'index.html': '',
|
||||
@@ -176,7 +177,7 @@ describe('AwsS3Publish', () => {
|
||||
},
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
beforeEach(async () => {
|
||||
process.env = { ...env };
|
||||
process.env.AWS_REGION = 'us-west-2';
|
||||
|
||||
@@ -185,20 +186,26 @@ describe('AwsS3Publish', () => {
|
||||
getMockCredentialProvider(),
|
||||
);
|
||||
|
||||
// Create a fresh MockDirectory for each test to avoid Windows file locking
|
||||
mockDir = createMockDirectory();
|
||||
// Calculate directory path with the new mockDir instance
|
||||
directory = getEntityRootDir(entity);
|
||||
|
||||
// Set up the test files
|
||||
mockDir.setContent({
|
||||
[directory]: files,
|
||||
});
|
||||
|
||||
s3Mock = mockClient(S3Client);
|
||||
s3Mock = mockClient(S3Client as any);
|
||||
|
||||
s3Mock.on(HeadObjectCommand).callsFake(input => {
|
||||
s3Mock.on(HeadObjectCommand).callsFake((input: any) => {
|
||||
if (!fs.pathExistsSync(mockDir.resolve(input.Key))) {
|
||||
throw new Error('File does not exist');
|
||||
}
|
||||
return {};
|
||||
});
|
||||
|
||||
s3Mock.on(GetObjectCommand).callsFake(input => {
|
||||
s3Mock.on(GetObjectCommand).callsFake((input: any) => {
|
||||
if (fs.pathExistsSync(mockDir.resolve(input.Key))) {
|
||||
return {
|
||||
Body: Readable.from(fs.readFileSync(mockDir.resolve(input.Key))),
|
||||
@@ -208,14 +215,14 @@ describe('AwsS3Publish', () => {
|
||||
throw new Error(`The file ${input.Key} does not exist!`);
|
||||
});
|
||||
|
||||
s3Mock.on(HeadBucketCommand).callsFake(input => {
|
||||
s3Mock.on(HeadBucketCommand).callsFake((input: any) => {
|
||||
if (input.Bucket === 'errorBucket') {
|
||||
throw new Error('Bucket does not exist');
|
||||
}
|
||||
return {};
|
||||
});
|
||||
|
||||
s3Mock.on(ListObjectsV2Command).callsFake(input => {
|
||||
s3Mock.on(ListObjectsV2Command).callsFake((input: any) => {
|
||||
if (
|
||||
input.Bucket === 'delete_stale_files_success' ||
|
||||
input.Bucket === 'delete_stale_files_error'
|
||||
@@ -227,7 +234,7 @@ describe('AwsS3Publish', () => {
|
||||
return {};
|
||||
});
|
||||
|
||||
s3Mock.on(DeleteObjectCommand).callsFake(input => {
|
||||
s3Mock.on(DeleteObjectCommand).callsFake((input: any) => {
|
||||
if (input.Bucket === 'delete_stale_files_error') {
|
||||
throw new Error('Message');
|
||||
}
|
||||
@@ -235,7 +242,7 @@ describe('AwsS3Publish', () => {
|
||||
});
|
||||
|
||||
s3Mock.on(UploadPartCommand).rejects();
|
||||
s3Mock.on(PutObjectCommand).callsFake(input => {
|
||||
s3Mock.on(PutObjectCommand).callsFake((input: any) => {
|
||||
mockDir.addContent({ [input.Key]: input.Body });
|
||||
});
|
||||
});
|
||||
@@ -320,7 +327,7 @@ describe('AwsS3Publish', () => {
|
||||
`default/component/backstage/assets/main.css`,
|
||||
]),
|
||||
});
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should publish a directory as well when legacy casing is used', async () => {
|
||||
const publisher = await createPublisherFromConfig({
|
||||
@@ -333,7 +340,7 @@ describe('AwsS3Publish', () => {
|
||||
`default/Component/backstage/assets/main.css`,
|
||||
]),
|
||||
});
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should publish a directory when root path is specified', async () => {
|
||||
const publisher = await createPublisherFromConfig({
|
||||
@@ -346,7 +353,7 @@ describe('AwsS3Publish', () => {
|
||||
`backstage-data/techdocs/default/component/backstage/assets/main.css`,
|
||||
]),
|
||||
});
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should publish a directory when root path is specified and legacy casing is used', async () => {
|
||||
const publisher = await createPublisherFromConfig({
|
||||
@@ -360,7 +367,7 @@ describe('AwsS3Publish', () => {
|
||||
`backstage-data/techdocs/default/Component/backstage/assets/main.css`,
|
||||
]),
|
||||
});
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should publish a directory when sse is specified', async () => {
|
||||
const publisher = await createPublisherFromConfig({
|
||||
@@ -373,7 +380,7 @@ describe('AwsS3Publish', () => {
|
||||
'default/component/backstage/assets/main.css',
|
||||
]),
|
||||
});
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should fail to publish a directory', async () => {
|
||||
const wrongPathToGeneratedDirectory = mockDir.resolve(
|
||||
@@ -408,7 +415,7 @@ describe('AwsS3Publish', () => {
|
||||
expect(loggerInfoSpy).toHaveBeenLastCalledWith(
|
||||
`Successfully deleted stale files for Entity ${entity.metadata.name}. Total number of files: 1`,
|
||||
);
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should log error when the stale files deletion fails', async () => {
|
||||
const bucketName = 'delete_stale_files_error';
|
||||
@@ -419,7 +426,7 @@ describe('AwsS3Publish', () => {
|
||||
expect(loggerErrorSpy).toHaveBeenLastCalledWith(
|
||||
'Unable to delete file(s) from AWS S3. Error: Message',
|
||||
);
|
||||
});
|
||||
}, 30000);
|
||||
});
|
||||
|
||||
describe('hasDocsBeenGenerated', () => {
|
||||
@@ -427,7 +434,7 @@ describe('AwsS3Publish', () => {
|
||||
const publisher = await createPublisherFromConfig();
|
||||
await publisher.publish({ entity, directory });
|
||||
expect(await publisher.hasDocsBeenGenerated(entity)).toBe(true);
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should return true if docs has been generated even if the legacy case is enabled', async () => {
|
||||
const publisher = await createPublisherFromConfig({
|
||||
@@ -435,7 +442,7 @@ describe('AwsS3Publish', () => {
|
||||
});
|
||||
await publisher.publish({ entity, directory });
|
||||
expect(await publisher.hasDocsBeenGenerated(entity)).toBe(true);
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should return true if docs has been generated if root path is specified', async () => {
|
||||
const publisher = await createPublisherFromConfig({
|
||||
@@ -443,7 +450,7 @@ describe('AwsS3Publish', () => {
|
||||
});
|
||||
await publisher.publish({ entity, directory });
|
||||
expect(await publisher.hasDocsBeenGenerated(entity)).toBe(true);
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should return true if docs has been generated if root path is specified and legacy casing is used', async () => {
|
||||
const publisher = await createPublisherFromConfig({
|
||||
@@ -452,7 +459,7 @@ describe('AwsS3Publish', () => {
|
||||
});
|
||||
await publisher.publish({ entity, directory });
|
||||
expect(await publisher.hasDocsBeenGenerated(entity)).toBe(true);
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should return false if docs has not been generated', async () => {
|
||||
const publisher = await createPublisherFromConfig();
|
||||
@@ -475,7 +482,7 @@ describe('AwsS3Publish', () => {
|
||||
expect(await publisher.fetchTechDocsMetadata(entityName)).toStrictEqual(
|
||||
techdocsMetadata,
|
||||
);
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should return tech docs metadata even if the legacy case is enabled', async () => {
|
||||
const publisher = await createPublisherFromConfig({
|
||||
@@ -485,7 +492,7 @@ describe('AwsS3Publish', () => {
|
||||
expect(await publisher.fetchTechDocsMetadata(entityName)).toStrictEqual(
|
||||
techdocsMetadata,
|
||||
);
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should return tech docs metadata even if root path is specified', async () => {
|
||||
const publisher = await createPublisherFromConfig({
|
||||
@@ -495,7 +502,7 @@ describe('AwsS3Publish', () => {
|
||||
expect(await publisher.fetchTechDocsMetadata(entityName)).toStrictEqual(
|
||||
techdocsMetadata,
|
||||
);
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should return tech docs metadata if root path is specified and legacy casing is used', async () => {
|
||||
const publisher = await createPublisherFromConfig({
|
||||
@@ -506,7 +513,7 @@ describe('AwsS3Publish', () => {
|
||||
expect(await publisher.fetchTechDocsMetadata(entityName)).toStrictEqual(
|
||||
techdocsMetadata,
|
||||
);
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should return tech docs metadata when json encoded with single quotes', async () => {
|
||||
const techdocsMetadataPath = path.join(
|
||||
@@ -528,7 +535,7 @@ describe('AwsS3Publish', () => {
|
||||
);
|
||||
|
||||
fs.writeFileSync(techdocsMetadataPath, techdocsMetadataContent);
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should return an error if the techdocs_metadata.json file is not present', async () => {
|
||||
const publisher = await createPublisherFromConfig();
|
||||
@@ -549,7 +556,7 @@ describe('AwsS3Publish', () => {
|
||||
});
|
||||
|
||||
it('should return an error if the techdocs_metadata.json file cannot be read from stream', async () => {
|
||||
s3Mock.on(GetObjectCommand).callsFake(_ => {
|
||||
s3Mock.on(GetObjectCommand).callsFake((_: any) => {
|
||||
return {
|
||||
Body: new ErrorReadable('No stream!'),
|
||||
};
|
||||
@@ -582,7 +589,7 @@ describe('AwsS3Publish', () => {
|
||||
const publisher = await createPublisherFromConfig();
|
||||
await publisher.publish({ entity, directory });
|
||||
app = express().use(publisher.docsRouter());
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
it('should pass expected object path to bucket', async () => {
|
||||
// Ensures leading slash is trimmed and encoded path is decoded.
|
||||
@@ -688,7 +695,7 @@ describe('AwsS3Publish', () => {
|
||||
});
|
||||
|
||||
it('should return 404 if file cannot be read from stream', async () => {
|
||||
s3Mock.on(GetObjectCommand).callsFake(_ => {
|
||||
s3Mock.on(GetObjectCommand).callsFake((_: any) => {
|
||||
return {
|
||||
Body: new ErrorReadable('No stream!'),
|
||||
};
|
||||
|
||||
@@ -26,10 +26,12 @@ import {
|
||||
DeleteObjectCommand,
|
||||
HeadBucketCommand,
|
||||
HeadObjectCommand,
|
||||
PutObjectCommand,
|
||||
PutObjectCommandInput,
|
||||
ListObjectsV2CommandOutput,
|
||||
ListObjectsV2Command,
|
||||
S3Client,
|
||||
S3ServiceException,
|
||||
} from '@aws-sdk/client-s3';
|
||||
import { fromTemporaryCredentials } from '@aws-sdk/credential-providers';
|
||||
import { NodeHttpHandler } from '@smithy/node-http-handler';
|
||||
@@ -84,6 +86,7 @@ export class AwsS3Publish implements PublisherBase {
|
||||
private readonly logger: LoggerService;
|
||||
private readonly bucketRootPath: string;
|
||||
private readonly sse?: 'aws:kms' | 'AES256';
|
||||
private readonly maxAttempts: number;
|
||||
|
||||
constructor(options: {
|
||||
storageClient: S3Client;
|
||||
@@ -92,6 +95,7 @@ export class AwsS3Publish implements PublisherBase {
|
||||
logger: LoggerService;
|
||||
bucketRootPath: string;
|
||||
sse?: 'aws:kms' | 'AES256';
|
||||
maxAttempts: number;
|
||||
}) {
|
||||
this.storageClient = options.storageClient;
|
||||
this.bucketName = options.bucketName;
|
||||
@@ -99,6 +103,7 @@ export class AwsS3Publish implements PublisherBase {
|
||||
this.logger = options.logger;
|
||||
this.bucketRootPath = options.bucketRootPath;
|
||||
this.sse = options.sse;
|
||||
this.maxAttempts = options.maxAttempts;
|
||||
}
|
||||
|
||||
static async fromConfig(
|
||||
@@ -175,10 +180,22 @@ export class AwsS3Publish implements PublisherBase {
|
||||
...(region && { region }),
|
||||
...(endpoint && { endpoint }),
|
||||
...(forcePathStyle && { forcePathStyle }),
|
||||
...(maxAttempts && { maxAttempts }),
|
||||
// Enhanced retry configuration for better reliability
|
||||
maxAttempts: maxAttempts || 5,
|
||||
retryMode: 'adaptive',
|
||||
...(httpsProxy && {
|
||||
requestHandler: new NodeHttpHandler({
|
||||
httpsAgent: new HttpsProxyAgent({ proxy: httpsProxy }),
|
||||
// Enhanced connection setting for large file uploads
|
||||
connectionTimeout: 60000,
|
||||
socketTimeout: 120000,
|
||||
}),
|
||||
}),
|
||||
// Add default request handler with enhanced timeouts if no proxy
|
||||
...(!httpsProxy && {
|
||||
requestHandler: new NodeHttpHandler({
|
||||
connectionTimeout: 60000,
|
||||
socketTimeout: 120000,
|
||||
}),
|
||||
}),
|
||||
});
|
||||
@@ -195,6 +212,7 @@ export class AwsS3Publish implements PublisherBase {
|
||||
legacyPathCasing,
|
||||
logger,
|
||||
sse,
|
||||
maxAttempts: maxAttempts || 5,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -250,6 +268,126 @@ export class AwsS3Publish implements PublisherBase {
|
||||
|
||||
return explicitCredentials;
|
||||
}
|
||||
/**
 * Runs an S3 operation and retries it with exponential backoff for as long
 * as `shouldRetryOperation` classifies the failure as retriable.
 *
 * Every failed attempt is logged as a warning; the final failure is logged
 * as an error and the originally thrown value is re-thrown unchanged.
 *
 * @param operation - Factory that starts a fresh attempt each time it is called.
 * @param operationName - Label used in log messages. Names starting with
 *   `Upload-` use a longer base delay between attempts.
 * @param maxAttempts - Total number of attempts, including the first one.
 *   Values below 1 (or NaN) are treated as a single attempt.
 * @returns The value resolved by the first successful attempt.
 * @throws The value thrown by the last failed attempt.
 */
private async retryOperation<TOutput>(
  operation: () => Promise<TOutput>,
  operationName: string,
  maxAttempts: number = 3,
): Promise<TOutput> {
  // Always make at least one attempt, even for 0, negative or NaN input.
  const totalAttempts = Math.max(1, Math.ceil(maxAttempts) || 1);
  let attemptsRemaining = totalAttempts;

  for (;;) {
    try {
      return await operation();
    } catch (error: unknown) {
      attemptsRemaining--;

      // Anything can be thrown; normalise to an Error-shaped value before
      // reading properties so that inspecting the failure can never throw.
      const lastError: S3ServiceException =
        error instanceof Error
          ? (error as S3ServiceException)
          : (new Error(String(error)) as S3ServiceException);

      this.logger.warn(`${operationName} failed.`, {
        errorCode: lastError.name,
        httpStatusCode: lastError.$metadata?.httpStatusCode,
        attemptsRemaining,
        currentAttempt: totalAttempts - attemptsRemaining,
        totalAttempts,
        error: lastError.message,
      });

      // Give up when the attempts are used up or the error is not transient.
      if (
        attemptsRemaining <= 0 ||
        !this.shouldRetryOperation(lastError, attemptsRemaining)
      ) {
        this.logger.error(
          `${operationName} failed after all retries: ${lastError.message}`,
        );
        // Re-throw the original value so callers see exactly what was thrown.
        throw error;
      }

      // Exponential backoff with jitter. Uploads use a longer base delay
      // because of potential multipart complexity.
      const baseDelay = operationName.startsWith('Upload-') ? 2000 : 1000;
      const backoffDelay = Math.min(
        baseDelay * Math.pow(2, totalAttempts - attemptsRemaining),
        30000, // never wait longer than 30s (plus jitter) between attempts
      );
      const jitter = Math.random() * 1000;
      await new Promise<void>(resolve =>
        setTimeout(resolve, backoffDelay + jitter),
      );
    }
  }
}
|
||||
|
||||
/**
 * Determines if an S3 operation should be retried based on the error details.
 *
 * Decision order:
 *  1. No attempts remaining: never retry.
 *  2. `InvalidPart`: always retry.
 *  3. 4xx responses: retry only for an allow-list of transient error codes.
 *  4. 5xx responses: always retry.
 *  5. No usable status code: retry when the error name or message contains
 *     one of the known network, throttling or multipart error identifiers.
 *
 * @param error - The error raised by the failed S3 call.
 * @param attemptsRemaining - Number of attempts still available after this failure.
 * @returns `true` when the caller should try the operation again.
 */
private shouldRetryOperation(
  error: S3ServiceException,
  attemptsRemaining: number,
): boolean {
  if (attemptsRemaining <= 0) {
    return false;
  }

  const httpStatusCode = error.$metadata?.httpStatusCode;
  // Guard against error-like values that lack a name or message.
  const errorCode = error.name ?? '';
  const errorMessage = error.message ?? '';

  // Invalid part errors are retriable for multipart uploads regardless of status.
  if (errorCode === 'InvalidPart') {
    return true;
  }

  // Don't retry client errors (4xx) except specific cases that might be transient.
  if (httpStatusCode && httpStatusCode >= 400 && httpStatusCode < 500) {
    const retriable4xxErrors = [
      'RequestTimeout',
      'RequestTimeoutException',
      'PriorRequestNotComplete',
      'ConnectionError',
      'RequestTimeTooSkewed',
      'NoSuchUpload',
    ];
    // Decide here: an allow-listed code is retried without having to match
    // the substring list used for errors that carry no status code.
    return retriable4xxErrors.includes(errorCode);
  }

  // Always retry for server errors (5xx).
  if (httpStatusCode && httpStatusCode >= 500) {
    return true;
  }

  // No usable status code: fall back to matching known identifiers in the
  // error name or message.
  const retriableErrors = [
    // Network / connection failures
    'NetworkingError',
    'TimeoutError',
    'ConnectionError',
    'ECONNRESET',
    'ENOTFOUND',
    'ECONNREFUSED',
    'ETIMEDOUT',
    // Throttling and availability
    'ServiceUnavailable',
    'SlowDown',
    'Throttling',
    'ThrottlingException',
    'ProvisionedThroughputExceededException',
    // Multipart upload and transfer errors
    'NoSuchUpload',
    'UploadTimeout',
    'InternalError',
    'IncompleteBody',
    'RequestTimeout',
  ];

  return retriableErrors.some(
    retriableError =>
      errorCode.includes(retriableError) ||
      errorMessage.includes(retriableError),
  );
}
|
||||
|
||||
/**
|
||||
* Check if the defined bucket exists. Being able to connect means the configuration is good
|
||||
@@ -273,13 +411,15 @@ export class AwsS3Publish implements PublisherBase {
|
||||
'explicitly defining credentials and region in techdocs.publisher.awsS3 in app config or ' +
|
||||
'by using environment variables. Refer to https://backstage.io/docs/features/techdocs/using-cloud-storage',
|
||||
);
|
||||
this.logger.error(`from AWS client library`, error);
|
||||
this.logger.error(
|
||||
`from AWS client library`,
|
||||
error instanceof Error ? error : new Error(String(error)),
|
||||
);
|
||||
return {
|
||||
isAvailable: false,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload all the files from the generated `directory` to the S3 bucket.
|
||||
* Directory structure used in the bucket is - entityNamespace/entityKind/entityName/index.html
|
||||
@@ -293,6 +433,9 @@ export class AwsS3Publish implements PublisherBase {
|
||||
const bucketRootPath = this.bucketRootPath;
|
||||
const sse = this.sse;
|
||||
|
||||
// Track timing for performance monitoring
|
||||
const publishStartTime = Date.now();
|
||||
|
||||
// First, try to retrieve a list of all individual files currently existing
|
||||
let existingFiles: string[] = [];
|
||||
try {
|
||||
@@ -302,9 +445,20 @@ export class AwsS3Publish implements PublisherBase {
|
||||
useLegacyPathCasing,
|
||||
bucketRootPath,
|
||||
);
|
||||
existingFiles = await this.getAllObjectsFromBucket({
|
||||
prefix: remoteFolder,
|
||||
});
|
||||
const response = await this.retryOperation(
|
||||
async () => {
|
||||
const listCommand = new ListObjectsV2Command({
|
||||
Bucket: this.bucketName,
|
||||
Prefix: remoteFolder,
|
||||
});
|
||||
return this.storageClient.send(listCommand);
|
||||
},
|
||||
'ListObjects',
|
||||
this.maxAttempts,
|
||||
);
|
||||
existingFiles = (response.Contents || [])
|
||||
.map(f => f.Key || '')
|
||||
.filter(f => !!f);
|
||||
} catch (e) {
|
||||
assertError(e);
|
||||
this.logger.error(
|
||||
@@ -320,30 +474,103 @@ export class AwsS3Publish implements PublisherBase {
|
||||
// e.g. ['index.html', 'sub-page/index.html', 'assets/images/favicon.png']
|
||||
absoluteFilesToUpload = await getFileTreeRecursively(directory);
|
||||
|
||||
let uploadCounter = 0;
|
||||
|
||||
await bulkStorageOperation(
|
||||
async absoluteFilePath => {
|
||||
uploadCounter++;
|
||||
const relativeFilePath = path.relative(directory, absoluteFilePath);
|
||||
const fileStream = fs.createReadStream(absoluteFilePath);
|
||||
|
||||
const s3Key = getCloudPathForLocalPath(
|
||||
entity,
|
||||
relativeFilePath,
|
||||
useLegacyPathCasing,
|
||||
bucketRootPath,
|
||||
);
|
||||
const params: PutObjectCommandInput = {
|
||||
Bucket: this.bucketName,
|
||||
Key: getCloudPathForLocalPath(
|
||||
entity,
|
||||
relativeFilePath,
|
||||
useLegacyPathCasing,
|
||||
bucketRootPath,
|
||||
),
|
||||
Body: fileStream,
|
||||
Key: s3Key,
|
||||
Body: absoluteFilePath,
|
||||
...(sse && { ServerSideEncryption: sse }),
|
||||
};
|
||||
|
||||
objects.push(params.Key!);
|
||||
// Get file stats before upload
|
||||
const stats = await fs.stat(absoluteFilePath);
|
||||
const fileSizeInBytes = stats.size;
|
||||
|
||||
const upload = new Upload({
|
||||
client: this.storageClient,
|
||||
params,
|
||||
});
|
||||
return upload.done();
|
||||
// Use retry wrapper for uploads with enhanced error handling
|
||||
try {
|
||||
const result = await this.retryOperation(
|
||||
async () => {
|
||||
const fiveMB = 5 * 1024 * 1024;
|
||||
// For files smaller than 5MB, use simple PutObject to avoid multipart complexity
|
||||
if (fileSizeInBytes < fiveMB) {
|
||||
const fileContent = await fs.readFile(absoluteFilePath);
|
||||
const putParams = { ...params, Body: fileContent };
|
||||
return this.storageClient.send(
|
||||
new PutObjectCommand(putParams),
|
||||
);
|
||||
}
|
||||
// For files 5MB and larger, use multipart upload with enhanced configuration
|
||||
const calaculatedPartSize = Math.max(
|
||||
fiveMB,
|
||||
Math.ceil(fileSizeInBytes / 10000),
|
||||
);
|
||||
const upload = new Upload({
|
||||
client: this.storageClient,
|
||||
params,
|
||||
// Configure multipart upload options for better reliability
|
||||
partSize: calaculatedPartSize,
|
||||
queueSize: 3,
|
||||
leavePartsOnError: false,
|
||||
});
|
||||
return upload.done();
|
||||
},
|
||||
`Upload-${params.Key}`,
|
||||
this.maxAttempts,
|
||||
);
|
||||
return result;
|
||||
} catch (error) {
|
||||
const s3Error = error as any;
|
||||
const errorName = s3Error?.name || 'Unknown';
|
||||
|
||||
// Check if this is a multipart upload failure that we can handle
|
||||
if (
|
||||
fileSizeInBytes >= 5 * 1024 * 1024 &&
|
||||
(errorName === 'InvalidPart' || errorName === 'NoSuchUpload')
|
||||
) {
|
||||
this.logger.warn(
|
||||
`Multipart upload failed for ${params.Key}, Attempting simple upload fallback.`,
|
||||
);
|
||||
try {
|
||||
// Attempt simple upload as a fallback
|
||||
const fileContent = await fs.readFile(absoluteFilePath);
|
||||
const simpleParams = { ...params, Body: fileContent };
|
||||
const fallbackResult = await this.storageClient.send(
|
||||
new PutObjectCommand(simpleParams),
|
||||
);
|
||||
this.logger.info(
|
||||
`Simple upload fallback succeeded for ${params.Key}`,
|
||||
);
|
||||
return fallbackResult;
|
||||
} catch (fallbackError) {
|
||||
this.logger.error(
|
||||
`Both multipart and simple upload failed for ${params.Key}: ${
|
||||
fallbackError instanceof Error
|
||||
? fallbackError.message
|
||||
: String(fallbackError)
|
||||
}`,
|
||||
);
|
||||
// Fall through to throw original error
|
||||
}
|
||||
}
|
||||
this.logger.error(
|
||||
`Upload failed for ${params.Key}: ${
|
||||
error instanceof Error ? error.message : String(error)
|
||||
}`,
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
absoluteFilesToUpload,
|
||||
{ concurrencyLimit: 10 },
|
||||
@@ -373,17 +600,21 @@ export class AwsS3Publish implements PublisherBase {
|
||||
|
||||
await bulkStorageOperation(
|
||||
async relativeFilePath => {
|
||||
return await this.storageClient.send(
|
||||
new DeleteObjectCommand({
|
||||
Bucket: this.bucketName,
|
||||
Key: relativeFilePath,
|
||||
}),
|
||||
return this.retryOperation(
|
||||
async () => {
|
||||
const deleteCommand = new DeleteObjectCommand({
|
||||
Bucket: this.bucketName,
|
||||
Key: relativeFilePath,
|
||||
});
|
||||
return this.storageClient.send(deleteCommand);
|
||||
},
|
||||
'DeleteObject',
|
||||
this.maxAttempts,
|
||||
);
|
||||
},
|
||||
staleFiles,
|
||||
{ concurrencyLimit: 10 },
|
||||
);
|
||||
|
||||
this.logger.info(
|
||||
`Successfully deleted stale files for Entity ${entity.metadata.name}. Total number of files: ${staleFiles.length}`,
|
||||
);
|
||||
@@ -391,6 +622,13 @@ export class AwsS3Publish implements PublisherBase {
|
||||
const errorMessage = `Unable to delete file(s) from AWS S3. ${error}`;
|
||||
this.logger.error(errorMessage);
|
||||
}
|
||||
const publishEndTime = Date.now();
|
||||
const publishDurationMs = publishEndTime - publishStartTime;
|
||||
this.logger.info(
|
||||
`Successfully published ${objects.length} files for ${
|
||||
entity.metadata.name
|
||||
} in ${Math.round(publishDurationMs / 1000)}s`,
|
||||
);
|
||||
return { objects };
|
||||
}
|
||||
|
||||
@@ -413,11 +651,16 @@ export class AwsS3Publish implements PublisherBase {
|
||||
}
|
||||
|
||||
try {
|
||||
const resp = await this.storageClient.send(
|
||||
new GetObjectCommand({
|
||||
Bucket: this.bucketName,
|
||||
Key: `${entityRootDir}/techdocs_metadata.json`,
|
||||
}),
|
||||
const resp = await this.retryOperation(
|
||||
async () => {
|
||||
const getCommand = new GetObjectCommand({
|
||||
Bucket: this.bucketName,
|
||||
Key: `${entityRootDir}/techdocs_metadata.json`,
|
||||
});
|
||||
return this.storageClient.send(getCommand);
|
||||
},
|
||||
'GetTechDocsMetadata',
|
||||
this.maxAttempts,
|
||||
);
|
||||
|
||||
const techdocsMetadataJson = await streamToBuffer(
|
||||
@@ -598,12 +841,18 @@ export class AwsS3Publish implements PublisherBase {
|
||||
let allObjects: ListObjectsV2CommandOutput;
|
||||
// Iterate through every file in the root of the publisher.
|
||||
do {
|
||||
allObjects = await this.storageClient.send(
|
||||
new ListObjectsV2Command({
|
||||
Bucket: this.bucketName,
|
||||
ContinuationToken: nextContinuation,
|
||||
...(prefix ? { Prefix: prefix } : {}),
|
||||
}),
|
||||
const currentToken = nextContinuation;
|
||||
allObjects = await this.retryOperation(
|
||||
async () => {
|
||||
const listCommand = new ListObjectsV2Command({
|
||||
Bucket: this.bucketName,
|
||||
ContinuationToken: currentToken,
|
||||
...(prefix ? { Prefix: prefix } : {}),
|
||||
});
|
||||
return this.storageClient.send(listCommand);
|
||||
},
|
||||
'GetAllObjects',
|
||||
this.maxAttempts,
|
||||
);
|
||||
objects.push(
|
||||
...(allObjects.Contents || []).map(f => f.Key || '').filter(f => !!f),
|
||||
|
||||
Reference in New Issue
Block a user