如何在 Hystrix Observables 中传递 traceIds?

Posted

技术标签:

【中文标题】如何在 Hystrix Observables 中传递 traceIds?【英文标题】:How to pass traceIds in Hystrix Observables? 【发布时间】:2018-04-27 20:29:37 【问题描述】:

我有多个服务,其中一些使用 Hystrix 的 HystrixObservableCommand 来调用其他服务,而另一些使用 HystrixCommand。如何将调用服务中的 traceIds 传递给 HystrixObservableCommand 中的 Observables 并在调用回退时也传递它们?

所有服务都在使用 grpc-java。

我拥有的示例代码:

WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub);
        String messageFromWorldService = "";
        String idFromWorldService = "";
        try 

            Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get();
            messageFromWorldService = greeterReply.getMessage();
            idFromWorldService = greeterReply.getId();
            logger.info("Response from WorldService  -- , id = ", messageFromWorldService, idFromWorldService);
         catch (StatusRuntimeException | InterruptedException | ExecutionException e) 
            logger.warn("Exception when calling WorldService\n" +  e);
        

WorldCommand.java

public class WorldCommand extends HystrixObservableCommand<Greeter.GreeterReply> 

    private static final Logger logger = LoggerFactory.getLogger(WorldCommand.class.getName());

    private final Greeter.GreeterRequest greeterRequest;
    private final WorldServiceGrpc.WorldServiceStub worldServiceStub;

    public WorldCommand(Greeter.GreeterRequest greeterRequest, WorldServiceGrpc.WorldServiceStub worldServiceStub) 
        super(HystrixCommandGroupKey.Factory.asKey("WorldService"));
        this.greeterRequest = greeterRequest;
        this.worldServiceStub = worldServiceStub;
    

    @Override
    protected Observable<Greeter.GreeterReply> construct() 
        Context context = Context.current();
        return Observable.create(new Observable.OnSubscribe<Greeter.GreeterReply>() 
            @Override
            public void call(Subscriber<? super Greeter.GreeterReply> observer) 
                logger.info("In WorldCommand");
                if (!observer.isUnsubscribed()) 
                    //pass on the context, if you want only certain headers to pass on then create a new Context and attach it.
                    context.attach();
                    logger.info("In WorldCommand after attach");
                    worldServiceStub.greetWithHelloOrWorld(greeterRequest, new StreamObserver<Greeter.GreeterReply>() 
                        @Override
                        public void onNext(Greeter.GreeterReply greeterReply) 
                            logger.info("Response from WorldService  -- , id = ", greeterReply.getMessage(), greeterReply.getId());
                            observer.onNext(greeterReply);
                            observer.onCompleted();
                        

                        @Override
                        public void onError(Throwable t) 
                            logger.info("Exception from WorldService  -- ", t);
                        

                        @Override
                        public void onCompleted() 

                        
                    );
                
            
         ).subscribeOn(Schedulers.io());
    

    @Override
    protected Observable<Greeter.GreeterReply> resumeWithFallback() 
        logger.info("Response from fallback");
        Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("teammate").setId("-1").build();
        return Observable.just(greeterReply);
    

我正在使用 Zipkin grpc tracking 和 MDCCurrentTraceContext 在日志中打印 traceId 和 spanId。

WorldCommand 中的两个日志条目都不会打印出 trace 和 span id,它们是在 Rxioscheduler 线程上调用的。

编辑

按照 Mike 的建议添加了 ConcurrencyStrategy。

public class CustomHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy 

    private static final Logger log = LoggerFactory.getLogger(CustomHystrixConcurrencyStrategy.class);

    public <T> Callable<T> wrapCallable(Callable<T> callable)
        log.info("In CustomHystrixConcurrencyStrategy: callable="+ callable.toString());
        return new ContextCallable<>(callable);
    

HelloService 调用两个服务 World 和 Team。 WorldCommand 是 HystrixObservableCommand,TeamCommand 是 HystrixCommand。

logger.info("In the HelloService:greetWithHelloWorld");
Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(request.getId()).build();

//Call WorldService
ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client");
//Async stub instead of blockingStub
WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel);

WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub);
String messageFromWorldService = "";
String idFromWorldService = "";
try 

    Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get();
    messageFromWorldService = greeterReply.getMessage();
    idFromWorldService = greeterReply.getId();
    logger.info("Response from WorldService  -- , id = ", messageFromWorldService, idFromWorldService);
 catch (StatusRuntimeException | InterruptedException | ExecutionException e) 
    logger.warn("Exception when calling WorldService\n" +  e);


//Call TeamService
ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client");
TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel);
TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub);

String messageFromTeamService = "";
String idFromTeamService = "";
try 
    Greeter.GreeterReply greeterReply = teamCommand.construct().toBlocking().toFuture().get();
    messageFromTeamService = greeterReply.getMessage();
    idFromTeamService = greeterReply.getId();
    logger.info("Response from TeamService  -- , id = ", messageFromTeamService, idFromTeamService);
 catch (StatusRuntimeException | InterruptedException | ExecutionException e) 
    logger.warn("Exception when calling TeamService\n" +  e);


assert(idFromWorldService.equals(idFromTeamService));
Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build();
responseObserver.onNext(greeterReply);
responseObserver.onCompleted();

PreservableContext 类

public class PreservableContexts 

    //private final TraceContext traceContext;
    private static final Logger logger = LoggerFactory.getLogger(PreservableContexts.class.getName());

    public PreservableContexts() 
        logger.info("Creating new PreservableContexts");
        //this.traceContext = TraceContextHolder.getContext();
    

    public void set() 
       // if (traceContext != null) 
            //TraceContextHolder.setContext(traceContext);
       // 
    

    public void clear() 
        //TraceContextHolder.clearContext();
    

