使用 c# 将数据记录到 cassandra

Posted

技术标签:

【中文标题】使用 c# 将数据记录到 cassandra【英文标题】:Log data into cassandra using c# 【发布时间】:2020-08-05 02:35:46 【问题描述】:

我尝试使用 c# 将数据记录到 Cassandra。所以我的目标是在 200 毫秒内记录尽可能多的数据点。 我试图在 200 毫秒内节省时间、随机键和值。请参阅代码以供参考。在while循环之后如何执行会话的问题。

Cluster cluster = Cluster.Builder()
                   .AddContactPoint("127.0.0.1")
                   .Build();
ISession session = cluster.Connect("log");  //keyspace to connect with

var ps = session.Prepare("Insert into logcassandra(nanodate, key, value) values (?,?,?)");

stopwatch.Start();
while(stop.ElapsedMilliseconds <= 200)

    i++;
    var statement = ps.Bind(nanoTime(),"key"+i,"value"+i);
    session.ExecuteAsync(statement);

【问题讨论】:

ExecuteAsync 应该返回一些您需要等待的 promise/future/... 但在您的情况下,这不是实际查询的执行 - 它只是接受它以执行.. . 你想用这段代码实现什么? @AlexOtt 正如我在问题中提到的,我试图在 200 毫秒内记录尽可能多的数据。所以我做了一个运行 200 毫秒的 while 循环并将数据记录到 cassandra。当我在 while 循环中使用 ExecuteAsync 时,我认为它会减慢进程。有没有其他方法可以做到这一点???? 【参考方案1】:

请选择System.Threading.Timer 和TimerCallback 而不是Stopwatch


编辑:(回复评论

您好,我不确定您想要实现什么,但这里有一些关于异步调用和并行执行的一般概念。在 .NET 世界中,异步主要用于非阻塞 I/O 操作,这意味着您的调用线程不会等待 I/O 驱动程序的响应。换句话说,您实例化一个 I/O 操作并将这项工作分派给 .NET 生态系统之外的“事物”,这将为您带来未来Task)。驱动程序确认它收到了请求,并承诺一旦有空闲容量就会处理它。

Task 表示异步工作成功或失败。但是因为您是异步调用它,所以您并没有等待它的结果(不阻塞调用者线程以等待外部工作),而是继续执行下一条语句。最终此操作将完成,届时驱动程序将通知Task 请求操作已完成。 (Task可以看作是调用者和被调用者之间的主要通信通道)

在您的情况下,您使用的是 fire and forget 风格的异步调用。这意味着您正在异步触发大量 I/O 操作,而您忘记了处理它们的结果。你不知道他们中的任何一个失败与否。但是你已经打电话给Casandra做了很多工作人员。您的时间测量仅用于解雇工作,这意味着您不知道这些工作已经完成了多少。

如果您选择对异步调用使用await,这意味着您的while 循环将串行执行。您将启动一项工作,并且您无法继续进行下一次迭代,因为您正在等待它,因此您的调用者线程将在其调用堆栈中上移一级并检查它是否可以处理某些内容。如果也有await,那么它会向上移动一级,依此类推...

while(stop.ElapsedMilliseconds <= 200)

    await session.ExecuteAsync(statement);

如果您不希望串行执行而是并行,您可以根据需要创建任意数量的作业并等待它们作为一个整体。这就是Task.WhenAll 发挥作用的地方。您将解雇很多工作,您将等待将跟踪所有其他工作的单个工作。

var cassandraCalls = new List<Task>();
cassandraCalls.AddRange(Enumerable.Range(0, 100).Select(_ => session.ExecuteAsync(statement)));
await Task.WhenAll(cassandraCalls);

但此代码将一直运行到所有作业完成为止。如果你想限制整个执行时间,那么你应该使用一些取消机制。 Task.WhenAll 不支持 CancellationToken。但是您可以通过多种方式克服此限制。最简单的解决方案是Task.DelayTask.WhenAny 的组合。 Task.Delay 将用于超时,Task.WhenAny 将用于等待您的 cassandra 调用或超时完成。

var cassandraCalls = new List<Task>();
cassandraCalls.AddRange(Enumerable.Range(0, 100).Select(_ => ExecuteAsync()));
await Task.WhenAny(Task.WhenAll(cassandraCalls), Task.Delay(1000)); 

通过这种方式,您可以根据需要解雇尽可能多的工作,并且根据您的驱动程序,它们可以并行同时执行。您正在等待完成全部或经过一定的时间。当WhenAny 作业完成时,您可以检查作业的结果,但只需遍历 cassandraCalls

foreach (var call in cassandraCalls)

    Console.WriteLine(call.IsCompleted);

我希望这个解释对你有所帮助。

【讨论】:

正如我在问题中提到的,我试图在 200 毫秒内记录尽可能多的数据。所以我做了一个运行 200 毫秒的 while 循环并将数据记录到 cassandra。当我在 while 循环中使用 ExecuteAsync 时,我认为它会减慢进程。有没有其他方法可以做到这一点????

以上是关于使用 c# 将数据记录到 cassandra的主要内容,如果未能解决你的问题,请参考以下文章

C# 使用NLog记录日志

创建大量对象时内存不足C#

当其它程序写入记录数据库中后,我要立即取得其输入的数据,以写入到我的数据库中,用C#怎么做?

如何使用 LINQ C# 将一个数据表拆分为两个(匹配记录)(不匹配记录)

C# 保存对象

在 C# 中使用 Dapper 将数据发送到 RECORD 类型的 PL/SQL 脚本变量会更好吗