如何使用 AWS 流式传输上传的视频?

Posted

技术标签:

【中文标题】如何使用 AWS 流式传输上传的视频?【英文标题】:How to stream uploaded video with AWS? 【发布时间】:2021-07-06 12:14:45 【问题描述】:

主要任务是保护视频不被下载。

为了实现它,我们决定从 S3 设置视频流。

该项目有一个 php API 和一个客户端。 API 会生成预签名 URL,以将视频上传到 S3 存储桶中的位置。然后,客户端可以通过 CDN URL 请求视频。但是,使用签名的 url,可以从客户端下载视频。

我们找到了一种方法,使用 AWS Elemental MediaConverter 将视频转换为 MPEG-DASH。 MediaConverter 的 Job 可以通过 API 创建。然后应该通过 AWS Elemental MediaPackage 和 CloudFront 进行流式传输。

问题是:

如何了解视频上传完成的时间,以启动 MediaConverter Job? MPEG-DASH 文件具有 .mpd 清单,但 MediaPackage 需要 .smil 清单。如何从 .mpd 自动生成此文件?

附:如果我在某个地方错了,请纠正我。

【问题讨论】:

【参考方案1】:

如何了解视频上传完成,启动MediaConverter Job? 可以通过以下工作流程实现

    摄取用户将视频上传到 S3 中的监视文件夹存储桶 s3:PutItem 事件触发一个调用 MediaConvert 来转换视频的 Lambda 函数。 转换后的视频由 MediaConvert 存储在 S3 中

高级说明如下。

创建一个 Amazon S3 存储桶以用于上传要转换的视频。存储桶名称示例:vod-watchfolder-firstname-lastname

创建一个 Amazon S3 存储桶,用于存储从 MediaConvert 转换的视频输出(启用公共读取、静态网站托管和 CORS)

  <?xml version="1.0" encoding="UTF-8"?>
  <CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <CORSRule>
      <AllowedOrigin>*</AllowedOrigin>
      <AllowedMethod>GET</AllowedMethod>
      <MaxAgeSeconds>3000</MaxAgeSeconds>
      <AllowedHeader>*</AllowedHeader>
  </CORSRule>
  </CORSConfiguration>

创建一个 IAM 角色以传递给 MediaConvert。使用 IAM 控制台创建新角色。将其命名为 MediaConvertRole 并选择 AWS Lambda 作为角色类型。使用内联策略向 lambda 执行所需的其他资源授予权限。

为您的 Lambda 函数创建一个 IAM 角色。使用 IAM 控制台创建角色。将其命名为 VODLambdaRole 并选择 AWS Lambda 作为角色类型。将名为 AWSLambdaBasicExecutionRole 的托管策略附加到此角色以授予必要的 CloudWatch Logs 权限。使用内联策略向 lambda 执行所需的其他资源授予权限。

  
      "Version": "2012-10-17",
      "Statement": [
          
              "Action": [
                  "logs:CreateLogGroup",
                  "logs:CreateLogStream",
                  "logs:PutLogEvents"
              ],
              "Resource": "*",
              "Effect": "Allow",
              "Sid": "Logging"
          ,
          
              "Action": [
                  "iam:PassRole"
              ],
              "Resource": [
                  "ARNforMediaConvertRole"
              ],
              "Effect": "Allow",
              "Sid": "PassRole"
          ,
          
              "Action": [
                  "mediaconvert:*"
              ],
              "Resource": [
                  "*"
              ],
              "Effect": "Allow",
              "Sid": "MediaConvertService"
          ,
          
              "Action": [
                  "s3:*"
              ],
              "Resource": [
                  "*"
              ],
              "Effect": "Allow",
              "Sid": "S3Service"
          
      ]
  

