How to read file chunk by chunk from S3 using aws-java-sdk

Posted: 2017-11-07 10:08:17

Question:

I am trying to read a large file from S3 in chunks without cutting any line in half, so that the chunks can be processed in parallel.

Let me illustrate: there is a 1 GB file on S3 and I want to split it into 64 MB chunks. I can do that easily like this:

S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
InputStream stream = s3object.getObjectContent();
byte[] content = new byte[64 * 1024 * 1024];

while (stream.read(content) != -1) {
    // process content here
}

But the problem with a chunk is that it may contain 100 complete lines and one incomplete line. I cannot process an incomplete line, and I do not want to discard it either.

Is there any way to handle this situation, i.e., so that no chunk ends with a partial line?

Comments:

Answer 1:

The aws-java-sdk already provides streaming functionality for your S3 objects. You have to call getObject and the result will be an InputStream:

1) AmazonS3Client.getObject(GetObjectRequest getObjectRequest) -> S3Object

2) S3Object.getObjectContent()

Note: the method is a simple getter and does not actually create a stream. If you retrieve an S3Object, you should close this input stream as soon as possible, because the object contents are not buffered in memory and stream directly from Amazon S3. Further, failure to close this stream can cause the request pool to become blocked.

aws java docs
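For illustration, here is a minimal sketch of consuming that stream, assuming an existing AmazonS3 client named s3 and placeholder bucketName/key values; try-with-resources closes the stream promptly, as the quoted note advises:

    // Sketch (not the answer's own code): read fixed-size chunks and close the
    // stream promptly via try-with-resources, per the Javadoc quoted above.
    try (S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
            InputStream stream = s3object.getObjectContent()) {
        byte[] chunk = new byte[64 * 1024 * 1024];
        int bytesRead;
        while ((bytesRead = stream.read(chunk)) != -1) {
            // process the first bytesRead bytes of chunk here;
            // read() may fill less than the full array in a single call
        }
    }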

Comments:

Answer 2:

"100 complete lines and one incomplete line"

Do you mean you need to read the stream line by line? If so, rather than using a raw InputStream, try wrapping the S3 object stream in a BufferedReader so that you can read it line by line, although I think it will be a bit slower than reading chunk by chunk:

        S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
        BufferedReader in = new BufferedReader(new InputStreamReader(s3object.getObjectContent()));
        String line;
        while ((line = in.readLine()) != null) {
            // process line here
        }
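If the goal is still roughly 64 MB units of work for parallel processing, one variant of the loop above (a sketch under that assumption, not part of the original answer; processChunk is a hypothetical handler) is to batch complete lines until the batch reaches the target size, so no chunk ever ends mid-line:

        // Variant of the loop above: batch whole lines until ~64 MB, so no
        // chunk ever ends mid-line. processChunk is a hypothetical handler.
        // length() counts chars rather than bytes, close enough for ASCII data.
        final int targetChunkSize = 64 * 1024 * 1024;
        StringBuilder chunk = new StringBuilder();
        String line;
        while ((line = in.readLine()) != null) {
            chunk.append(line).append('\n');
            if (chunk.length() >= targetChunkSize) {
                processChunk(chunk.toString()); // hand off for parallel processing
                chunk.setLength(0);
            }
        }
        if (chunk.length() > 0) {
            processChunk(chunk.toString()); // final, possibly smaller chunk
        }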

Comments:

Answer 3:

My usual approach (InputStream -> BufferedReader.lines() -> batches of lines -> CompletableFuture) won't work here, because the underlying S3ObjectInputStream eventually times out for huge files.

So I created a new class, S3InputStream, which doesn't care how long it stays open and reads byte blocks on demand using short-lived AWS SDK calls. You provide a byte[] that will be reused. new byte[1 << 24] (16 MB) seems to work well.

package org.harrison;

import java.io.IOException;
import java.io.InputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;

/**
 * An {@link InputStream} for S3 files that does not care how big the file is.
 *
 * @author stephen harrison
 */
public class S3InputStream extends InputStream {
    private static class LazyHolder {
        private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
    }

    private final String bucket;
    private final String file;
    private final byte[] buffer;
    private long lastByteOffset;

    private long offset = 0;
    private int next = 0;
    private int length = 0;

    public S3InputStream(final String bucket, final String file, final byte[] buffer) {
        this.bucket = bucket;
        this.file = file;
        this.buffer = buffer;
        this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1;
    }

    @Override
    public int read() throws IOException {
        if (next >= length) {
            fill();

            if (length <= 0) {
                return -1;
            }

            next = 0;
        }

        if (next >= length) {
            return -1;
        }

        // mask so byte values >= 0x80 come back as 128..255 rather than
        // negative ints, which would be mistaken for end-of-stream
        return buffer[this.next++] & 0xFF;
    }

    public void fill() throws IOException {
        // use > rather than >= so the byte at lastByteOffset itself is still read
        if (offset > lastByteOffset) {
            length = -1;
        } else {
            try (final InputStream inputStream = s3Object()) {
                length = 0;
                int b;

                while ((b = inputStream.read()) != -1) {
                    buffer[length++] = (byte) b;
                }

                if (length > 0) {
                    offset += length;
                }
            }
        }
    }

    private InputStream s3Object() {
        final GetObjectRequest request = new GetObjectRequest(bucket, file).withRange(offset,
                offset + buffer.length - 1);

        return LazyHolder.S3.getObject(request).getObjectContent();
    }
}
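A quick usage sketch (my addition, not from the answer; the bucket and key names are placeholders): wrap the stream in a BufferedReader to get back to line-by-line reading.

    // Usage sketch -- bucket/key are placeholders; also needs java.io.BufferedReader
    // and java.io.InputStreamReader in addition to the imports above.
    byte[] buffer = new byte[1 << 24]; // 16 MB, reused across the ranged GETs
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(new S3InputStream("my-bucket", "path/to/big-file.txt", buffer)))) {
        String line;
        while ((line = reader.readLine()) != null) {
            // process line here
        }
    }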

Comments:

Thanks. I found this InputStream much more reliable than the one you get from the SDK's getObject method. I updated it for v2 of the SDK; see my new answer.

Answer 4:

You can iterate over all the files in the bucket by checking the continuation token, and read each file with other Java libraries, e.g. PDFBox for PDF files.

import java.io.IOException;
import java.io.InputStream;

import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.text.PDFTextStripper;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;

// ..
// in your main class
private static AWSCredentials credentials = null;
private static AmazonS3 amazonS3Client = null;

public static void intializeAmazonObjects() {
    credentials = new BasicAWSCredentials(ACCESS_KEY, SECRET_ACCESS_KEY);
    amazonS3Client = new AmazonS3Client(credentials);
}

public void mainMethod() throws IOException, AmazonS3Exception {
    // connect to aws
    intializeAmazonObjects();

    ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(bucketName);
    ListObjectsV2Result listObjectsResult;

    do {
        listObjectsResult = amazonS3Client.listObjectsV2(req);

        for (S3ObjectSummary objectSummary : listObjectsResult.getObjectSummaries()) {
            System.out.printf(" - %s (size: %d)\n", objectSummary.getKey(), objectSummary.getSize());

            String key = objectSummary.getKey();

            // only try to read pdf files
            if (!key.contains(".pdf")) {
                continue;
            }

            // Read the source file as text
            String pdfFileInText = readAwsFile(objectSummary.getBucketName(), objectSummary.getKey());
            if (pdfFileInText.isEmpty()) {
                continue;
            }
        }
        // end of current batch

        // If there are more than maxKeys (1000 by default) keys in the bucket,
        // get a continuation token and list the next objects.
        String token = listObjectsResult.getNextContinuationToken();
        System.out.println("Next Continuation Token: " + token);
        req.setContinuationToken(token);
    } while (listObjectsResult.isTruncated());
}

public String readAwsFile(String bucketName, String keyName) {
    S3Object object;
    String pdfFileInText = "";
    try {
        object = amazonS3Client.getObject(new GetObjectRequest(bucketName, keyName));
        InputStream objectData = object.getObjectContent();

        // try-with-resources closes the document and avoids a resource leak
        try (PDDocument document = PDDocument.load(objectData)) {
            if (!document.isEncrypted()) {
                PDFTextStripper tStripper = new PDFTextStripper();
                tStripper.setSortByPosition(true);
                pdfFileInText = tStripper.getText(document);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return pdfFileInText;
}
Comments:

Answer 5:

@stephen-harrison's answer works well. I updated it for v2 of the SDK. I made a couple of tweaks: mainly, the connection can now be authorized, and the LazyHolder class is no longer static (I could not figure out how to authorize the connection and still keep the class static).

For another approach using Scala, see https://alexwlchan.net/2019/09/streaming-large-s3-objects/

package foo.whatever;

import java.io.IOException;
import java.io.InputStream;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

/**
 * Adapted for AWS Java SDK v2 by jomofrodo@gmail.com
 *
 * An {@link InputStream} for S3 files that does not care how big the file is.
 *
 * @author stephen harrison
 */
public class S3InputStreamV2 extends InputStream {
    private class LazyHolder {
        String appID;
        String secretKey;
        Region region = Region.US_WEST_1;
        public S3Client S3 = null;

        public void connect() {
            AwsBasicCredentials awsCreds = AwsBasicCredentials.create(appID, secretKey);
            S3 = S3Client.builder().region(region).credentialsProvider(StaticCredentialsProvider.create(awsCreds))
                    .build();
        }

        private HeadObjectResponse getHead(String keyName, String bucketName) {
            HeadObjectRequest objectRequest = HeadObjectRequest.builder().key(keyName).bucket(bucketName).build();

            HeadObjectResponse objectHead = S3.headObject(objectRequest);
            return objectHead;
        }

        // public static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
    }

    private LazyHolder lazyHolder = new LazyHolder();

    private final String bucket;
    private final String file;
    private final byte[] buffer;
    private long lastByteOffset;

    private long offset = 0;
    private int next = 0;
    private int length = 0;

    public S3InputStreamV2(final String bucket, final String file, final byte[] buffer, String appID, String secret) {
        this.bucket = bucket;
        this.file = file;
        this.buffer = buffer;
        lazyHolder.appID = appID;
        lazyHolder.secretKey = secret;
        lazyHolder.connect();
        this.lastByteOffset = lazyHolder.getHead(file, bucket).contentLength();
    }

    @Override
    public int read() throws IOException {
        if (next >= length || (next == buffer.length && length == buffer.length)) {
            fill();

            if (length <= 0) {
                return -1;
            }

            next = 0;
        }

        if (next >= length) {
            return -1;
        }

        return buffer[this.next++] & 0xFF;
    }

    public void fill() throws IOException {
        if (offset >= lastByteOffset) {
            length = -1;
        } else {
            try (final InputStream inputStream = s3Object()) {
                length = 0;
                int b;

                while ((b = inputStream.read()) != -1) {
                    buffer[length++] = (byte) b;
                }

                if (length > 0) {
                    offset += length;
                }
            }
        }
    }

    private InputStream s3Object() {
        final long rangeEnd = offset + buffer.length - 1;
        final String rangeString = "bytes=" + offset + "-" + rangeEnd;
        final GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(bucket).key(file).range(rangeString)
                .build();

        return lazyHolder.S3.getObject(getObjectRequest);
    }
}
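A usage sketch for the v2 class (my addition, not from the answer; the credentials, bucket, and key are all placeholders):

    // Usage sketch -- appId, secret, bucket, and key are placeholders.
    // Also needs java.io.BufferedReader and java.io.InputStreamReader.
    byte[] buffer = new byte[1 << 22]; // 4 MB per ranged GET; see the buffer-size tradeoff note below
    S3InputStreamV2 s3Stream = new S3InputStreamV2("my-bucket", "path/to/big-file.txt", buffer, appId, secret);
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(s3Stream))) {
        String line;
        while ((line = reader.readLine()) != null) {
            // process line here
        }
    }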

Comments:

Please note: this class works very reliably. I've used it with byte[] buffers from 0.5 MB to 5 MB. Larger buffer sizes are faster but obviously take more memory. Be aware that the smaller your buffer, the more connections you will make to S3, which will incur more cost.
