弃用 Lambda,Twitter 启用 Kafka 和数据流新架构

Posted InfoQ 架构头条

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了弃用 Lambda,Twitter 启用 Kafka 和数据流新架构相关的知识,希望对你有一定的参考价值。

作者 | Lu Zhang、Chukwudiuto Malife
译者 | Sambodhi
策划 | 闫园园

在 Twitter 上,我们每天都要实时处理大约 4000 亿个事件,生成 PB 级的数据。我们使用的数据的事件源多种多样,来自不同的平台和存储系统,例如 Hadoop、Vertica、Manhattan 分布式数据库、Kafka、Twitter Eventbus、GCS、BigQuery 和 PubSub。

为了处理这些源和平台中的这些类型的数据,Twitter 数据平台团队已经构建了内部工具,如用于批处理的 Scalding,用于流的 Heron,用于批处理和实时处理的名为 TimeSeries AggregatoR(TSAR)的集成框架,以及用于数据发现和消费的 Data Access Layer。然而,随着数据的快速增长,高规模仍然给工程师们用来运行管道的数据基础设施带来了挑战。比如,我们有一个交互和参与的管道,能够以批处理和实时的方式处理高规模数据。由于数据规模的快速增长,对流延迟、数据处理的准确性和数据的实时性提出了更高的要求。

对于交互和参与的管道,我们从各种实时流、服务器和客户端日志中采集并处理这些数据,从而提取到具有不同聚合级别、时间粒度和其他度量维度的 Tweet 和用户交互数据。这些聚合的交互数据尤其重要,并且是真正来自 Twitter 的广告收入服务和数据产品服务检索影响和参与度指标信息。此外,我们需要保证对存储系统中的交互数据进行快速查询,并在不同的数据中心之间实现低延迟和高准确性。为了构建这样一个系统,我们把整个工作流分解为几个部分,包括预处理、事件聚合和数据服务。

旧架构

旧的架构如下图所示。我们的 Lambda 架构具有批处理和实时处理管道,构建在 Summingbird 平台内,并与 TSAR 集成。如需进一步了解 Lambda 架构,请参阅《什么是 Lambda 架构?》(What is Lambda Architecture?)。批处理组件源是 Hadoop 日志,如客户端事件、时间线事件和 Tweet 事件,这些都是存储在 Hadoop 分布式文件系统(HDFS)上的。我们构建了几个 Scalding 管道,用于对原始日志进行预处理,并且将其作为离线来源摄入到 Summingbird 平台中。实时组件来源是 Kafka 主题。

实时数据存储在 Twitter Nighthawk 分布式缓存中,而批处理数据存储在 Manhattan 分布式存储系统中。我们有一个查询服务,可以在这两个存储中存取实时数据,而客户服务则会使用这些数据。Kafka 和数据流上的新架构

评估
系统性能评估

下面是两个架构之间的指标比较表。与旧架构中的 Heron 拓扑相比,新架构具有更低的延迟、更高的吞吐量。此外,新架构还能处理延迟事件计数,在进行实时聚合时不会丢失事件。此外,新架构中没有批处理组件,所以它简化了设计,降低了旧架构中存在的计算成本。

表 1:新旧架构的系统性能比较。

聚合计数验证

我们将计数验证过程分成两个步骤。首先,我们在数据流中,在重复数据删除之前和之后,对重复数据的百分比进行了评估。其次,对于所有键,我们直接比较了原始 TSAR 批处理管道的计数和重复数据删除后数据流的计数。

第一步,我们创建了一个单独的数据流管道,将重复数据删除前的原始事件直接从 Pubsub 导出到 BigQuery。然后,我们创建了用于连续时间的查询计数的预定查询。同时,我们会创建另外一条数据流管道,把被扣除的事件计数导出到 BigQuery。通过这种方式,我们就可以看出,重复事件的百分比和重复数据删除后的百分比变化。

第二步,我们创建了一个验证工作流,在这个工作流中,我们将重复数据删除的和汇总的数据导出到 BigQuery,并将原始 TSAR 批处理管道产生的数据从 Twitter 数据中心加载到谷歌云上的 BigQuery。这样我们就可以执行一个预定的查询,以便对所有键的计数进行比较。

在我们的 Tweet 交互流中,我们能够准确地和批处理数据进行超过 95% 的匹配。我们对低于 5% 的差异进行了研究,结果表明,这很大程度上是由于最初的 TSAR 批处理管道丢弃了后期事件,而这些事件被我们的新流管道捕获。这进一步证明了我们目前的系统产生了更高的准确性。

结语

通过将建立在 TSAR 上的旧架构迁移到 Twitter 数据中心和谷歌云平台上的混合架构,我们能够实时处理数十亿的事件,并实现低延迟、高准确度、稳定性、架构简单和减少工程师的运营成本。对于下一步,我们将使 Bigtable 数据集对区域故障具有弹性,并将我们的客户迁移到新的 LDC 查询服务器上。

作者介绍:

Lu Zhang,Twitter 高级软件工程师。

Chukwudiuto Malife,Twitter 高级软件工程师。

原文链接:

https://blog.twitter.com/engineering/en_us/topics/infrastructure/2021/processing-billions-of-events-in-real-time-at-twitter-

今日文章推荐:

谷歌母公司 Alphabet 股票大涨七成,创疫情以来最好表现

曾公开炮轰谷歌不重隐私,小众搜索引擎 DuckDuckGo 流量增长迅速


【直播推荐】