创建一个用于转换视频的 lambda 函数。使用 AWS Lambda 控制台创建一个名为 VODLambdaConvert 的新 Lambda 函数,该函数将处理 API 请求。为您的函数代码使用提供的 convert.py 示例实现。

  #!/usr/bin/env python

  import glob
  import json
  import os
  import uuid
  import boto3
  import datetime
  import random
  from urllib.parse import urlparse
  import logging

  from botocore.client import ClientError

  logger = logging.getLogger()
  logger.setLevel(logging.INFO)

  S3 = boto3.resource('s3')

  def handler(event, context):
  '''
  Watchfolder handler - this lambda is triggered when video objects are uploaded to the 
  SourceS3Bucket/inputs folder.
  It will look for two sets of file inputs:
      SourceS3Bucket/inputs/SourceS3Key:
          the input video to be converted

      SourceS3Bucket/jobs/*.json:
          job settings for MediaConvert jobs to be run against the input video. If 
          there are no settings files in the jobs folder, then the Default job will be run 
          from the job.json file in lambda environment. 

  Ouput paths stored in outputGroup['OutputGroupSettings']['DashIsoGroupSettings']['Destination']
  are constructed from the name of the job settings files as follows:

      s3://<MediaBucket>/<basename(job settings filename)>/<basename(input)>/<Destination value from job settings file>
  '''

  assetID = str(uuid.uuid4())
  sourceS3Bucket = event['Records'][0]['s3']['bucket']['name']
  sourceS3Key = event['Records'][0]['s3']['object']['key']
  sourceS3 = 's3://'+ sourceS3Bucket + '/' + sourceS3Key
  destinationS3 = 's3://' + os.environ['DestinationBucket']
  mediaConvertRole = os.environ['MediaConvertRole']
  application = os.environ['Application']
  region = os.environ['AWS_DEFAULT_REGION']
  statusCode = 200
  jobs = []
  job = 

  # Use MediaConvert SDK UserMetadata to tag jobs with the assetID 
  # Events from MediaConvert will have the assetID in UserMedata
  jobMetadata = 
  jobMetadata['assetID'] = assetID
  jobMetadata['application'] = application
  jobMetadata['input'] = sourceS3

  try:    

      # Build a list of jobs to run against the input.  Use the settings files in WatchFolder/jobs
      # if any exist.  Otherwise, use the default job.

      jobInput = 
      # Iterates through all the objects in jobs folder of the WatchFolder bucket, doing the pagination for you. Each obj
      # contains a jobSettings JSON
      bucket = S3.Bucket(sourceS3Bucket)
      for obj in bucket.objects.filter(Prefix='jobs/'):
          if obj.key != "jobs/":
              jobInput = 
              jobInput['filename'] = obj.key
              logger.info('jobInput: %s', jobInput['filename'])

              jobInput['settings'] = json.loads(obj.get()['Body'].read())
              logger.info(json.dumps(jobInput['settings'])) 

              jobs.append(jobInput)

      # Use Default job settings in the lambda zip file in the current working directory
      if not jobs:

          with open('job.json') as json_data:
              jobInput['filename'] = 'Default'
              logger.info('jobInput: %s', jobInput['filename'])

              jobInput['settings'] = json.load(json_data)
              logger.info(json.dumps(jobInput['settings']))

              jobs.append(jobInput)

      # get the account-specific mediaconvert endpoint for this region
      mediaconvert_client = boto3.client('mediaconvert', region_name=region)
      endpoints = mediaconvert_client.describe_endpoints()

      # add the account-specific endpoint to the client session 
      client = boto3.client('mediaconvert', region_name=region, endpoint_url=endpoints['Endpoints'][0]['Url'], verify=False)

      for j in jobs:
          jobSettings = j['settings']
          jobFilename = j['filename']

          # Save the name of the settings file in the job userMetadata
          jobMetadata['settings'] = jobFilename

          # Update the job settings with the source video from the S3 event 
          jobSettings['Inputs'][0]['FileInput'] = sourceS3

          # Update the job settings with the destination paths for converted videos.  We want to replace the
          # destination bucket of the output paths in the job settings, but keep the rest of the
          # path
          destinationS3 = 's3://' + os.environ['DestinationBucket'] + '/' \
              + os.path.splitext(os.path.basename(sourceS3Key))[0] + '/' \
              + os.path.splitext(os.path.basename(jobFilename))[0]                 

          for outputGroup in jobSettings['OutputGroups']:

              logger.info("outputGroup['OutputGroupSettings']['Type'] == %s", outputGroup['OutputGroupSettings']['Type']) 

              if outputGroup['OutputGroupSettings']['Type'] == 'FILE_GROUP_SETTINGS':
                  templateDestination = outputGroup['OutputGroupSettings']['FileGroupSettings']['Destination']
                  templateDestinationKey = urlparse(templateDestination).path
                  logger.info("templateDestinationKey == %s", templateDestinationKey)
                  outputGroup['OutputGroupSettings']['FileGroupSettings']['Destination'] = destinationS3+templateDestinationKey

              elif outputGroup['OutputGroupSettings']['Type'] == 'HLS_GROUP_SETTINGS':
                  templateDestination = outputGroup['OutputGroupSettings']['HlsGroupSettings']['Destination']
                  templateDestinationKey = urlparse(templateDestination).path
                  logger.info("templateDestinationKey == %s", templateDestinationKey)
                  outputGroup['OutputGroupSettings']['HlsGroupSettings']['Destination'] = destinationS3+templateDestinationKey

              elif outputGroup['OutputGroupSettings']['Type'] == 'DASH_ISO_GROUP_SETTINGS':
                  templateDestination = outputGroup['OutputGroupSettings']['DashIsoGroupSettings']['Destination']
                  templateDestinationKey = urlparse(templateDestination).path
                  logger.info("templateDestinationKey == %s", templateDestinationKey)
                  outputGroup['OutputGroupSettings']['DashIsoGroupSettings']['Destination'] = destinationS3+templateDestinationKey

              elif outputGroup['OutputGroupSettings']['Type'] == 'DASH_ISO_GROUP_SETTINGS':
                  templateDestination = outputGroup['OutputGroupSettings']['DashIsoGroupSettings']['Destination']
                  templateDestinationKey = urlparse(templateDestination).path
                  logger.info("templateDestinationKey == %s", templateDestinationKey)
                  outputGroup['OutputGroupSettings']['DashIsoGroupSettings']['Destination'] = destinationS3+templateDestinationKey

              elif outputGroup['OutputGroupSettings']['Type'] == 'MS_SMOOTH_GROUP_SETTINGS':
                  templateDestination = outputGroup['OutputGroupSettings']['MsSmoothGroupSettings']['Destination']
                  templateDestinationKey = urlparse(templateDestination).path
                  logger.info("templateDestinationKey == %s", templateDestinationKey)
                  outputGroup['OutputGroupSettings']['MsSmoothGroupSettings']['Destination'] = destinationS3+templateDestinationKey

              elif outputGroup['OutputGroupSettings']['Type'] == 'CMAF_GROUP_SETTINGS':
                  templateDestination = outputGroup['OutputGroupSettings']['CmafGroupSettings']['Destination']
                  templateDestinationKey = urlparse(templateDestination).path
                  logger.info("templateDestinationKey == %s", templateDestinationKey)
                  outputGroup['OutputGroupSettings']['CmafGroupSettings']['Destination'] = destinationS3+templateDestinationKey
              else:
                  logger.error("Exception: Unknown Output Group Type %s", outputGroup['OutputGroupSettings']['Type'])
                  statusCode = 500

          logger.info(json.dumps(jobSettings))

          # Convert the video using AWS Elemental MediaConvert
          job = client.create_job(Role=mediaConvertRole, UserMetadata=jobMetadata, Settings=jobSettings)

  except Exception as e:
      logger.error('Exception: %s', e)
      statusCode = 500
      raise

  finally:
      return 
          'statusCode': statusCode,
          'body': json.dumps(job, indent=4, sort_keys=True, default=str),
          'headers': 'Content-Type': 'application/json', 'Access-Control-Allow-Origin': '*'
      

