VastbaseG100

基于openGauss内核开发的企业级关系型数据库。

Menu

执行SQL语句并处理结果

创建或修改数据库对象

在使用SQLAlchemy执行数据库操作前需要先建立表的类对象,因为在SQLAlchemy中,一张表对应一个类。用户可以自己创建表,让SQLAlchemy访问这些表。

from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

from sqlalchemy import Column,Integer,String

# 创建表模型
class User(Base):
    __tablename__ = 'user'
    id = Column(Integer,primary_key=True,autoincrement=True)
    name = Column(String(32))
    address = Column(String(64))

    def __repr__(self):
        return "<User(id='%s', name='%s', address='%s')>" % (
self.id,self.name,self.address)

Base.metadata.create_all(engine)

createTable()可以指定参数ifNotExists=True,仅仅在表格不存在时才创建这个表格。

删除数据库对象

删除数据库对象时使用drop_all()函数,此方法以及create_all()方法接受可选参数tables,它使用的迭代器是sqlalchemy.sql.schema.Table实例。可以通过这种方式控制要创建或删除的表。

Base.metadata.drop_all(bind=engine,tables=[User.__table__])

执行更新

插入

# 插入单条数据
u = User(name='用户1',address='地址1')
session.add(u)``

# 插入多条数据
session.add_all([
    User(name='用户2',address='地址2'),
    User(name='用户3',address='地址3'),
])

# 提交数据,保存到数据库中
session.commit()

更新

# 更新单条数据
res = session.query(User).filter(User.id==3).update({"name":"新用户"})
session.commit()

# 更新多条数据,返回更新行数
res2 = session.query(User).filter(User.id<=2).update({"name":"修改用户"})
print(res2)  

session.commit()

删除

res = session.query(User).filter(User.id==2).delete()

session.commit()

执行查询

简单查询

查询数据使用相关query函数查询:

# 查询所有用户
user_all = session.query(User).all()

# where条件查询
user = session.query(User).filter(User.id>=2).all()

# 只取第一条数据
user = session.query(User).filter(User.id>=2).first()

复杂条件查询

# and_, or_ 条件查询
from sqlalchemy.sql import and_,or_
r1 = session.query(User).filter(and_(User.id>3,User.name=='用户4')).all()
r1 = session.query(User).filter(or_(User.id<2,User.name=='新用户')).all()

# 查询数据,指定查询数据加入别名
r2 = session.query(User.name.label('username'),User.id).firset()
print(r2.id,r2.username)

# 原生sql查询
r3 = session.query(User).from_statement(text("select * from User where name=:name")).params(name='修改用户').all()
  print(r3)

获取结果

query返回的是list类型,可以调用每一个值的属性输出结果。

or i in r3:
print(i.id,i.name,i.address)

执行存储过程

  • 下面的例子展示了如何调用存储过程。

    connection = engine.raw_connection()
    cursor = connection.cursor()
    sql_procedure = " CREATE OR REPLACE PROCEDURE pro_test(a integer,inout b integer) as \
                      begin \
                          b := a+b; \
                      end"
    cursor.execute(sql_procedure)
    cursor.execute("call pro_test(1,5)")
    result = cursor.fetchone()
    
  • 下面的例子展示了如何调用函数。

    connection = engine.raw_connection()
    cursor = connection.cursor()
    sql_function = "CREATE OR REPLACE FUNCTION func_test (a int,out b int) RETURNS int AS \
                    $$ \
                    begin \
                        b := a+2; \
                    return ;  \
                    end; \
                    $$ LANGUAGE 'plpgsql';"
    cursor.execute(sql_function)
    cursor.execute("select func_test(1)")
    result = cursor.fetchone()
    

处理数据类型

二进制类型

Vastbase G100提供两种不同的方法存储二进制数据。 二进制数据可以使用二进制数据类型 BYTEA存储在表中,或者使用大对象特性以一种特殊的格式将二进制数据存储在一个独立的表中,然后通过在表中保存一个类型为 OID 的值来引用该表。

