如何在 Rust 中使用 `sqlx` 和 `juniper` 订阅?

Posted

技术标签:

【中文标题】如何在 Rust 中使用 `sqlx` 和 `juniper` 订阅?【英文标题】:How does one use `sqlx` with `juniper` subscriptions in Rust? 【发布时间】:2021-03-14 00:41:52 【问题描述】:

背景:

我在将 sqlxjuniper 订阅集成时遇到问题。

我从sqlx::query::QueryAs::fetch() 收到Pin<Box<dyn Stream<Item = Result<User, sqlx::Error>> + 'e + Send>>

juniper 需要将subscriptions 返回为Pin<Box<dyn Stream<Item = Result<User, juniper::FieldError>> + Send>>

注意从Result<User, sqlx::Error>Result<User, juniper::FieldError> 的变化。使用map_err() from futures::TryStreamExt,我创建了以下代码来执行查询并转换错误类型。

type UsersStream =
    Pin<Box<dyn Stream<Item = Result<User, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl SubscriptionRoot 
    async fn users(context: &Context) -> UsersStream 
        let sqlx::query_as!(User, "SELECT * FROM users")
            .fetch(&context.pool)
            .map_err(|e| 
                FieldError::new(
                    "Database error",
                    graphql_value!(format!("", e)))
            )
            .boxed()
    

编译失败,出现以下错误:

error[E0759]: `executor` has lifetime `'ref_e` but it needs to satisfy a `'static` lifetime requirement
  --> server/src/graphql/subscription.rs:27:1
   |
27 |   #[juniper::graphql_subscription(Context = Context)]
   |   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |   |
   |   this data with lifetime `'ref_e`...
   |   ...is captured here...
...
63 | /         sqlx::query_as!(User, "SELECT * FROM users")
64 | |             .fetch(&context.pool)
65 | |             .map_err(|e| 
66 | |                 FieldError::new(
...  |
69 | |             )
70 | |             .boxed()
   | |____________________- ...and is required to live as long as `'static` here
   |
   = note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)

error: aborting due to previous error

我对@9​​87654339@s 或生命周期不够熟悉,无法理解此错误的含义。在进一步研究之后,似乎ref_e 是订阅对juniperExecutor 的引用的生命周期。

尝试:

juniper::Context 提供生命周期,如graphql-rust/juniper#143 中所述。 更高等级的特征界限

版本:

sqlx-0.4.1 juniper 固定提交 cd66bdbmaster

【问题讨论】:

其实我刚刚为个人项目实现了一个玩具graphql服务器,并决定不使用sqlx,因为我也无法集成它,所以我很期待这个答案。我会说发布您的 Context 的样子也是有益的,因为我在定义我的时遇到了一些“借用检查”问题。 @fvall 你用什么代替了 sqlx ? 我做了一个小仓库来重现这个问题:github.com/mathroc/juniper-sqlx-subscriptions 【参考方案1】:

answer by Mathieu 是正确的。所以我会解释错误背后的原因。

这里的根本问题是,当您返回 UsersStream 时,您会将数据移出函数 fn users(..)。现在函数users(..) 的调用者拥有返回的数据,并且(理论上)可以做任何它想做的事情,包括在应用程序的生命周期内保留数据,即赋予数据“静态生命周期”。但是如果UsersStream 引用了一个生命周期有限的数据(context.pool),那么当pool 的所有者丢弃数据时会发生什么?当数据被删除时,它将引用空指针。所以编译器会抛出错误来防止这种情况发生。

所以你可以在这里做的是以某种方式传递拥有的数据(pool)而不是引用,确保poolUsersStream 具有相同的生命周期,因为它现在拥有数据。 clone() 正是这样做的,它创建了一个拥有的数据副本(引用计数或字节副本)。

【讨论】:

【参考方案2】:

您的代码与我的不完全一样,但我认为该解决方案也可以在这里应用,请在使用之前尝试克隆池:

type UsersStream =
    Pin<Box<dyn Stream<Item = Result<User, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl SubscriptionRoot 
    async fn users(context: &Context) -> UsersStream 
        let pool = context.pool.clone();
        let sqlx::query_as!(User, "SELECT * FROM users")
            .fetch(&pool)
            .map_err(|e| 
                FieldError::new(
                    "Database error",
                    graphql_value!(format!("", e)))
            )
            .boxed()
    

【讨论】:

以上是关于如何在 Rust 中使用 `sqlx` 和 `juniper` 订阅?的主要内容,如果未能解决你的问题,请参考以下文章

sqlx::query_as和sqlx::query_as!有什么不同?

Rust Web 全栈开发之自建TCPHTTP Server

GoLang sqlx库使用

Go_sqlx和占位符

sqlx使用说明

sqlx库基本使用