确保将您的函数配置为使用您在上一节中创建的 VODLambdaRole IAM 角色。

为您的 Convert lambda 创建一个 S3 事件触发器。使用 AWS Lambda 控制台将 putItem 触发器从 vod-watchfolder-firstname-lastname S3 存储桶添加到 VODLambdaConvert lambda。

测试监视文件夹自动化。您可以使用自己的视频或使用此文件夹中包含的 test.mp4 视频来测试工作流程。

详情请参考本文https://github.com/aws-samples/aws-media-services-vod-automation/blob/master/MediaConvert-WorkflowWatchFolderAndNotification/README-tutorial.md

MPEG-DASH 文件具有 .mpd 清单,但 MediaPackage 需要 .smil 清单。如何从 .mpd 自动生成此文件?

截至今天,MediaConvert 没有自动生成 smil 文件功能。因此,您可以考虑将输出更改为 HLS 并提取到 Mediapackage。或者,手动创建 smil 文件。参考文件如下 HLS VOD 摄取到媒体包:https://github.com/aws-samples/aws-media-services-simple-vod-workflow/blob/master/13-VODMediaPackage/README-tutorial.md 创建 smil 文件:https://docs.aws.amazon.com/mediapackage/latest/ug/supported-inputs-vod-smil.html

【讨论】:

谢谢!这是一个很好的明确答案)

以上是关于如何使用 AWS 流式传输上传的视频?的主要内容,如果未能解决你的问题,请参考以下文章

开始使用 Python 进行安全 AWS CloudFront 流式传输

视频上传和流式传输(使用 Laravel VueJs 上传和显示所有类型的文件视频、图像、文档等)

如何从 aws kinesis 流式传输实时数据以响应基于前端

如何使用 html 5 视频元素流式传输视频

如何使用 MPEG-DASH 流式传输实时视频? [关闭]

如何使用 Play Framework 和 videojs 流式传输视频?