如何在 python 中使用 sqlalchemy 在查询中创建 sql server 表变量

Posted

技术标签:

【中文标题】如何在 python 中使用 sqlalchemy 在查询中创建 sql server 表变量【英文标题】:How to create a sql server table variable in a query using sqlalchemy in python 【发布时间】:2018-12-03 19:22:52 【问题描述】:

我正在尝试在 SQL Server 中创建一个表变量,对其进行查询,然后将结果返回到 pandas 数据框(参见示例)。我想这样做,以便在将数据库中的数据发送到熊猫数据框之前聚合它。我记得设置NOCOUNT ON 将允许它工作,因为它在执行每个查询时不会返回任何内容。但这行不通。所以这显然是一个示例代码,但我已经能够在这里重新创建错误。按照建议的链接为您提供documentating for ProgrammingErrors。我没有发现它很有帮助。

import urllib
import sqlalchemy
import pandas as pd

quoted = urllib.parse.quote_plus('DRIVER=ODBC Driver 17 for SQL Server;Server=127.0.0.1;Database=mydb;UID=myuser;PWD=mypasswd;Port=1433;')
engine = sqlalchemy.create_engine('mssql+pyodbc:///?odbc_connect='.format(quoted))

query = """
SET NOCOUNT ON;

DECLARE @n_majors TABLE (id varchar(9), n_majors int)

INSERT INTO @n_majors
SELECT m.student_id_fk
, COUNT(DISTINCT dc.category) AS [N majors declared]
FROM msu_db.dbo.Majors AS m
JOIN department_categories AS dc
ON dc.dept_name = m.dept_name
WHERE m.Student_Level_Code = 'UN'
GROUP BY m.student_id_fk

DECLARE @grad_category TABLE (id varchar(9), category varchar(20))

INSERT INTO @grad_category
select m.student_id_fk
, MIN(dc.category)
from Majors AS m
join department_categories as dc
on dc.dept_name = m.dept_name
WHERE m.Student_Level_Code = 'UN'
and graduated = 'CONF'
GROUP BY m.student_id_fk

DECLARE @first_category TABLE (id varchar(9), category varchar(20))

INSERT INTO @first_category
select m.student_id_fk
, MIN(dc.category) as cat
from Majors AS m
join department_categories as dc
on dc.dept_name = m.dept_name
WHERE m.Student_Level_Code = 'UN'
and graduated IS NULL
GROUP BY m.student_id_fk

DECLARE @first_semester_grades TABLE (id varchar(9), avg_grade float, std_grade float, first_Semester_seq_id varchar(4))

INSERT INTO @first_semester_grades
SELECT c.student_id_fk
, AVG(c.Grade) AS [mean grade]
, STDEV(c.Grade) AS [stdev grade]
, MIN(c.Term_Seq_Id) AS Term_Seq_Id
FROM Courses AS c
WHERE c.Student_Level_Code = 'UN'
GROUP BY c.student_id_fk

SET NOCOUNT OFF;

SELECT  s.[student_id_fk]
      ,[gender]
      ,[ethnicity]
      ,[first_course_datetime]
      ,[hs_gpa]
      ,[math_placement_score]
      ,[math_act]
      ,[natsci_act]
      ,COUNT(c.[transfer institution name]) AS [N AP courses]
      , nm.n_majors AS [n-categories]
      , fc.category
      , gc.category AS [grad category]
      , fsg.avg_grade AS first_term_avg
      , fsg.std_grade AS first_term_std
      , fsg.first_Semester_seq_id
  FROM [msu_db].[dbo].[Students] AS s
  LEFT JOIN msu_db.dbo.Courses AS c
  ON s.student_id_fk = c.student_id_fk
  AND c.[transfer institution name] = 'Advanced Placement'

  LEFT JOIN @n_majors as nm
  ON s.student_id_fk = nm.id

  LEFT JOIN @grad_category as gc
  ON s.student_id_fk = gc.id

  LEFT JOIN @first_category AS fc
  ON s.student_id_fk = fc.id

  LEFT JOIN @first_semester_grades AS fsg
  ON s.student_id_fk = fsg.id

  WHERE s.first_course_datetime BETWEEN '1993' AND '2013'

  GROUP BY s.[student_id_fk]
      ,[gender]
      ,[ethnicity]
      ,[first_course_datetime]
      ,[hs_gpa]
      ,[math_placement_score]
      ,[math_act]
      ,[natsci_act]
      , nm.n_majors
      , fc.category
      , gc.category
      , fsg.avg_grade
      , fsg.std_grade
      , fsg.first_Semester_seq_id
    """
pd.read_sql_query(query, engine)

输出的错误信息如下:

     --------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
~/anaconda3/envs/research/lib/python3.6/site-packages/sqlalchemy/engine/result.py in _fetchall_impl(self)
   1081         try:
-> 1082             return self.cursor.fetchall()
   1083         except AttributeError:

AttributeError: 'NoneType' object has no attribute 'fetchall'

During handling of the above exception, another exception occurred:

ResourceClosedError                       Traceback (most recent call last)
<ipython-input-3-2a0ea765a8e2> in <module>()
----> 1 df = pd.read_sql_query(query, engine)

