一、简介
Amazon Simple Storage Service (Amazon S3) 是一种对象存储服务,提供行业领先的可扩展性、数据可用性、安全性和性能。
Amazon S3 提供了一个简单 Web 服务接口,可用于随时在 Web 上的任何位置存储和检索任何数量的数据。此服务让所有开发人员都能访问同一个具备高扩展性、可靠性、安全性和快速价廉的数据存储基础设施。
本文主要介绍一次上传整个文件和将大文件分为几段上传的内容。
二、上传文件
当上传的文件不是太大时,可以采用一次上传整个文件的方式上传,即使失败了,重新开始上传也不会太浪费时间。
// MultipartFile file是接收自前端的文件
String bucketName = "bucketName";
String key = "key";
long size = file.getSize();
AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
ObjectMetadata metadata = new ObjectMetadata();
// 必须设置ContentLength
metadata.setContentLength(size);
try {
//metadata可以为空long time1 = System.currentTimeMillis();s3.putObject(bucketName, key, file.getInputStream(), metadata);long time2 = System.currentTimeMillis();log.info("上传耗时:" + (time2 - time1));
} catch (Exception e) {
log.error("文件上传异常");
}
主要看s3.putObject(bucketName, key, file.getInputStream(), metadata);
这句。
其中,bucketName
是桶的名称,key
是文件在存储桶中显示的上传的文件路径以及文件名,metadata
如果没什么特别的,可以为null。
三、分段上传
3.1、什么是分段上传
S3 的分段上传就像是我们见过的文件分块下载一样,或者说是一个个 Chunk。它可让我们上传大数据(或文件时),分成最小 5M 大小段往 S3 上传,待分段全部成功上传到 S3 后再执行一条指令通知 S3 合并文件。只有最后一个段是可以小于 5MB 的,其他段小于 5MB 上传会有异常的。
S3 不分段上传,单个文件最大 5GB, 而分段后,每个段的大小在 5MB 到 5GB 之间,可以有 10000 个分段数量,所以最大单个文件可以达到 5TB. 分段上传大数据(文件) 的好处是可以提高吞吐量与上传的可靠性,分段可以同时上传,单个分段上传失败只需重传该分段,而无需全部重传。
注意的就是,会采用分段上传的 Bucket 应该设置好它的生命周期,否则烂在上面的未成功合并的并段将得不到清理。
3.2、代码实现(多线程)
Java 代码进行数据的分段上传,主要分以下几步:
- 初始化,声明说要开始一个 Multipart Upload, 并获得一个批次 ID, 大概意思是下面应用这个 ID 的分段将会被合并
- 上传每一个分段,并指定分段号(从 1 开始), 当前分段大小,并把每次分段请求的 ETag 记录下来
- completeMultipartUpload(…) 方法完成分段上传
然后我在这基础上,使用了多线程来实现。
// MultipartFile file是接收自前端的文件
String bucketName = "bucketName";
String key = "key";
long size = file.getSize();
long minPartSize = 5 * 1024 * 1024;
// 得到总共的段数,和 分段后,每个段的开始上传的字节位置
List<Long> positions = new ArrayList<>();
long filePosition = 0;
while (filePosition < size) {
positions.add(filePosition);filePosition += Math.min(minPartSize, (size - filePosition));
}
log.info("总大小:{},分为{}段", size, positions.size());
// 创建一个列表保存所有分传的 PartETag, 在分段完成后会用到
List<PartETag> partETags = new ArrayList<>();
// 第一步,初始化,声明下面将有一个 Multipart Upload
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, key);
InitiateMultipartUploadResult initResponse = s3.initiateMultipartUpload(initRequest);
log.info("开始上传");
long begin = System.currentTimeMillis();
try {
// MultipartFile 转 FileFile toFile = multipartFileToFile(file);// 使用CountDownLatch来协调多个线程之间的同步final CountDownLatch latch = new CountDownLatch(positions.size());for (int i = 0; i < positions.size(); i++) {
int finalI = i;threadTaskExecutor.execute(() -> {
try {
long time1 = System.currentTimeMillis();UploadPartRequest uploadRequest = new UploadPartRequest().withBucketName(bucketName).withKey(key).withUploadId(initResponse.getUploadId()).withPartNumber(finalI + 1).withFileOffset(positions.get(finalI)).withFile(toFile).withPartSize(Math.min(minPartSize, (size - positions.get(finalI))));// 第二步,上传分段,并把当前段的 PartETag 放到列表中partETags.add(s3.uploadPart(uploadRequest).getPartETag());long time2 = System.currentTimeMillis();log.info("第{}段上传耗时:{}", finalI + 1, (time2 - time1));} catch (Exception e) {
log.error("Failed to upload, " + e.getMessage());} finally {
latch.countDown();}});}latch.await();// 第三步,完成上传,合并分段CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucketName, key,initResponse.getUploadId(), partETags);s3.completeMultipartUpload(compRequest);
} catch (Exception e) {
s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key, initResponse.getUploadId()));log.error("Failed to upload, " + e.getMessage());
}
long end = System.currentTimeMillis();
log.info("总上传耗时:{}", (end - begin));
第一步没什么说的,主要看for循环的内容。
循环上传,根据minPartSize
与剩余没有上传的部分进行比较,得出这一分段需要上传的大小。将这一分段的信息设置到UploadPartRequest
,设置的信息分别为bucketName
(文件将要上载到的现有存储桶的名称)、key
(在存储桶中显示上传的文件路径以及文件名)、uploadId
(文件上传的id)、partNumber
(这个分段是第几段)、fileOffset
(从文件的哪个字节开始上传)、file
、partSize
(这个分段的大小),设置这些信息后,就可以开始上传分段了。
UploadPartRequest设置完毕后,使用s3的uploadPart
方法将Request
上传,得到UploadPartResult
,这里要从result
里面获取ETag
信息出来,方便后续合并分段时使用。
最后所有的分段上传完毕,初始化CompleteMultipartUploadRequest
对象,这里需要uploadId
,以及每个分段上传的ETag
,使用s3.completeMultipartUpload
合并分段。
其中,多线程的配置和使用、CountDownLatch 的使用可以参考我之前写的博客:Spring Security中,多线程操作导致安全上下文丢失(附CountDownLatch的用法)
其中,multipartFileToFile
是我在网上随便找的一个MultipartFile 转 File 的方法,代码如下:
/*** MultipartFile 转 File*/
public static File multipartFileToFile(MultipartFile file) throws Exception {
File toFile = null;if (file.equals("") || file.getSize() <= 0) {
file = null;} else {
InputStream ins = null;ins = file.getInputStream();toFile = new File(file.getOriginalFilename());inputStreamToFile(ins, toFile);ins.close();}return toFile;
}/*** 获取流文件*/
private static void inputStreamToFile(InputStream ins, File file) {
try {
OutputStream os = new FileOutputStream(file);int bytesRead = 0;byte[] buffer = new byte[8192];while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
os.write(buffer, 0, bytesRead);}os.close();ins.close();} catch (Exception e) {
e.printStackTrace();}
}
3.3、总结
分段上传可以提升我们上传大文件的效率,它不是为了解决流式向 S3 写入数据而产生的。当然,若应用它的 Lambda 中,也确实可以缓解内存的紧张。它一定程度上是像流式写入,只是它的写入单位不是字节,或任意大小的字节数据,而是至少 5MB 的 Chunk.