附录2:调用示例
from sqlalchemy import *
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String
from sqlalchemy.sql import and_,or_
# Connection URL for the target database, using the psycopg2 driver.
# NOTE: the credentials are embedded directly in the URL for this example.
connstr = 'postgresql+psycopg2://vbadmin:password@127.0.0.1/vastbase'

# Engine shared by every function below; echo=False turns off SQL echoing.
engine = create_engine(connstr, echo=False)

# Declarative base class that the table models inherit from.
Base = declarative_base()

# Session factory bound to the engine, plus the single session used by
# all of the example functions.
DBSession = sessionmaker(bind=engine)
session = DBSession()
# 创建表模型
class User(Base):
    """ORM model mapped to the ``users`` table.

    Columns:
        id: integer primary key, declared with ``autoincrement=True``.
        name: string column declared as ``String(32)``.
        address: string column declared as ``String(64)``.
    """

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))
    address = Column(String(64))

    def __repr__(self):
        """Return a debug string showing the row's id, name and address."""
        # The format string and its argument tuple must be one expression:
        # a separate line beginning with ``%`` after ``return`` is a syntax
        # error, so the tuple is wrapped inside parentheses instead.
        return "<User(id='%s', name='%s', address='%s')>" % (
            self.id,
            self.name,
            self.address,
        )
def drop_t():
    """Drop the table behind the ``User`` model via the module-level engine."""
    users_table = User.__table__
    Base.metadata.drop_all(bind=engine, tables=[users_table])
def add_t():
    """Add three example ``User`` rows to the session and commit them."""
    # Stage a single row.
    first_user = User(name='用户1', address='地址1')
    session.add(first_user)

    # Stage several rows in one call.
    more_users = [
        User(name='用户2', address='地址2'),
        User(name='用户3', address='地址3'),
    ]
    session.add_all(more_users)

    # Commit so the staged rows are saved to the database.
    session.commit()
def update_t():
    """Rename the user with id 3, then rename every user with id <= 2.

    Each update is committed separately. The value returned by the second
    update call is printed.
    """
    # First update: the single row with id == 3; its return value is unused.
    single_user = session.query(User).filter(User.id == 3)
    single_user.update({"name": "新用户"})
    session.commit()

    # Second update: all rows with id <= 2; print what update() returned.
    low_id_users = session.query(User).filter(User.id <= 2)
    affected = low_id_users.update({"name": "修改用户"})
    print(affected)
    session.commit()
def select_t():
    """Run a series of example queries against ``User`` and print the results.

    Covers: fetching all rows, filtering, taking the first match, combining
    conditions with ``and_``/``or_``, labelling a selected column, and
    loading ``User`` objects from a raw SQL statement with a bound parameter.
    """
    # Fetch every user.
    user_all = session.query(User).all()
    print(user_all)

    # Filter with a WHERE-style condition (this result is not printed; it is
    # overwritten by the next query).
    user = session.query(User).filter(User.id >= 2).all()
    # Same condition, but only take the first matching row.
    user = session.query(User).filter(User.id >= 2).first()
    print(user)

    # Combine conditions with and_ / or_.
    r1 = session.query(User).filter(and_(User.id > 3, User.name == '用户4')).all()
    print(r1)
    r2 = session.query(User).filter(or_(User.id < 2, User.name == '新用户')).all()
    print(r2)

    # Select specific columns, exposing ``name`` under the alias ``username``.
    r3 = session.query(User.name.label('username'), User.id).first()
    print(r3.id, r3.username)

    # Raw SQL query. The statement must be a single string literal: a literal
    # split across two source lines is an unterminated string (syntax error).
    # The value is passed through the :name bind parameter, not interpolated.
    raw_stmt = text("select * from users where name=:name")
    r4 = session.query(User).from_statement(raw_stmt).params(name='修改用户').all()
    for i in r4:
        print(i.id, i.name, i.address)
def delete_t():
    """Delete the user whose id is 2 and commit the change."""
    target = session.query(User).filter(User.id == 2)
    # The value returned by delete() is not used.
    target.delete()
    session.commit()
# Create the tables declared on Base (here: ``users``) before running the demo.
Base.metadata.create_all(engine)

# Run the example steps in order: insert, update, query, delete, drop table.
# The module-level session is closed in ``finally`` so its connection
# resources are released even if one of the steps raises.
try:
    add_t()
    update_t()
    select_t()
    delete_t()
    drop_t()
finally:
    session.close()