如何在 SQLAlchemy 中设置 M2M 混合计数属性?
Posted
技术标签:
【中文标题】如何在 SQLAlchemy 中设置 M2M 混合计数属性?【英文标题】:How to set a M2M hybrid count property in SQLAlchemy? 【发布时间】:2022-01-22 20:26:03 【问题描述】:我有两个由 M2M 关系绑定的表。书和作家,作家可以有很多书,书也可以有很多作家。
我想在书籍和作家上都拥有一个count
属性,以便我可以按例如出书最多的作家对它们进行排序。
# many to many association table
book_writer_association_table = Table('book_writer_association',Base.metadata,
Column('book_id',ForeignKey('book.id'), primary_key=True),
Column('Writer',ForeignKey('writer.id'), primary_key=True)
)
class Book(Base):
__tablename__ = 'base'
id = Column(Integer, primary_key=True)
name = Column(String)
writers = relationship(Writer,secondary=book_writer_association_table,back_populates="books")
class Writer(Base):
__tablename__ = 'writer'
id = Column(Integer, primary_key=True)
name = Column(String)
books = relationship(Book,secondery=book_writer_association_table,back_populates="writers")
@hybrid_property
def book_count(self):
return len(self.books)
@book_count.expression
def book_count(cls):
#what goes here?
我尝试了各种方法,例如详细的here:
class Foo(Base):
__tablename__ = 'foo'
id = Column(Integer, primary_key=True)
bar_id = Column(Integer, ForeignKey('bar.id'))
bar = relationship('Bar')
class Bar(Base):
__tablename__ = 'bar'
id = Column(Integer, primary_key=True)
@hybrid_property
def foo_count(self):
return object_session(self).query(Foo).filter(Foo.bar==self).count()
@foo_count.expression
def foo_count(cls):
return select([func.count(Foo.id)]).where(Foo.bar_id == cls.id).label('foo_count')
但是,在这个例子中,只有两个表,我不确定如何在这里实现更复杂的连接。另一位用户建议使用column_property
,但我在那里遇到了完全相同的问题。我不确定如何进一步将表添加到联接中。
【问题讨论】:
【参考方案1】:您可以自定义从here 到 M2M 案例的想法。为此,您应该在 hybrid_property
中提及 association_table
而不是 Book
表。因此,您消除了与 Book
表的连接,并将您的案例简化为一对多关系。
我想出了这个解决方案。
from typing import List
from sqlalchemy import Column, ForeignKey, Integer, String, select, func, create_engine, Table
from sqlalchemy.ext.declarative import as_declarative
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, object_session, sessionmaker, Session
# Declare models
@as_declarative()
class Base:
pass
book_writer_association_table = Table('book_writer_association',Base.metadata,
Column('book_id',ForeignKey('book.id'), primary_key=True),
Column('writer_id',ForeignKey('writer.id'), primary_key=True)
)
class Book(Base):
__tablename__ = 'book'
id = Column(Integer, primary_key=True)
name = Column(String)
writers = relationship("Writer", secondary=book_writer_association_table, back_populates="books")
class Writer(Base):
__tablename__ = 'writer'
id = Column(Integer, primary_key=True)
name = Column(String)
books = relationship("Book", secondary=book_writer_association_table, back_populates="writers")
@hybrid_property
def book_count(self):
return object_session(self).query(book_writer_association_table).filter(book_writer_association_table.c.writer_id == self.id).count()
@book_count.expression
def book_count(cls):
return select([func.count(book_writer_association_table.c.book_id)]).where(book_writer_association_table.c.writer_id == cls.id).label('book_count')
# Load DB schema
engine = create_engine('sqlite:///sqlite3.db')
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(autocommit=True, bind=engine)
db: Session = SessionLocal()
# Creating test instances
b1 = Book(name="Book 1")
b2 = Book(name="Book 2")
db.add(b1)
db.add(b2)
w1 = Writer(name="Writer 1")
w2 = Writer(name="Writer 2")
db.add(w1)
db.add(w2)
b1.writers.append(w1)
b1.writers.append(w2)
b2.writers.append(w1)
query = db.query(Writer, Writer.book_count)
print(str(query)) # checking query
print()
writers: List[Writer] = query.all() # testing query
for writer, book_count in writers:
print(f"writer.name: book_count")
结果:
> Writer 1: 2
> Writer 2: 1
我不确定如何进一步将表添加到联接中。
db.query(Writer, Writer.book_count)
这里的 SQL 看起来很干净,没有任何连接。所以,我认为你不应该对后续连接有任何问题。
> SELECT writer.id AS writer_id, writer.name AS writer_name, (SELECT count(book_writer_association.book_id) AS count_1
> FROM book_writer_association
> WHERE book_writer_association.writer_id = writer.id) AS book_count
> FROM writer
编辑:如果您需要加入Book
表以提供额外的过滤,您可以这样做。这里我过滤了价格低于 150 的书
from typing import List
from sqlalchemy import Column, ForeignKey, Integer, String, select, func, create_engine, Table
from sqlalchemy.ext.declarative import as_declarative
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, object_session, sessionmaker, Session
# Declare models
@as_declarative()
class Base:
pass
book_writer_association_table = Table('book_writer_association',Base.metadata,
Column('book_id',ForeignKey('book.id'), primary_key=True),
Column('writer_id',ForeignKey('writer.id'), primary_key=True)
)
class Book(Base):
__tablename__ = 'book'
id = Column(Integer, primary_key=True)
name = Column(String)
price = Column(Integer)
writers = relationship("Writer", secondary=book_writer_association_table, back_populates="books")
class Writer(Base):
__tablename__ = 'writer'
id = Column(Integer, primary_key=True)
name = Column(String)
books = relationship("Book", secondary=book_writer_association_table, back_populates="writers")
@hybrid_property
def book_count(self):
return (
object_session(self)
.query(book_writer_association_table)
.join(Book, Book.id == book_writer_association_table.c.book_id)
.filter(book_writer_association_table.c.writer_id == self.id)
.filter(Book.price > 150)
.count()
)
@book_count.expression
def book_count(cls):
# return select([func.count(book_writer_association_table.c.book_id)]).where(book_writer_association_table.c.writer_id == cls.id).label('book_count')
#
return (
select([func.count(book_writer_association_table.c.book_id)])
.join(Book, Book.id == book_writer_association_table.c.book_id)
.where(book_writer_association_table.c.writer_id == cls.id)
.filter(Book.price > 150)
.label('book_count')
)
# Load DB schema
engine = create_engine('sqlite:///sqlite3.db')
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(autocommit=True, bind=engine)
db: Session = SessionLocal()
# Creating test instances
b1 = Book(name="Book 1", price=100)
b2 = Book(name="Book 2", price=200)
db.add(b1)
db.add(b2)
w1 = Writer(name="Writer 1")
w2 = Writer(name="Writer 2")
db.add(w1)
db.add(w2)
b1.writers.append(w1)
b1.writers.append(w2)
b2.writers.append(w1)
query = db.query(Writer, Writer.book_count)
print(str(query)) # checking query
print()
writers: List[Writer] = query.all() # testing query
for writer, book_count in writers:
print(f"writer.name: book_count")
查询:
SELECT writer.id AS writer_id,
writer.name AS writer_name,
(SELECT count(book_writer_association.book_id) AS count_1
FROM book_writer_association
JOIN book ON book.id = book_writer_association.book_id
WHERE book_writer_association.writer_id = writer.id
AND book.price > ?) AS book_count
FROM writer
【讨论】:
谢谢,这是个好主意!但是,如果有不止一个关联表,或者如果您不能只引用关联表,那该怎么办呢?例如,如果不是计算作者拥有的书籍数量,您需要计算其他必须连接到 book 表的东西。例如,如果您希望媒体资源仅统计超过 100 页的图书。 @Curtwagner1984 我已经根据你的情况调整了答案,请检查以上是关于如何在 SQLAlchemy 中设置 M2M 混合计数属性?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 sqlalchemy 在 mysql 中设置 DEFAULT ON UPDATE CURRENT_TIMESTAMP?