PreservableContexts 和 CustomHystrixConcurrencyStrategy 中的日志永远不会被打印出来。我在启动 HelloServer 时正在注册 startegy。

HystrixConcurrencyStrategy strategy = new CustomHystrixConcurrencyStrategy();
        HystrixPlugins.getInstance().registerConcurrencyStrategy(strategy);
        context = HystrixRequestContext.initializeContext();

编辑 2

更新了 Observable 的设置方式:

    ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client");
    //Async stub instead of blockingStub
    WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel);
    WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub);

    //Call TeamService
    ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client");
    TeamServiceGrpc.TeamServiceStub teamServiceStub = TeamServiceGrpc.newStub(teamChannel);
    //TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel);
    TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub);

    try 
        rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
        rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
        Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() 
            @Override
            public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) 
                String messageFromWorldService = worldReply.getMessage();
                String idFromWorldService = worldReply.getId();
                logger.info("Response from WorldService  -- , id = ", messageFromWorldService, idFromWorldService);

                String messageFromTeamService = teamReply.getMessage();
                String idFromTeamService = teamReply.getId();
                logger.info("Response from TeamService  -- , id = ", messageFromTeamService, idFromTeamService);

                assert(idFromWorldService.equals(idFromTeamService));
                Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build();
                logger.info("Final response=" + greeterReply.getMessage());
                responseObserver.onNext(greeterReply);
                responseObserver.onCompleted();
                return null;
            
        );
     catch (StatusRuntimeException e) 
        logger.warn("Exception when calling WorldService and/or TeamService\n" +  e);
    

我现在有一个奇怪的问题,对 TeamCommand 和 WorldCommand 的调用没有完成,因为这段代码永远不会执行:

Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() 
                @Override
                public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) 
                    String messageFromWorldService = worldReply.getMessage();

另外,如果有回退,hystrix-timer 线程不再有 MDC。

【问题讨论】:

【参考方案1】:

我对 hysterix 了解不多,但如果您尝试传递一些上下文信息,如跟踪 ID,那么 io.grpc.Context 是正确的使用类。您需要调用context.withValue 来创建带有traceID 的新上下文。在您想要数据的地方,您需要附加上下文。完成后还要确保分离上下文,我在你的 sn-p 中看不到这种情况。

【讨论】:

【参考方案2】:

你需要使用 ...

HystrixPlugins.getInstance().registerConcurrencyStrategy(...)

...注册一个使用您自己的Callable的自定义HystrixConcurrencyStrategy ...

public class ConcurrencyStrategy extends HystrixConcurrencyStrategy     
    @Override
    public <K> Callable<K> wrapCallable(Callable<K> c) 
        return new ContextCallable<>(c);
    

...在电路周围应用上下文保存...

public class ContextCallable<K> implements Callable<K> 

    private final Callable<K> callable;
    private final PreservableContexts contexts;

    public ContextCallable(Callable<K> actual) 
        this.callable = actual;
        this.contexts = new PreservableContexts();
    

    @Override
    public K call() throws Exception 
        contexts.set();
        try 
            return callable.call();
         finally 
            contexts.clear();
        
    

... via 是一个能够保存 Zipkin 上下文的辅助类 ...

public class PreservableContexts 

    private final TraceContext traceContext;

    public PreservableContexts() 
        this.traceContext = TraceContextHolder.getContext();
    

    public void set() 
        if (traceContext != null) 
            TraceContextHolder.setContext(traceContext);
        
    

    public void clear() 
        TraceContextHolder.clearContext();
    


... 并允许添加您可能希望保留的其他上下文的简单方法,例如MDC、SecurityContext 等 ...

【讨论】:

我创建了一个自定义 ConcurrentStrategy 类并添加了日志以检查它是否被调用,但没有打印任何日志。我编辑了这个问题,因为很难在评论中很好地格式化它。有任何想法吗?此外,HystrixConcurrenyStrategy 的 Javadoc 提到它与 HystrixCommand 一起使用,而不是 HystrixObservableCommand - ```例如,由 HystrixCommand 执行的每个 Callable 都将调用 wrapCallable(Callable) 以使自定义实现有机会用其他行为装饰 Callable。 ```自定义的 ConcurrentStrategy 是否适用于两者? 假设您使用Thread Isolation Strategy,那么应该应用Concurrency Strategy ...我不确定信号量隔离,但看起来您无论如何都不会选择它。我刚刚在本地对此进行了测试,并且 Observable 命令确实触发了该插件。我认为您遇到的问题是您如何调用命令。而不是teamCommand.construct() 尝试teamCommand.execute()teamCommand.queue() Synchronous Execution 和Asynchronous Execution 的说明可以在How To Use 页面上找到。 感谢 Mike,更改后自定义并发策略被调用。我实际上最终也需要信号量隔离。我更改了代码并更新了问题。所以现在,调用永远不会回来,并且回退日志没有 MDC。调用确实会被执行并拥有我想要的 MDC。 我认为我们现在已经脱离了原始帖子的主题。可能是时候将原始问题标记为已回答并提出单独的问题了。但是,快速浏览会告诉我您需要订阅 Observable.zip(...) 返回的对象。

以上是关于如何在 Hystrix Observables 中传递 traceIds?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 RxJS 中“等待”两个 observables

如何在独立于生命周期的类中处理 Observables?

如何在模板 Angular 2 中呈现 observables 数组长度

如何在 Reactive Extensions 中组合两个 observables 以对结果进行分页?

如何使用 Angular 中的 observables 在 API http 请求的所有页面中获取数据?

如何在 Observables 上设置正确的类型?