当前位置: 代码迷 >> 综合 >> Amazon S3上传和分段上传
  详细解决方案

Amazon S3上传和分段上传

热度:120   发布时间:2023-11-01 09:33:37.0

一、简介

Amazon Simple Storage Service (Amazon S3) 是一种对象存储服务,提供行业领先的可扩展性、数据可用性、安全性和性能。

Amazon S3 提供了一个简单 Web 服务接口,可用于随时在 Web 上的任何位置存储和检索任何数量的数据。此服务让所有开发人员都能访问同一个具备高扩展性、可靠性、安全性和快速价廉的数据存储基础设施。

本文主要介绍一次上传整个文件和将大文件分为几段上传的内容。

二、上传文件

当上传的文件不是太大时,可以采用一次上传整个文件的方式上传,即使失败了,重新开始上传也不会太浪费时间。

// MultipartFile file是接收自前端的文件
String bucketName = "bucketName";
String key = "key";
long size = file.getSize();
AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
ObjectMetadata metadata = new ObjectMetadata();
// 必须设置ContentLength
metadata.setContentLength(size);
try {
    //metadata可以为空long time1 = System.currentTimeMillis();s3.putObject(bucketName, key, file.getInputStream(), metadata);long time2 = System.currentTimeMillis();log.info("上传耗时:" + (time2 - time1));
} catch (Exception e) {
    log.error("文件上传异常");
}

主要看s3.putObject(bucketName, key, file.getInputStream(), metadata);这句。

其中,bucketName是桶的名称,key是文件在存储桶中显示的上传的文件路径以及文件名,metadata如果没什么特别的,可以为null。

三、分段上传

3.1、什么是分段上传

S3 的分段上传就像是我们见过的文件分块下载一样,或者说是一个个 Chunk。它可让我们上传大数据(或文件时),分成最小 5M 大小段往 S3 上传,待分段全部成功上传到 S3 后再执行一条指令通知 S3 合并文件。只有最后一个段是可以小于 5MB 的,其他段小于 5MB 上传会有异常的。

S3 不分段上传,单个文件最大 5GB, 而分段后,每个段的大小在 5MB 到 5GB 之间,可以有 10000 个分段数量,所以最大单个文件可以达到 5TB. 分段上传大数据(文件) 的好处是可以提高吞吐量与上传的可靠性,分段可以同时上传,单个分段上传失败只需重传该分段,而无需全部重传。

注意的就是,会采用分段上传的 Bucket 应该设置好它的生命周期,否则烂在上面的未成功合并的并段将得不到清理。

3.2、代码实现(多线程)

Java 代码进行数据的分段上传,主要分以下几步:

  1. 初始化,声明说要开始一个 Multipart Upload, 并获得一个批次 ID, 大概意思是下面应用这个 ID 的分段将会被合并
  2. 上传每一个分段,并指定分段号(从 1 开始), 当前分段大小,并把每次分段请求的 ETag 记录下来
  3. completeMultipartUpload(…) 方法完成分段上传

然后我在这基础上,使用了多线程来实现。

// MultipartFile file是接收自前端的文件
String bucketName = "bucketName";
String key = "key";
long size = file.getSize();
long minPartSize = 5 * 1024 * 1024;
// 得到总共的段数,和 分段后,每个段的开始上传的字节位置
List<Long> positions = new ArrayList<>();
long filePosition = 0;
while (filePosition < size) {
    positions.add(filePosition);filePosition += Math.min(minPartSize, (size - filePosition));
}
log.info("总大小:{},分为{}段", size, positions.size());
// 创建一个列表保存所有分传的 PartETag, 在分段完成后会用到
List<PartETag> partETags = new ArrayList<>();
// 第一步,初始化,声明下面将有一个 Multipart Upload
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, key);
InitiateMultipartUploadResult initResponse = s3.initiateMultipartUpload(initRequest);
log.info("开始上传");
long begin = System.currentTimeMillis();
try {
    // MultipartFile 转 FileFile toFile = multipartFileToFile(file);// 使用CountDownLatch来协调多个线程之间的同步final CountDownLatch latch = new CountDownLatch(positions.size());for (int i = 0; i < positions.size(); i++) {
    int finalI = i;threadTaskExecutor.execute(() -> {
    try {
    long time1 = System.currentTimeMillis();UploadPartRequest uploadRequest = new UploadPartRequest().withBucketName(bucketName).withKey(key).withUploadId(initResponse.getUploadId()).withPartNumber(finalI + 1).withFileOffset(positions.get(finalI)).withFile(toFile).withPartSize(Math.min(minPartSize, (size - positions.get(finalI))));// 第二步,上传分段,并把当前段的 PartETag 放到列表中partETags.add(s3.uploadPart(uploadRequest).getPartETag());long time2 = System.currentTimeMillis();log.info("第{}段上传耗时:{}", finalI + 1, (time2 - time1));} catch (Exception e) {
    log.error("Failed to upload, " + e.getMessage());} finally {
    latch.countDown();}});}latch.await();// 第三步,完成上传,合并分段CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucketName, key,initResponse.getUploadId(), partETags);s3.completeMultipartUpload(compRequest);
} catch (Exception e) {
    s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key, initResponse.getUploadId()));log.error("Failed to upload, " + e.getMessage());
}
long end = System.currentTimeMillis();
log.info("总上传耗时:{}", (end - begin));

第一步没什么说的,主要看for循环的内容。

循环上传,根据minPartSize与剩余没有上传的部分进行比较,得出这一分段需要上传的大小。将这一分段的信息设置到UploadPartRequest,设置的信息分别为bucketName(文件将要上载到的现有存储桶的名称)、key(在存储桶中显示上传的文件路径以及文件名)、uploadId(文件上传的id)、partNumber(这个分段是第几段)、fileOffset(从文件的哪个字节开始上传)、filepartSize(这个分段的大小),设置这些信息后,就可以开始上传分段了。

UploadPartRequest设置完毕后,使用s3的uploadPart方法将Request上传,得到UploadPartResult,这里要从result里面获取ETag信息出来,方便后续合并分段时使用。

最后所有的分段上传完毕,初始化CompleteMultipartUploadRequest对象,这里需要uploadId,以及每个分段上传的ETag,使用s3.completeMultipartUpload合并分段。

其中,多线程的配置和使用、CountDownLatch 的使用可以参考我之前写的博客:Spring Security中,多线程操作导致安全上下文丢失(附CountDownLatch的用法)

其中,multipartFileToFile是我在网上随便找的一个MultipartFile 转 File 的方法,代码如下:

/*** MultipartFile 转 File*/
public static File multipartFileToFile(MultipartFile file) throws Exception {
    File toFile = null;if (file.equals("") || file.getSize() <= 0) {
    file = null;} else {
    InputStream ins = null;ins = file.getInputStream();toFile = new File(file.getOriginalFilename());inputStreamToFile(ins, toFile);ins.close();}return toFile;
}/*** 获取流文件*/
private static void inputStreamToFile(InputStream ins, File file) {
    try {
    OutputStream os = new FileOutputStream(file);int bytesRead = 0;byte[] buffer = new byte[8192];while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
    os.write(buffer, 0, bytesRead);}os.close();ins.close();} catch (Exception e) {
    e.printStackTrace();}
}

3.3、总结

分段上传可以提升我们上传大文件的效率,它不是为了解决流式向 S3 写入数据而产生的。当然,若应用它的 Lambda 中,也确实可以缓解内存的紧张。它一定程度上是像流式写入,只是它的写入单位不是字节,或任意大小的字节数据,而是至少 5MB 的 Chunk.