Python3:sqlalchemy对mysql数据库操作,非sql语句

Posted 整合侠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python3:sqlalchemy对mysql数据库操作,非sql语句相关的知识,希望对你有一定的参考价值。

Python3:sqlalchemy对mysql数据库操作,非sql语句

# python3
# author lizm
# datetime 2018-02-01 10:00:00
# -*- coding: utf-8 -*-
'''
    数据起始日期:2015-05-08
    数据库:mysql
'''
import configparser
import datetime
import json
import logging
import math
import os
import re
import sys
import time

import pymysql
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from sqlalchemy import Column,Integer, String,DateTime,create_engine
from sqlalchemy import and_,func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Root logger shared by every function in this script.
logger = logging.getLogger()
# Log handler: one file per day, placed in the script directory.
# sys.path[0] carries no trailing separator, so the file name has to be
# joined to it; plain concatenation would put the log beside the directory.
file = logging.FileHandler(
    os.path.join(sys.path[0], "py_zgjs_log" + time.strftime("%Y%m%d") + ".log"),
    # The messages written below are Chinese text; do not depend on the
    # platform default encoding.
    encoding="utf-8",
)
logger.addHandler(file)
# Log line format: timestamp, level, message.
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
file.setFormatter(formatter)
# NOTSET on the root logger lets records of every level through.
logger.setLevel(logging.NOTSET)

# Base class for the declarative ORM models.
Base = declarative_base()

class Yztzzqktjb(Base):
    """ORM model for one row of the ``py_zgjs_yztzzqktjb`` table.

    A row is identified in practice by (mc, begindate, enddate, sjmc, ssjmc);
    ``sl`` and ``update_time`` are the fields refreshed when that combination
    already exists.
    """

    # Table name (must be a string; SQLAlchemy reads it at class creation).
    __tablename__ = 'py_zgjs_yztzzqktjb'

    # Table structure
    id = Column(Integer,primary_key=True,autoincrement=True)
    # Item name, taken from the first cell of a scraped row.
    mc = Column(String(200),nullable=False)
    # Start / end of the reporting period, stored as 'YYYY-MM-DD' text.
    begindate = Column(String(45),nullable=False)
    enddate = Column(String(45), nullable=False)
    # Parent heading name and grandparent heading name of the item.
    sjmc = Column(String(200))
    ssjmc = Column(String(200))
    # Value from the second cell of a scraped row, thousands separators
    # removed (presumably a count -- confirm against the source page).
    sl = Column(String(45))
    create_time = Column(DateTime,nullable=False)
    update_time = Column(DateTime,nullable=False)

    def __init__(self,mc,begindate,enddate,sjmc,ssjmc,sl,create_time,update_time):
        """Populate every non-key column; ``id`` is left to the database."""
        self.mc = mc
        self.begindate = begindate
        self.enddate = enddate
        self.sjmc = sjmc
        self.ssjmc = ssjmc
        self.sl = sl
        self.create_time = create_time
        self.update_time = update_time

class ZgjsEntry(object):
    """Plain value holder for one scraped table row.

    The six slots are positional on purpose; the code that fills and stores
    entries uses them as: v1 item name, v2 period start, v3 period end,
    v4 parent heading, v5 grandparent heading, v6 value.

    The class deliberately defines no descriptor hooks (``__get__`` /
    ``__set__`` / ``__delete__``): hooks keyed on a ``self.name`` attribute
    that nothing assigns make any instance stored as a class attribute raise
    AttributeError on access.
    """

    def __init__(self, v1, v2,v3,v4,v5,v6):
        self.v1 = v1
        self.v2 = v2
        self.v3 = v3
        self.v4 = v4
        self.v5 = v5
        self.v6 = v6

    def __repr__(self):
        # Handy when dumping scraped rows while debugging.
        return "ZgjsEntry(%r, %r, %r, %r, %r, %r)" % (
            self.v1, self.v2, self.v3, self.v4, self.v5, self.v6)

