在调用 Service Fabric 传输中的可靠服务时传递用户和审核信息
Posted
技术标签:
【中文标题】在调用 Service Fabric 传输中的可靠服务时传递用户和审核信息【英文标题】:Passing user and auditing information in calls to Reliable Services in Service Fabric transport 【发布时间】:2017-05-28 12:55:58 【问题描述】:如何在客户端和服务之间以简单的方式传递审计信息,而无需将该信息添加为所有服务方法的参数?我可以使用消息头来设置通话数据吗?
有没有办法让服务也将其传递给下游,即如果服务A调用调用ServiceC的ServiceB,是否可以将相同的审计信息发送给第一个A,然后在A调用B,然后在B调用C?
【问题讨论】:
注意,最近有人问了一个类似的问题,但已删除。我向她添加了这个问题,因为我认为建议的解决方案可能对其他人有用。 【参考方案1】:如果您使用fabric transport 进行远程处理,实际上存在在客户端和服务之间传递的标头的概念。如果您使用的是 Http 传输,那么您将拥有与任何 http 请求一样的标头。
注意,下面的建议并不是最简单的解决方案,但是一旦到位就可以解决问题并且很容易使用,但是如果您在整个代码库中寻找简单的方法,这可能不是解决问题的方法走。如果是这种情况,那么我建议您只需将一些常见的审计信息参数添加到您的所有服务方法中。当然,当一些开发人员忘记添加它或者调用下游服务时没有正确设置它时,当然会有很大的警告。就像代码中一样,这一切都是关于权衡的:)。
钻进兔子洞
在结构传输中,通信涉及两个类:客户端的IServiceRemotingClient
实例和服务端的IServiceRemotingListener
实例。在来自客户端的每个请求中,都会发送消息正文 和 ServiceRemotingMessageHeaders
。开箱即用的这些标头包含有关哪个接口(即哪个服务)和正在调用哪个方法的信息(这也是底层接收器知道如何解压缩作为主体的字节数组的方式)。对于通过 ActorService 对 Actors 的调用,额外的 Actor 信息也包含在这些标头中。
棘手的部分是连接到该交换并实际设置然后读取其他标头。请耐心等待,这是我们需要了解的幕后涉及的许多课程。
服务端
当您为您的服务(例如无状态服务)设置IServiceRemotingListener
时,您通常会使用方便的扩展方法,如下所示:
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
yield return new ServiceInstanceListener(context =>
this.CreateServiceRemotingListener(this.Context));
(另一种方法是实现你自己的监听器,但这并不是我们真正不想在这里做的,我们只是不想在现有基础设施之上添加东西。见下文对于这种方法。)
这是我们可以提供自己的监听器的地方,类似于扩展方法在幕后所做的。我们先来看看那个扩展方法是做什么的。它会在您的服务项目的装配级别上寻找特定属性:ServiceRemotingProviderAttribute
。那个是abstract
,但是你可以使用的那个,如果没有提供,你会得到一个默认实例,是FabricTransportServiceRemotingProviderAttribute
。将其设置在AssemblyInfo.cs
(或任何其他文件,它是一个程序集属性)中:
[assembly: FabricTransportServiceRemotingProvider()]
这个属性有两个有趣的可重写方法:
public override IServiceRemotingListener CreateServiceRemotingListener(
ServiceContext serviceContext, IService serviceImplementation)
public override IServiceRemotingClientFactory CreateServiceRemotingClientFactory(
IServiceRemotingCallbackClient callbackClient)
这两个方法负责创建监听器和客户端工厂。这意味着它也被事务的客户端检查。这就是为什么它是服务程序集的程序集级别的属性,客户端也可以将它与我们要与之通信的客户端的IService
派生接口一起拾取。
CreateServiceRemotingListener
最终创建了一个实例FabricTransportServiceRemotingListener
,但是在这个实现中我们不能设置我们自己特定的IServiceRemotingMessageHandler
。如果您创建自己的 FabricTransportServiceRemotingProviderAttribute
子类并覆盖它,那么您实际上可以使其创建 FabricTransportServiceRemotingListener
的实例,该实例在构造函数中接收调度程序:
public class AuditableFabricTransportServiceRemotingProviderAttribute :
FabricTransportServiceRemotingProviderAttribute
public override IServiceRemotingListener CreateServiceRemotingListener(
ServiceContext serviceContext, IService serviceImplementation)
var messageHandler = new AuditableServiceRemotingDispatcher(
serviceContext, serviceImplementation);
return (IServiceRemotingListener)new FabricTransportServiceRemotingListener(
serviceContext: serviceContext,
messageHandler: messageHandler);
AuditableServiceRemotingDispatcher
是魔法发生的地方。它是我们自己的ServiceRemotingDispatcher
子类。覆盖RequestResponseAsync
(忽略HandleOneWay
,服务远程处理不支持,如果调用它会抛出NotImplementedException
),像这样:
public class AuditableServiceRemotingDispatcher : ServiceRemotingDispatcher
public AuditableServiceRemotingDispatcher(ServiceContext serviceContext, IService service) :
base(serviceContext, service)
public override async Task<byte[]> RequestResponseAsync(
IServiceRemotingRequestContext requestContext,
ServiceRemotingMessageHeaders messageHeaders,
byte[] requestBodyBytes)
byte[] userHeader = null;
if (messageHeaders.TryGetHeaderValue("user-header", out auditHeader))
// Deserialize from byte[] and handle the header
else
// Throw exception?
byte[] result = null;
result = await base.RequestResponseAsync(requestContext, messageHeaders, requestBodyBytes);
return result;
另一种更简单但不太灵活的方法是直接在服务中使用我们的自定义调度程序实例直接创建FabricTransportServiceRemotingListener
的实例:
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
yield return new ServiceInstanceListener(context =>
new FabricTransportServiceRemotingListener(this.Context, new AuditableServiceRemotingDispatcher(context, this)));
为什么这不太灵活?好吧,因为使用该属性也支持客户端,如下所示
客户端
好的,现在我们可以在接收消息时读取自定义标头,如何设置这些标头?让我们看看该属性的另一个方法:
public override IServiceRemotingClientFactory CreateServiceRemotingClientFactory(IServiceRemotingCallbackClient callbackClient)
return (IServiceRemotingClientFactory)new FabricTransportServiceRemotingClientFactory(
callbackClient: callbackClient,
servicePartitionResolver: (IServicePartitionResolver)null,
traceId: (string)null);
在这里,我们不能只为服务注入特定的处理程序或类似的处理程序,我们必须提供我们自己的自定义工厂。为了不必重新实现FabricTransportServiceRemotingClientFactory
的细节,我只是将它封装在我自己的IServiceRemotingClientFactory
实现中:
public class AuditedFabricTransportServiceRemotingClientFactory : IServiceRemotingClientFactory, ICommunicationClientFactory<IServiceRemotingClient>
private readonly ICommunicationClientFactory<IServiceRemotingClient> _innerClientFactory;
public AuditedFabricTransportServiceRemotingClientFactory(ICommunicationClientFactory<IServiceRemotingClient> innerClientFactory)
_innerClientFactory = innerClientFactory;
_innerClientFactory.ClientConnected += OnClientConnected;
_innerClientFactory.ClientDisconnected += OnClientDisconnected;
private void OnClientConnected(object sender, CommunicationClientEventArgs<IServiceRemotingClient> e)
EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> clientConnected = this.ClientConnected;
if (clientConnected == null) return;
clientConnected((object)this, new CommunicationClientEventArgs<IServiceRemotingClient>()
Client = e.Client
);
private void OnClientDisconnected(object sender, CommunicationClientEventArgs<IServiceRemotingClient> e)
EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> clientDisconnected = this.ClientDisconnected;
if (clientDisconnected == null) return;
clientDisconnected((object)this, new CommunicationClientEventArgs<IServiceRemotingClient>()
Client = e.Client
);
public async Task<IServiceRemotingClient> GetClientAsync(
Uri serviceUri,
ServicePartitionKey partitionKey,
TargetReplicaSelector targetReplicaSelector,
string listenerName,
OperationRetrySettings retrySettings,
CancellationToken cancellationToken)
var client = await _innerClientFactory.GetClientAsync(
serviceUri,
partitionKey,
targetReplicaSelector,
listenerName,
retrySettings,
cancellationToken);
return new AuditedFabricTransportServiceRemotingClient(client);
public async Task<IServiceRemotingClient> GetClientAsync(
ResolvedServicePartition previousRsp,
TargetReplicaSelector targetReplicaSelector,
string listenerName,
OperationRetrySettings retrySettings,
CancellationToken cancellationToken)
var client = await _innerClientFactory.GetClientAsync(
previousRsp,
targetReplicaSelector,
listenerName,
retrySettings,
cancellationToken);
return new AuditedFabricTransportServiceRemotingClient(client);
public Task<OperationRetryControl> ReportOperationExceptionAsync(
IServiceRemotingClient client,
ExceptionInformation exceptionInformation,
OperationRetrySettings retrySettings,
CancellationToken cancellationToken)
return _innerClientFactory.ReportOperationExceptionAsync(
client,
exceptionInformation,
retrySettings,
cancellationToken);
public event EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> ClientConnected;
public event EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> ClientDisconnected;
这个实现只是将任何繁重的工作传递给底层工厂,同时返回它自己的可审计客户端,类似地封装IServiceRemotingClient
:
public class AuditedFabricTransportServiceRemotingClient : IServiceRemotingClient, ICommunicationClient
private readonly IServiceRemotingClient _innerClient;
public AuditedFabricTransportServiceRemotingClient(IServiceRemotingClient innerClient)
_innerClient = innerClient;
~AuditedFabricTransportServiceRemotingClient()
if (this._innerClient == null) return;
var disposable = this._innerClient as IDisposable;
disposable?.Dispose();
Task<byte[]> IServiceRemotingClient.RequestResponseAsync(ServiceRemotingMessageHeaders messageHeaders, byte[] requestBody)
messageHeaders.SetUser(ServiceRequestContext.Current.User);
messageHeaders.SetCorrelationId(ServiceRequestContext.Current.CorrelationId);
return this._innerClient.RequestResponseAsync(messageHeaders, requestBody);
void IServiceRemotingClient.SendOneWay(ServiceRemotingMessageHeaders messageHeaders, byte[] requestBody)
messageHeaders.SetUser(ServiceRequestContext.Current.User);
messageHeaders.SetCorrelationId(ServiceRequestContext.Current.CorrelationId);
this._innerClient.SendOneWay(messageHeaders, requestBody);
public ResolvedServicePartition ResolvedServicePartition
get return this._innerClient.ResolvedServicePartition;
set this._innerClient.ResolvedServicePartition = value;
public string ListenerName
get return this._innerClient.ListenerName;
set this._innerClient.ListenerName = value;
public ResolvedServiceEndpoint Endpoint
get return this._innerClient.Endpoint;
set this._innerClient.Endpoint = value;
现在,这里是我们实际(也是最终)设置要传递给服务的审计名称的地方。
调用链和服务请求上下文
最后一个难题,ServiceRequestContext,它是一个自定义类,允许我们处理服务请求调用的环境上下文。这是相关的,因为它为我们提供了一种在调用链中传播上下文信息的简单方法,例如用户或相关 id(或我们希望在客户端和服务之间传递的任何其他标头信息)。实现ServiceRequestContext
看起来像:
public sealed class ServiceRequestContext
private static readonly string ContextKey = Guid.NewGuid().ToString();
public ServiceRequestContext(Guid correlationId, string user)
this.CorrelationId = correlationId;
this.User = user;
public Guid CorrelationId get; private set;
public string User get; private set;
public static ServiceRequestContext Current
get return (ServiceRequestContext)CallContext.LogicalGetData(ContextKey);
internal set
if (value == null)
CallContext.FreeNamedDataSlot(ContextKey);
else
CallContext.LogicalSetData(ContextKey, value);
public static Task RunInRequestContext(Func<Task> action, Guid correlationId, string user)
Task<Task> task = null;
task = new Task<Task>(async () =>
Debug.Assert(ServiceRequestContext.Current == null);
ServiceRequestContext.Current = new ServiceRequestContext(correlationId, user);
try
await action();
finally
ServiceRequestContext.Current = null;
);
task.Start();
return task.Unwrap();
public static Task<TResult> RunInRequestContext<TResult>(Func<Task<TResult>> action, Guid correlationId, string user)
Task<Task<TResult>> task = null;
task = new Task<Task<TResult>>(async () =>
Debug.Assert(ServiceRequestContext.Current == null);
ServiceRequestContext.Current = new ServiceRequestContext(correlationId, user);
try
return await action();
finally
ServiceRequestContext.Current = null;
);
task.Start();
return task.Unwrap<TResult>();
最后一部分深受SO answer by Stephen Cleary 的影响。它为我们提供了一种简单的方法来处理调用层次结构中的环境信息,天气它们在任务上是同步的还是异步的。现在,有了这个,我们也可以在服务端的 Dispatcher 中设置该信息:
public override Task<byte[]> RequestResponseAsync(
IServiceRemotingRequestContext requestContext,
ServiceRemotingMessageHeaders messageHeaders,
byte[] requestBody)
var user = messageHeaders.GetUser();
var correlationId = messageHeaders.GetCorrelationId();
return ServiceRequestContext.RunInRequestContext(async () =>
await base.RequestResponseAsync(
requestContext,
messageHeaders,
requestBody),
correlationId, user);
(GetUser()
和 GetCorrelationId()
只是获取和解包客户端设置的标头的辅助方法)
这意味着服务为任何附加调用创建的任何新客户端也将设置 sam 标头,因此在场景 ServiceA -> ServiceB -> ServiceC 中,我们仍将在调用中设置相同的用户从 ServiceB 到 ServiceC。
什么?这么容易?是的;)
从服务内部,例如无状态 OWIN Web api,您首先捕获用户信息,然后创建 ServiceProxyFactory
的实例并将该调用包装在 ServiceRequestContext
中:
var task = ServiceRequestContext.RunInRequestContext(async () =>
var serviceA = ServiceProxyFactory.CreateServiceProxy<IServiceA>(new Uri($"FabricRuntime.GetActivationContext().ApplicationName/ServiceA"));
await serviceA.DoStuffAsync(CancellationToken.None);
, Guid.NewGuid(), user);
好的,总结一下 - 您可以连接到远程服务以设置您自己的标头。正如我们在上面看到的,需要做一些工作来获得一个适当的机制,主要是创建你自己的底层基础设施的子类。好处是,一旦你有了这个,你就有了一种非常简单的方法来审计你的服务调用。
【讨论】:
我无法让这个解决方案发挥作用。我也确定我得到了所有这些东西,但我不确定如何实现这一点。我正在使用来自 rest api 的服务结构,然后尝试使用如图所示的标头。但我的 ServiceRequestContext 在下一个实例中为空 @martijn 当您在 ServiceRemotingDispatcher.RequestResponseAsync 中收到呼叫时,您是否收到发送的标头?如果在那里设置断点并检查标题,它们是空的吗? 很遗憾没有,但我还不明白如何在现有的演员和服务中实现代码 查看github.com/FredrikGoransson/…,我在其中放置了一个工作示例。看看你是否可以让它运行(只需在 AuditableServiceRemotingDispatcher.cs 中设置一个断点,你就可以看到每个调用的标题 User 和 CorrelationId) @yoape -- 有没有一种方法可以定制用于 Actors?我找不到覆盖侦听器的点,因为它们由下面的 ActorService “托管”。以上是关于在调用 Service Fabric 传输中的可靠服务时传递用户和审核信息的主要内容,如果未能解决你的问题,请参考以下文章
使用多个键调用 Service Fabric Reliable Dictionary
Service Fabric:具有分区负载平衡的可靠服务管道
Azure Service Fabric 可靠参与者与可靠服务