D2C 是近一年大厂内卷的热门领域,各大厂商纷纷布局,转转的 D2C 系统已经研发近两年时间,积累了大量 D2C 领域经验。

本周四晚 8 点,转转平台方向前端负责人神笔马良 D2C 系统核心作者张所勇将直播分享「从零开始搭建一个 D2C(设计稿转代码)系统」,揭秘 D2C 系统原理。

扫描二维码或点击阅读原文立即预约。 

javascript twitter通过lambda函数转发

'use strict';

const AWS = require("aws-sdk");
const dynamodb = new AWS.DynamoDB();
const querystring = require("querystring");
const http = require("https");
const TWITTER_API_URL = "https://api.twitter.com"
const TWITTER_HOSTNAME = "api.twitter.com"

/**
 * Get tweet id from DyanamoDB
 */
function getTeeetIds() {
  return [];
}

/**
 * API auth
 * @param {String} accessKey
 * @param {String} secretKey
 * @see https://developer.twitter.com/en/docs/basics/authentication/api-reference/token
 */
function auth(accessKey, secretKey) {
  return new Promise((resolve, reject) => {

    let encodedBearerToken = new Buffer(`${accessKey}:${secretKey}`).toString('base64');
    let postData = querystring.stringify({ "grant_type": "client_credentials" });
    let options = {
      hostname: TWITTER_HOSTNAME,
      port: '443',
      path: '/oauth2/token',
      method: 'POST',
      headers: {
        'Authorization': `Basic ${encodedBearerToken}`,
        'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
        'Content-Length': Buffer.byteLength(postData)
      }
    };

    console.log('Start request');
    let req = http.request(options, (res) => {
      console.log(`STATUS: ${res.statusCode}`);
      // console.log(`HEADERS: ${JSON.stringify(res.headers)}`);
      res.setEncoding('utf8');
      res.on('data', (chunk) => {
        console.log(chunk);
        console.log(`BODY: ${chunk}`);
        resolve(JSON.parse(chunk));
      });

      res.on('end', () => {
        console.log('No more data in response.');
      });

    }).on('error', (e) => {
      console.error(e);
      reject(e);
      // console.error(`problem with request: ${e.message}`);
    });

    console.log(postData);

    req.write(postData);
    req.end();
  });
}

/**
 * Call retweet API
 * @param {String} bearerToken
 * @param {String} tweetId
 * @return {Object} response
 * @see https://developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/post-statuses-retweet-id
 */
function getRetwet(bearerToken, tweetId) {
  return new Promise((resolve, reject) => {
    resolve(tweetId);
    // let encodedBearerToken = new Buffer(`${accessKey}:${secretKey}`).toString('base64');
    // let postData = querystring.stringify({ "grant_type": "client_credentials" });
    // let options = {
    //   hostname: TWITTER_HOSTNAME,
    //   port: '443',
    //   path: '/oauth2/token',
    //   method: 'POST',
    //   headers: {
    //     'Authorization': `Basic ${encodedBearerToken}`,
    //     'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
    //     'Content-Length': Buffer.byteLength(postData)
    //   }
    // };
    //
    // console.log('Start request');
    // let req = http.request(options, (res) => {
    //   console.log(`STATUS: ${res.statusCode}`);
    //   // console.log(`HEADERS: ${JSON.stringify(res.headers)}`);
    //   res.setEncoding('utf8');
    //   res.on('data', (chunk) => {
    //     console.log(chunk);
    //     console.log(`BODY: ${chunk}`);
    //     resolve(JSON.parse(chunk));
    //   });
    //
    //   res.on('end', () => {
    //     console.log('No more data in response.');
    //   });
    //
    // }).on('error', (e) => {
    //   console.error(e);
    //   reject(e);
    //   // console.error(`problem with request: ${e.message}`);
    // });
    //
    // console.log(postData);
    //
    // req.write(postData);
    // req.end();
  });
}

/**
 * Save s3
 */

function main(event, context, callback) {
  console.log(new Buffer('hoge').toString('base64'));
  console.log(new Buffer('aG9nZQ==', 'base64').toString('ascii'));
  let tweetIds = getTeeetIds();
  if (typeof process.env.TWITTER_ACCESS_KEY == "undefined" || typeof process.env.TWITTER_SECRET_KEY == "undefined") {
    if (callback) {
      callback("API key not found.");
    } else {
      console.error("API key not found.");
      process.exit(1);
    }
  }

  console.log("Authorization");
  auth(process.env.TWITTER_ACCESS_KEY, process.env.TWITTER_SECRET_KEY).then((data) => {
    console.log(data.access_token);
    return data.access_token;
  }).then(getRetwet).catch((error) => {
    if (callback) {
      callback(error);
    } else {
      console.error(error);
      process.exit(1);
    }
  }).then((data) => {
    console.log(data);
    if (callback) {
      callback(null, "Success");
    } else {
      console.log("Success");
      process.exit();
    }
  });
}

exports.lambdaHandler = function(event, context, callback) {
  main(event, context, callback);
}

main();

以上是关于弃用 Lambda,Twitter 启用 Kafka 和数据流新架构的主要内容,如果未能解决你的问题,请参考以下文章

javascript twitter通过lambda函数转发

检查 UILocalNotification 弃用后是不是启用用户通知

尝试 OAuth 到 Twitter 时,无服务器 Django 应用程序(通过 Zappa 的 AWS Lambda)超时

Twitter CRC Challenge 不适用于 AWS Lambda/API 网关

如何在 Twitter Bootstrap 模式中启用转义键关闭功能?

为 Lambda 代理集成启用 CORS 支持的问题