def dbconfig():
    """Read ``dbconfig.ini`` from the script directory.

    Returns:
        An 8-tuple of strings, in this order:
        (ip, port, user, password, dbname, endtime, initdate, interval).
        ``endtime`` and ``initdate`` come from the ``dbtime`` section keys
        ``endtime`` and ``mzkbinitdate``; the rest from ``dbserver`` except
        ``interval`` (``dbtime``).

    Raises:
        configparser.NoSectionError / NoOptionError: when the file is
            missing (``read`` silently skips unreadable files) or lacks a key.
    """
    cfg = configparser.ConfigParser()
    # sys.path[0] has no trailing separator; join instead of concatenating,
    # otherwise the lookup targets "<dir>dbconfig.ini" beside the directory.
    cfg.read(os.path.join(sys.path[0], "dbconfig.ini"))
    ip = cfg.get("dbserver", "ip")
    port = cfg.get("dbserver", "port")
    user = cfg.get("dbserver", "user")
    password = cfg.get("dbserver", "password")
    dbname = cfg.get("dbserver", "dbname")
    endtime = cfg.get("dbtime", "endtime")
    initdate = cfg.get("dbtime", "mzkbinitdate")
    interval = cfg.get("dbtime", "interval")
    return (ip,port,user,password,dbname,endtime,initdate,interval)

def savrData(tableName,zgjsList):
    """Insert or update scraped rows in the database.

    For each entry the row matching (mc, begindate, enddate, sjmc, ssjmc) is
    looked up; when it exists only ``sl`` and ``update_time`` are refreshed,
    otherwise a new row is queued. Everything is committed in one
    transaction and rolled back as a whole on error.

    Args:
        tableName: target table selector; only 'Yztzzqktjb' (or the model
            class itself) is handled, anything else commits nothing.
        zgjsList: list of ZgjsEntry objects (v1..v6).

    Returns:
        0 on success, 1 when connecting or saving failed. The outcome
        message is logged and printed either way.
    """
    msgcode = 0
    message = '数据保存成功'
    engine = None
    try:
        dbcfg = dbconfig()
        # URL layout: dialect+driver://user:password@host:port/database.
        # No ``encoding`` argument: utf-8 is already SQLAlchemy's default and
        # the parameter no longer exists in SQLAlchemy 2.x.
        engine = create_engine('mysql+mysqlconnector://'+dbcfg[2]+':'+dbcfg[3]+'@'+dbcfg[0]+':'+dbcfg[1]+'/'+dbcfg[4])
        session = sessionmaker(bind=engine)()
        try:
            items = []
            if tableName in ('Yztzzqktjb', Yztzzqktjb):
                for entry in zgjsList:
                    now = time.strftime('%Y-%m-%d %H:%M:%S')
                    # Build the identity filter once and reuse it for both
                    # the existence check and the update.
                    matching = session.query(Yztzzqktjb).filter(and_(
                        Yztzzqktjb.mc == entry.v1,
                        Yztzzqktjb.begindate == entry.v2,
                        Yztzzqktjb.enddate == entry.v3,
                        Yztzzqktjb.sjmc == entry.v4,
                        Yztzzqktjb.ssjmc == entry.v5))
                    if matching.first() is not None:
                        matching.update(
                            {Yztzzqktjb.sl: entry.v6, Yztzzqktjb.update_time: now},
                            synchronize_session=False)
                    else:
                        items.append(Yztzzqktjb(
                            mc=entry.v1, begindate=entry.v2, enddate=entry.v3,
                            sjmc=entry.v4, ssjmc=entry.v5, sl=entry.v6,
                            create_time=now, update_time=now))
            session.add_all(items)
            # Commit inserts and updates together.
            session.commit()
        except Exception as e:
            msgcode = 1
            message = '数据保存失败' + str(e)
            session.rollback()
        finally:
            session.close()
    except Exception as e:
        msgcode = 1
        message = '数据库连接失败'+str(e)
    finally:
        # An engine is created per call; release its connection pool so
        # repeated calls do not pile up idle connections.
        if engine is not None:
            engine.dispose()
    if msgcode:
        logger.error(message)
    else:
        logger.info(message)
    print(message)
    return msgcode


