csharp 使用Akka.net对事件进行分区(linqpad - 参考akka.net和Rx-main)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了csharp 使用Akka.net对事件进行分区(linqpad - 参考akka.net和Rx-main)相关的知识,希望对你有一定的参考价值。

void Main()
{
	var system = ActorSystem.Create("MySystem");
	var greeters = system.ActorOf(Props.Create(() => new DeviceSplitter()));
	var r = new System.Random();
	var measures = Enumerable.Repeat(new [] {"a","b","c","d"}, 250000)
		.SelectMany(c => c)
		.Select (id => new Measure(id,r.Next()));
	foreach (var m in measures) greeters.Tell(m);
	Console.ReadLine();
}

public class Measure
{
    public Measure(string deviceId, double value)
    {
        DeviceId = deviceId;
		Value = value;
    }
    public string DeviceId { get;private set; }
    public double Value { get;private set; }
}

public class Dump {}

public class DeviceSplitter : ReceiveActor
{	
	public DeviceSplitter()
    {		
		Dictionary<string,IActorRef> knownDevices = new Dictionary<string,IActorRef>();
		Receive<Measure>(m => {
			if (!knownDevices.ContainsKey(m.DeviceId)) knownDevices.Add(m.DeviceId, Context.ActorOf(Props.Create(() => new DeviceActor())));
			knownDevices[m.DeviceId].Tell(m);
		});
    }
}

public class DeviceActor : ReceiveActor
{
	public DeviceActor() {
		var measures = new List<Measure>();
		var subject = new ReplaySubject<Measure>();
		Receive<Measure>(m => { 
			subject.OnNext(m);
		});
		subject.Buffer(5000).Subscribe(m => 
			Console.WriteLine("Average over last {0} from {1} = {2}",m.Count(),m.First().DeviceId,m.Average (x => x.Value)));
	}
}

以上是关于csharp 使用Akka.net对事件进行分区(linqpad - 参考akka.net和Rx-main)的主要内容,如果未能解决你的问题,请参考以下文章

Akka.NET 中的事件溯源和 CQRS

按事件时间对 Kinesis firehose S3 记录进行分区

Akka.net 性能测试兼使用小技巧

Akka.net路径里的user

Redshift Spectrum 使用两个日期字段对表进行分区

将 .net 远程处理替换为 WEB API、WCF、SignalR 或 Akka.net