~/anaconda3/envs/research/lib/python3.6/site-packages/pandas/io/sql.py in read_sql_query(sql, con, index_col, coerce_float, params, parse_dates, chunksize)
    312     return pandas_sql.read_query(
    313         sql, index_col=index_col, params=params, coerce_float=coerce_float,
--> 314         parse_dates=parse_dates, chunksize=chunksize)
    315 
    316 

~/anaconda3/envs/research/lib/python3.6/site-packages/pandas/io/sql.py in read_query(self, sql, index_col, coerce_float, parse_dates, params, chunksize)
   1070                                         parse_dates=parse_dates)
   1071         else:
-> 1072             data = result.fetchall()
   1073             frame = _wrap_result(data, columns, index_col=index_col,
   1074                                  coerce_float=coerce_float,

~/anaconda3/envs/research/lib/python3.6/site-packages/sqlalchemy/engine/result.py in fetchall(self)
   1135             self.connection._handle_dbapi_exception(
   1136                 e, None, None,
-> 1137                 self.cursor, self.context)
   1138 
   1139     def fetchmany(self, size=None):

~/anaconda3/envs/research/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
   1414                 )
   1415             else:
-> 1416                 util.reraise(*exc_info)
   1417 
   1418         finally:

~/anaconda3/envs/research/lib/python3.6/site-packages/sqlalchemy/util/compat.py in reraise(tp, value, tb, cause)
    185         if value.__traceback__ is not tb:
    186             raise value.with_traceback(tb)
--> 187         raise value
    188 
    189 else:

~/anaconda3/envs/research/lib/python3.6/site-packages/sqlalchemy/engine/result.py in fetchall(self)
   1129 
   1130         try:
-> 1131             l = self.process_rows(self._fetchall_impl())
   1132             self._soft_close()
   1133             return l

~/anaconda3/envs/research/lib/python3.6/site-packages/sqlalchemy/engine/result.py in _fetchall_impl(self)
   1082             return self.cursor.fetchall()
   1083         except AttributeError:
-> 1084             return self._non_result([])
   1085 
   1086     def _non_result(self, default):

~/anaconda3/envs/research/lib/python3.6/site-packages/sqlalchemy/engine/result.py in _non_result(self, default)
   1087         if self._metadata is None:
   1088             raise exc.ResourceClosedError(
-> 1089                 "This result object does not return rows. "
   1090                 "It has been closed automatically.",
   1091             )

ResourceClosedError: This result object does not return rows. It has been closed automatically.

似乎只要 NoneType 对象通过,它就会失败。我不明白的是为什么首先要传递 NoneType 对象。查询结果不应该传吗?

【问题讨论】:

【参考方案1】:

您在变量声明中拼错了 table - 它有一个 1 而不是 l。如果您认为应该起作用的某些东西不起作用,请先检查您的假设。

更新:

import urllib
import sqlalchemy
import pandas as pd

quoted = urllib.parse.quote_plus('DRIVER=ODBC Driver 17 for SQL Server;Server=127.0.0.1;Database=mydb;UID=myuser;PWD=mypasswd;Port=1433;')
engine = sqlalchemy.create_engine('mssql+pyodbc:///?odbc_connect='.format(quoted))

query = """
SET NOCOUNT ON
DECLARE @table TABLE (id int, value float)
INSERT INTO @table VALUES (1, 2.7)
INSERT INTO @table VALUES (2, 4.5)
INSERT INTO @table VALUES (3, 1.2)

SELECT * FROM @table
"""
pd.read_sql_query(query, engine)

【讨论】:

我已经更新了我原来的问题以获得原始代码,因为示例问题太简单了。【参考方案2】:

您必须在返回查询结果之前关闭 NOCOUNT,才能从 SQL Server 返回正确的行受影响的消息:

import urllib
import sqlalchemy
import pandas as pd

quoted = urllib.parse.quote_plus('DRIVER=ODBC Driver 17 for SQL Server;Server=127.0.0.1;Database=mydb;UID=myuser;PWD=mypasswd;Port=1433;')
engine = sqlalchemy.create_engine('mssql+pyodbc:///?odbc_connect='.format(quoted))

query = """
SET NOCOUNT ON
DECLARE @table TABLE (id int, value float)
INSERT INTO @table VALUES (1, 2.7)
INSERT INTO @table VALUES (2, 4.5)
INSERT INTO @table VALUES (3, 1.2)

SET NOCOUNT OFF
SELECT * FROM @table
"""
pd.read_sql_query(query, engine)

【讨论】:

虽然这解决了我创建的示例中的问题,但它并没有解决我编写的实际代码中的问题。我将实际代码添加到原始问题中以提供更多详细信息,因为我的示例问题不够复杂。

以上是关于如何在 python 中使用 sqlalchemy 在查询中创建 sql server 表变量的主要内容,如果未能解决你的问题,请参考以下文章

如何在 python 中使用 sqlalchemy 在查询中创建 sql server 表变量

如何避免在 SQLAlchemy - python 的多对多关系表中添加重复项?

如何使用 Python+SQLAlchemy 远程连接 MySQL 数据库?

如何在 Django Python 中使用 PostgreSQL 为 SQLAlchemy 连接池设置方言?需要启用预 ping 功能

我不知道如何使用 sqlalchemy.create_engine 在 python 中编写 sql 查询

Python-Sqlalchemy-Postgres:如何将子查询结果存储在变量中并将其用于主查询