以下为BYTEA类型使用示例。

1、创建表:

from sqlalchemy import Column,Integer,LargeBinary
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

class ByteaTest(Base):
__tablename__ = 'bytetest'
id = Column(Integer,primary_key=True,autoincrement=True)
col1 = Column(LargeBinary())# bytea类型

def __repr__(self):
return "<bytetest(col1='%s')>" %(self.col1)

Base.metadata.create_all(engine)

2、插入数据。

data = bytes(range(256))
u = ByteaTest(col1=data)
session.add(u)
session.commit()

3、读取数据。

b1 = session.query(ByteaTest).filter(ByteaTest.id==1).first()
print(b1.id,b1.col1)
assert b1.col1==data

字符类型

Vastbase G100中TEXT类型与VARCHAR类型都是可变长的字符类型,区别在于VARCHAR类型通过VARCHAR(n)中的n来限制最大长度,而TEXT类型没有。

TEXT类型与VARCHAR类型几乎没有性能差别,TEXT类型最多可存储1G数据 。

1、创建表。

from sqlalchemy import Column,String,Text,Integer
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

class StringTest(Base):
__tablename__ = 'stringtest'
id = Column(Integer,primary_key=True,autoincrement=True)
col1 = Column(Text())# text类型
col2 = Column(String(256))# varchar类型

def __repr__(self):
return "<stringtest(col1='%s', col2='%s')>" %(self.col1,self.col2)

Base.metadata.create_all(engine)

2、插入数据。

a='test1'
b='test2'
u = StringTest(col1=a,col2=b)
session.add(u)
session.commit()

3、读取数据。

s1 = session.query(StringTest).filter(StringTest.id==1).first()
assert s1.col1 == a
assert s1.col2 == b
print(s1.id,s1.col1,s1.col2)

数字类型

数字类型有整数类型、任意精度数字类型、浮点类型和序数类型。下面用整数类型integer、任意精度数字类型numeric和double precision举例说明。

1、创建表。

from sqlalchemy import Column,Integer,Numeric
from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

class NumberTest(Base):
__tablename__ = 'numbertest'
id = Column(Integer,primary_key=True,autoincrement=true)
col1 = Column(Integer)# integer类型
col2 = Column(Numeric(precision=6,scale=4)# numeric类型
col3 = Column(DOUBLE_PRECISION())# double precision类型

def __repr__(self):
return "<NumberTest(col1='%s', col2='%s', col3='%s')>" 
%(self.col1,self.col2,self.col3)

Base.metadata.create_all(engine)

2、插入数据。

a = 1
b= decimal.Decimal(2.141)
c = 3.1415926
u = NumberTest(col1=a,col2=b,col3=c)
session.add(u)
session.commit()

3、读取数据。

n1=session.query(NumberTest).filter(NumberTest.id==1).first()
print(n1.col1)
print(n1.col2)
print(n1.col3)

时间/日期类型

日期/时间类型有timestamp 、date、time 、interval。

下面以timestamp 、date、time 举例说明。

1、创建表。

from sqlalchemy import Column,Integer,Date,DateTime,Time
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

class DatetimeTest(Base):
__tablename__ = 'datetimetest'
id = Column(Integer,primary_key=True,autoincrement=true)
col1 = Column(DateTime())# timestamp类型
col2 = Column(Date())# date类型
col3 = Column(Time())# time类型

def __repr__(self):
return "<DatetimeTest(col1='%s', col2='%s', col3='%s')>" 
%(self.col1,self.col2,self.col3)

Base.metadata.create_all(engine)

2、插入数据。

_now = datetime.now().replace(microsecond = 0)
u = DateTime1(col1=_now,col2=_now,col3=_now.time())
session.add(u)
session.commit()

3、读取数据。

d1=session.query(DatetimeTest).filter(DatetimeTest.id==1).first()
print(d1.id,d1.col1,d1.col2,d1.col3)
assert isinstance(d1.col1,datetime)
assert isinstance(d1.col2,date)
assert isinstance(d1.col3,time)