# Row index -> numbering prefix stripped from that row's first cell when the
# cell is remembered as a heading for the rows below it.
_HEADING_PREFIX_BY_ROW = {1: '一、', 4: '二、', 5: '1、', 9: '2、', 13: '三、', 17: '四、'}
# Row index -> index of the heading row whose name becomes the parent (v4).
_PARENT_HEADING_ROW = {2: 1, 3: 1, 5: 4, 9: 4, 7: 5, 8: 5,
                       11: 9, 12: 9, 15: 13, 16: 13, 19: 17, 20: 17}
# Rows that additionally carry the row-4 heading as grandparent (v5).
_GRANDPARENT_ROWS = (7, 8, 11, 12)
# Rows dropped from the result (presumably sub-header rows -- confirm
# against the live page layout).
_SKIPPED_ROWS = (6, 10, 14, 18)
# Numbering prefixes removed from every item name.
_NAME_PREFIXES = ('一、', '二、', '三、', '四、', '1、', '2、')


def getData(jsDate, channelIdStr,tableName):
    """Fetch one weekly report page and turn its table into entries.

    Args:
        jsDate: query date as 'YYYY-MM-DD'; sent to the site as 'YYYY.MM.DD'.
        channelIdStr: report/channel id posted with the query.
        tableName: 'Yztzzqktjb' (or the model class) enables the fixed
            row-position rules for heading capture and row skipping; any
            other value keeps every row and leaves v4/v5 empty.

    Returns:
        A list of ZgjsEntry (v1 name, v2 period start, v3 period end,
        v4 parent heading, v5 grandparent heading, v6 value). Empty when the
        page has no title block, is the generic search-result page, shows no
        recognisable date range, or has no data table.

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    zgjsList = []
    dateStr = jsDate[0:4]+'.'+jsDate[5:7]+'.'+jsDate[8:10]
    # Target of the page's search button.
    url = "http://www.******.cn/cms-search/view.action?action=china"
    headerDict = {'Host': 'www.*******.cn',
                  'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.31 Safari/537.36',
                  'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                  'Accept-Language': 'zh-CN,zh;q=0.8',
                  'Accept-Encoding': 'gzip, deflate',
                  'Referer': 'http://www.******.cn/cms-search/view.action?action=china',
                  'Connection': 'keep-alive'}
    data = {'dateType': '', 'dateStr': dateStr,
            'channelIdStr': channelIdStr}
    # Without a timeout a stalled server would block the scraper forever.
    res = requests.post(url, data=data, headers=headerDict, timeout=30)
    soup = BeautifulSoup(res.content, "html.parser")

    # Reporting period comes from the title block.
    SettlementTitle = soup.find('div',class_='SettlementTitle')
    if SettlementTitle is None:
        return zgjsList
    heading = SettlementTitle.find('h2')
    if heading is None:
        return zgjsList
    h2 = heading.text
    if h2 == '搜索结果':
        return zgjsList
    # The title carries the period as two dotted dates joined by '-'. They
    # are pulled out by pattern so the parse does not depend on whatever
    # brackets or spacing surround the range.
    period = re.findall(r'\d{4}\.\d{1,2}\.\d{1,2}', h2)
    if len(period) < 2:
        return zgjsList
    begindate = period[0].replace('.','-')
    enddate = period[1].replace('.','-')

    settlementList = soup.find(id='settlementList')
    if settlementList is None:
        return zgjsList
    table_ = settlementList.find('table')
    if table_ is None:
        return zgjsList
    # The data rows live in a table nested inside the outer one.
    inner_table = table_.find('table')
    if inner_table is None:
        return zgjsList
    tr_list = inner_table.find_all('tr')

    positional = tableName in ('Yztzzqktjb', Yztzzqktjb)
    # Heading names captured so far, keyed by the row they came from.
    headings = {}
    # Row 0 is skipped (the loop starts at 1).
    for n in range(1,len(tr_list)):
        td_list = tr_list[n].find_all('td')
        if positional:
            if n in _HEADING_PREFIX_BY_ROW and td_list:
                headings[n] = td_list[0].get_text().replace(_HEADING_PREFIX_BY_ROW[n],'').strip()
            if n in _SKIPPED_ROWS:
                continue
        zgjs = ZgjsEntry('','','','','','')
        zgjs.v2 = begindate
        zgjs.v3 = enddate

        if positional:
            if n in _PARENT_HEADING_ROW:
                zgjs.v4 = headings.get(_PARENT_HEADING_ROW[n], '')
            if n in _GRANDPARENT_ROWS:
                zgjs.v5 = headings.get(4, '')

        if td_list:
            name = td_list[0].get_text()
            for prefix in _NAME_PREFIXES:
                name = name.replace(prefix, '')
            zgjs.v1 = name.strip()
        if len(td_list) > 1:
            # Drop thousands separators from the value.
            zgjs.v6 = td_list[1].get_text().strip().replace(',','')
        zgjsList.append(zgjs)
    return zgjsList

# Work out the date scraping should start from.
def getBeginDate(bgdate,tableName):
    """Return the first date that still needs to be scraped.

    Looks up the largest ``enddate`` stored for the table and returns the
    day seven days after it. When nothing is stored, the table name is not
    handled, or the database cannot be reached or queried, ``bgdate`` is
    returned unchanged.

    Args:
        bgdate: fallback start date, 'YYYY-MM-DD'.
        tableName: 'Yztzzqktjb' (or the model class); other values always
            yield ``bgdate``.

    Returns:
        A 'YYYY-MM-DD' string.
    """
    latest = None
    engine = None
    try:
        dbcfg = dbconfig()
        # URL layout: dialect+driver://user:password@host:port/database.
        # No ``encoding`` argument: utf-8 is already SQLAlchemy's default and
        # the parameter no longer exists in SQLAlchemy 2.x.
        engine = create_engine('mysql+mysqlconnector://'+dbcfg[2]+':'+dbcfg[3]+'@'+dbcfg[0]+':'+dbcfg[1]+'/'+dbcfg[4])
        session = sessionmaker(bind=engine)()
        try:
            if tableName in ('Yztzzqktjb', Yztzzqktjb):
                # MAX over an empty table yields NULL -> None.
                latest = session.query(func.max(Yztzzqktjb.enddate)).scalar()
        except Exception as e:
            print('获取开始日期,查询异常;%s'%str(e))
            logger.error('获取开始日期,查询异常;%s'%str(e))
            session.rollback()
        finally:
            session.close()
    except Exception as e:
        print('获取开始日期,数据库连接失败;%s'%str(e))
        logger.error('获取开始日期,数据库连接失败;%s'%str(e))
    finally:
        # Release the per-call connection pool.
        if engine is not None:
            engine.dispose()
    # Decide on the queried value only; the fallback string itself must
    # never be indexed or parsed as if it were a query result.
    if latest is None:
        return bgdate
    last_end = datetime.datetime.strptime(latest, "%Y-%m-%d").date()
    return (last_end + datetime.timedelta(days=7)).strftime('%Y-%m-%d')

def isCheckData(date_):
    """Tell whether data for the week around ``date_`` is already stored.

    A row counts when DATEDIFF(enddate, date_) lies strictly between -2
    and 6.

    Args:
        date_: reference date, 'YYYY-MM-DD'.

    Returns:
        0 when at least one such row exists; 1 when none exists or when the
        database could not be reached or queried.
    """
    r_code = 0
    engine = None
    try:
        dbcfg = dbconfig()
        # URL layout: dialect+driver://user:password@host:port/database.
        # No ``encoding`` argument: utf-8 is already SQLAlchemy's default and
        # the parameter no longer exists in SQLAlchemy 2.x.
        engine = create_engine('mysql+mysqlconnector://'+dbcfg[2]+':'+dbcfg[3]+'@'+dbcfg[0]+':'+dbcfg[1]+'/'+dbcfg[4])
        session = sessionmaker(bind=engine)()
        try:
            day_gap = func.datediff(Yztzzqktjb.enddate, date_)
            # Existence is all that matters; one row is enough.
            found = session.query(Yztzzqktjb).filter(and_(day_gap < 6, day_gap > -2)).first()
            r_code = 1 if found is None else 0
        except Exception as e:
            r_code = 1
            print('判断是否有数据异常;%s'%str(e))
            logger.error('判断是否有数据异常;%s'%str(e))
            session.rollback()
        finally:
            session.close()
    except Exception as e:
        r_code = 1
        print('判断是否有数据,数据库连接异常;%s'%str(e))
        logger.error('判断是否有数据,数据库连接异常;%s'%str(e))
    finally:
        # Release the per-call connection pool.
        if engine is not None:
            engine.dispose()
    return r_code

# Entry point for one scraping pass.
def main(initdate_):
    """Scrape every outstanding week for each configured report.

    For each report the start date is taken from getBeginDate(); the pass
    then walks forward in 7-day steps up to today, fetching each week with
    getData() and storing non-empty results with savrData().

    Args:
        initdate_: fallback start date, 'YYYY-MM-DD', used when nothing is
            stored yet.
    """
    req_list = [
        {'report': '6ac54ce22db4474abc234d6edbe53ae7', 'table': 'Yztzzqktjb'},
    ]
    for req in req_list:
        # Date format: 2018-01-18
        begin = datetime.datetime.strptime(
            getBeginDate(initdate_, req['table']), "%Y-%m-%d").date()
        end = datetime.date.today()
        days_behind = (end - begin).days
        if days_behind < 0:
            # Already up to date for this report.
            continue
        for i in range(math.ceil(days_behind/7)+1):
            date_ = (begin+datetime.timedelta(days=i*7)).strftime('%Y-%m-%d')
            list_mzkb = getData(date_, req['report'], req['table'])
            if list_mzkb:
                savrData(req['table'], list_mzkb)
            # Small pause between requests.
            time.sleep(0.5)
            # Longer pause after every 350th request. Counting from 1 keeps
            # the very first request (i == 0) from triggering it.
            if (i + 1) % 350 == 0:
                time.sleep(15)


if __name__ == '__main__':
    # Date whose week must be present before the job is considered done,
    # as YYYYMMDD.
    vrg_date = '20150509'
    dbcfg = dbconfig()
    # Config values arrive as compact digit strings: HHMMSS and YYYYMMDD.
    vrg_endtime = dbcfg[5][0:2]+":"+dbcfg[5][2:4]+":"+dbcfg[5][4:6]
    var_initdate = dbcfg[6][0:4]+"-"+dbcfg[6][4:6]+"-"+dbcfg[6][6:8]
    var_interval = int(dbcfg[7])

    if len(vrg_date) == 8:
        vrg_date = vrg_date[0:4] + "-" + vrg_date[4:6] + "-" + vrg_date[6:8]
        parsed_end = time.strptime(vrg_endtime, "%H:%M:%S")
        hour, minute, second = parsed_end[3:6]
        end_time = datetime.time(hour, minute, second)
        # Keep scraping until the data shows up or the cut-off time passes.
        while True:
            main(var_initdate)
            if isCheckData(vrg_date) == 0:
                logger.info("采集数据结束")
                print("采集数据结束")
                break
            # Stop once the cut-off time is reached. The clock is read after
            # the pass so a long pass cannot hide that the deadline passed.
            now_time = time.strftime("%H%M%S")
            if int(end_time.strftime('%H%M%S')) - int(now_time) <= 0:
                logger.info("采集数据结束")
                print("采集数据结束")
                break
            # Wait before the next attempt.
            logger.info("**********************(%s):没有采集到数据,任务继续执行**********************" %vrg_date)
            print("********************(%s):没有采集到数据,任务继续执行**********************" %vrg_date)
            time.sleep(var_interval)
    else:
        # Belongs to the length check: on the ``while True`` loop it could
        # never run.
        logger.info("日期参数格式不正确,请用格式:20180205")
        print("日期参数格式不正确,请用格式:20180205")
        

 

以上是关于Python3:sqlalchemy对mysql数据库操作,非sql语句的主要内容,如果未能解决你的问题,请参考以下文章

Python3下sqlalchemy对mysql数据库的配置

在python3下怎样用flask-sqlalchemy对mysql数据库操作

在python3下怎样用flask-sqlalchemy对mysql数据库操作

在python3下怎样用flask-sqlalchemy对mysql数据库操作

在python3下怎样用flask-sqlalchemy对mysql数据库操作

在python3下怎样用flask-sqlalchemy对mysql数据库操作