python3 PyQt5 SQLite

86717 단어 SQL
Python 3 조작 SQLite
# coding:utf-8

import os
import sqlite3


class SQLiteTools():
    def __init__(self):
        pass

    # region            

    def createConnection(self, DB_name):
        '''         '''
        #     ,       ,        
        self.con = sqlite3.connect(DB_name)
        #        
        self.cur = self.con.cursor()

    def selectTableExist(self, tbname):
        '''
                   
        :param tbname:      
        :return:     True,    False
        '''
        sql = "select * from sqlite_master where type='table' and name = '" + tbname + "';"
        self.cur.execute(sql)
        cursor = self.cur.fetchall()
        if cursor:
            return True
        else:
            return False

    def createSQLtable(self, tbname):
        '''
               ,        ,  :ID,  :INTEGER,   
        :param tbname:      
        '''
        # CREATE TABLE if not exists    (ID INTEGER PRIMARY KEY AUTOINCREMENT);
        sql = u"CREATE TABLE if not exists " + tbname + u" (ID INTEGER PRIMARY KEY AUTOINCREMENT);"
        self.cur.execute(sql)
        self.con.commit()  #           

    # endregion


    # region       

    def addSQLtableColumn(self, tbname, columnName, genre):
        '''
                
        :param tbname:   
        :param columnName:   
        :param genre:      
        '''
        # ALTER TABLE    ADD       ;
        sql = u"ALTER TABLE " + tbname + u" ADD " + columnName + " " + genre +";"
        self.cur.execute(sql)
        self.con.commit()  #           

    def addSQLtableRow(self, tbname, rowNum):
        '''
                  
        :param tbname:     
        :param rowNum:   
        :return: None
        '''
        # INSERT INTO    (ID) VALUES ( );
        rownum = [(i,) for i in range(rowNum)]
        sql = "INSERT INTO " + tbname + " (ID) VALUES (?);"
        self.cur.executemany(sql, rownum)

        # for row in range(1, rowNum+1):
        #     sql = "INSERT INTO " + tbname + " (ID) VALUES (" + str(row) + ");"
        #     self.cur.execute(sql)

        self.con.commit() #           

    def setSQLtableValue(self, tbname, column, row, value):
        '''
                   
        :param tbname:      
        :param row:   
        :param column:   
        :param value:  
        '''
        # UPDATE    SET   =  WHERE ID= ;
        sql = u"UPDATE " + tbname + u" SET " +column+ "='" +value+ "' WHERE ID=" +str(row)+ ";"
        self.cur.execute(sql)
        self.con.commit()  #           

    # endregion


    # region       
    def getSQLtableRowNum(self, tbname):
        '''
                 
        :param tbname:     
        :return:   
        '''
        result = 0
        sql = "SELECT COUNT(*) FROM " + tbname + ";"
        self.cur.execute(sql)
        for value in self.cur:
            return value[0]
        return result

    def getSQLtableColumnName(self, tbname):
        '''
                   
        :param taname:      
        :return:         
        '''
        sql = "PRAGMA table_info([" +tbname+ "])"
        self.cur.execute(sql)
        NameList = []
        for value in self.cur:
            result = value[1]
            if result and result != "ID":
                NameList.append(result)
        return NameList

    def getSQLtableValue(self, tbname, column, row):
        '''
                    
        :param tbname:      
        :param row:     
        :param column:     
        :return        
        '''
        #SELECT    FROM    WHERE ID =   ;
        sql = "SELECT " +column+ " FROM " +tbname+ " WHERE ID=" +str(row)+ ";"
        self.cur.execute(sql)
        for value in self.cur:
            return value[0]

    def getSQLtableColumn(self, tbname, column):
        '''
                     
        :param tbname:      
        :param column:    
        :return          
        '''
        #SELECT    FROM   ;
        sql = "SELECT " +column+ " FROM " +tbname+ ";"
        valueList = []
        self.cur.execute(sql)
        for value in self.cur:
            valueList.append(value[0])
        return valueList

    def getSQLtableRow(self, tbname, row):
        '''
                   
        :param tbname:     
        :param row:   
        :return:    
        '''
        # SELECT * FROM    WHERE ID=  ;
        sql = "SELECT * FROM " + tbname + " WHERE ID="+ str(row) +";"
        self.cur.execute(sql)
        for value in self.cur:
            valueList = list(value)
            valueList.pop(0)
            return valueList

    # endregion


    # region      

    def delSQLtableRow(self, tbname, startRow, rowNum=1):
        '''
                   
        :param tbname:     
        :param rowNum:     
        :param rowNum:   
        '''
        # DELETE FROM    WHERE ID =  ;
        rowList = [(i,) for i in range(startRow, startRow+rowNum)]
        sql = "DELETE FROM " + tbname + " WHERE ID = ?;"
        self.cur.executemany(sql, rowList)

        # for row in range(startRow, startRow + rowNum):
        #     sql = "DELETE FROM " + tbname + " WHERE ID = " + str(row) + ";"
        #     self.cur.execute(sql)

        self.con.commit()  #           

    def delSQLtableColumn(self, tbname, columnName):
        '''
                  
        SQLite        ,         。
        :param tbname:   
        :param columnName:   
        :param genre:      
        '''
        # ALTER TABLE    drop   ;   #SQLite   
        tbName = "tbName0"
        CloumnNameList = self.getSQLtableColumnName(tbname)
        if columnName in CloumnNameList:
            CloumnNameList.remove(columnName)
        CloumnNameList.insert(0, "ID")
        strName = ','.join(CloumnNameList)
        # create table      as select       from    where 1 = 1;
        sql = u"create table "+tbName+" as select "+strName+" from "+tbname+" where 1 = 1"
        self.cur.execute(sql)
        self.con.commit()  #           

        self.delSQLtable(tbname)
        self.removeSQLtableName(tbName, tbname)

    def removeSQLtableName(self, tbname, newTbname):
        '''
             
        :param oldTbName:       
        :param newTbName:       
        :return:  None
        '''
        # alter table       rename to      ;
        sql = u"alter table "+tbname+" rename to "+newTbname+";"
        self.cur.execute(sql)
        self.con.commit()  #           

    def ClearSQLtable(self, tbname):
        '''
               
        :param tbname:   
        '''
        # DELETE FROM table_name WHERE[condition];
        sql = "DELETE FROM " +tbname+ ";"
        self.cur.execute(sql)
        self.con.commit()  #           

    def delSQLtable(self, tbname):
        '''
             
        :param tbname:
        :return: None
        '''
        # drop table record;
        sql = u"drop table "+tbname+";"
        self.cur.execute(sql)
        self.con.commit()  #           

    # endregion


if __name__ == '__main__':
    print("Python SQLite  ")

    class sqliteTest():
        def __init__(self):
            self.DB_File = "./sqlite.db"
            self.tableName = "sqlite"

            #     mysql.ini   ,         
            if os.path.exists(self.DB_File):
                os.remove(self.DB_File)

            self.sqlite = SQLiteTools()
            self.sqlite.createConnection(self.DB_File)

        #    
        def createTable(self):
            if not self.sqlite.selectTableExist(self.tableName):
                self.sqlite.createSQLtable(self.tableName)

        #     
        def addData(self):
            self.sqlite.addSQLtableColumn(self.tableName, "month1", "text")
            self.sqlite.addSQLtableColumn(self.tableName, "month2", "text")
            self.sqlite.addSQLtableColumn(self.tableName, "month3", "text")
            self.sqlite.addSQLtableColumn(self.tableName, "month4", "text")

            self.sqlite.addSQLtableRow(self.tableName, 30)

            self.sqlite.setSQLtableValue(self.tableName, "month1", 0, "1-0")
            self.sqlite.setSQLtableValue(self.tableName, "month1", 1, "1-1")
            self.sqlite.setSQLtableValue(self.tableName, "month2", 2, "2-2")
            self.sqlite.setSQLtableValue(self.tableName, "month2", 3, "2-3")

        #     
        def getData(self):
            value = self.sqlite.getSQLtableRowNum(self.tableName)
            print("      :", value)

            value = self.sqlite.getSQLtableColumnName(self.tableName)
            print("      (   ID):", value)

            value = self.sqlite.getSQLtableValue(self.tableName, "month1", 0)
            print("month1 ,     : ", value)

            value  = self.sqlite.getSQLtableColumn(self.tableName, "month2")
            print("month2     : ", value)

            value = self.sqlite.getSQLtableRow(self.tableName, 1)
            print("       (   ID): ", value)

        #     
        def delData(self):
            value = self.sqlite.getSQLtableRowNum(self.tableName)
            print("         :", value)
            self.sqlite.delSQLtableRow(self.tableName, 10, 5)
            value = self.sqlite.getSQLtableRowNum(self.tableName)
            print("         :", value)

            value = self.sqlite.getSQLtableColumnName(self.tableName)
            print("       (   ID)", value)
            self.sqlite.delSQLtableColumn(self.tableName, "month3")
            value = self.sqlite.getSQLtableColumnName(self.tableName)
            print("       (   ID)", value)

            self.sqlite.ClearSQLtable(self.tableName)
            self.sqlite.delSQLtable(self.tableName)

    win = sqliteTest()  #     ,    
    win.createTable()  #    
    win.addData()  #     
    win.getData()  #     
    win.delData()  #     




PyQt 5 조작 SQLite
# coding:utf-8

import os
from PyQt5.QtSql import QSqlDatabase, QSqlQuery


class SQLiteTools():
    def __init__(self):
        pass

    # region            

    def createConnection(self, DB_name):
        '''
               
        :param DB_name:      
        :return: null
        '''
        #       ,   sqlite3   
        db=QSqlDatabase.addDatabase("QSQLITE")
        #     test0.db,       ,        
        db.setDatabaseName(DB_name)
        #     
        db.open()
        #              
        self.que = QSqlQuery()

    def selectTableExist(self, tbname):
        '''
                   
        :param tbname:     
        :return:    True,    False
        '''
        sql = u"select * from sqlite_master where type='table' and name = '" + tbname + u"';"
        self.que.exec_(sql)
        return self.que.next()

    def createSQLtable(self, tbname):
        '''
               ,        ,  :ID,  :INTEGER,   
        :param tbname:      
        :return:  null
        '''
        # CREATE TABLE if not exists    (ID INTEGER PRIMARY KEY AUTOINCREMENT);
        sql = u"CREATE TABLE if not exists " + tbname + u" (ID INTEGER PRIMARY KEY AUTOINCREMENT);"
        self.que.exec_(sql)

    # endregion


    # region       

    def addSQLtableColumn(self, tbname, columnName, genre):
        '''
                
        :param tbname:   
        :param columnName:   
        :param genre:      
        :return: null
        '''
        # ALTER TABLE    ADD       ;
        sql = u"ALTER TABLE " + tbname + u" ADD " + columnName + " " + genre +";"
        self.que.exec_(sql)

    def addSQLtableRow(self, tbname, rowNum):
        '''
                  
        :param tbname:     
        :param rowNum:   
        :return: null
        '''
        # INSERT INTO    (ID) VALUES ( );
        for row in range(rowNum):
            sql = "INSERT INTO " + tbname + " (ID) VALUES (" + str(row) + ");"
            self.que.exec_(sql)

    def setSQLtableValue(self, tbname, column, row, value):
        '''
                   
        :param tbname:      
        :param column:   
        :param row:   
        :param value:  
        :return: null
        '''
        # UPDATE    SET   =  WHERE ID= ;
        sql = u"UPDATE " + tbname + u" SET " +column+ "='" +value+ "' WHERE ID=" +str(row)+ ";"
        self.que.exec_(sql)

    # endregion


    # region       
    def getSQLtableRowNum(self, tbname):
        '''
                 
        :param tbname:     
        :return:   
        '''
        result = 0
        sql = "SELECT COUNT(*) FROM " + tbname + ";"
        self.que.exec_(sql)
        if self.que.next():
            result = self.que.value(0)
        return result

    def getSQLtableColumnName(self, tbname):
        '''
                   
        :param taname:      
        :return:       
        '''
        sql = "PRAGMA table_info([" +tbname+ "])"
        self.que.exec_(sql)
        NameList = []
        while self.que.next():
            result = self.que.value(1)
            if result and result != "ID":
                NameList.append(result)
        return NameList

    def getSQLtableValue(self, tbname, column, row):
        '''
                    
        :param tbname:      
        :param row:     
        :param column:     
        :return:  
        '''
        #SELECT    FROM    WHERE ID =   ;
        value = 0
        sql = "SELECT " +column+ " FROM " +tbname+ " WHERE ID=" +str(row)+ ";"
        self.que.exec_(sql)
        if self.que.next():
            value = self.que.value(0)
        return value

    def getSQLtableColumn(self, tbname, column):
        '''
                     
        :param tbname:      
        :param column:    
        :return:    
        '''
        #SELECT    FROM   ;
        sql = "SELECT " +column+ " FROM " +tbname+ ";"
        value_list = []
        if self.que.exec_(sql):
            column_index = self.que.record().indexOf(column) #      
            while self.que.next():
                value = self.que.value(column_index)
                value_list.append(value)
        return value_list

    def getSQLtableRow(self, tbname, row):
        '''
                   
        :param tbname:     
        :param row:   
        :return:    
        '''
        # SELECT * FROM    WHERE ID=  ;
        columnNum = len(self.getSQLtableColumnName(tbname))
        sql = "SELECT * FROM " + tbname + " WHERE ID="+ str(row) +";"
        self.que.exec_(sql)
        valueList = []
        while self.que.next():
            for i in range(1, columnNum+1):
                result = self.que.value(i)
                valueList.append(result)
        return valueList

    # endregion


    # region      

    def delSQLtableColumn(self, tbname, columnName):
        '''
                  
        SQLite        ,         。
        :param tbname:   
        :param columnName:   
        :return: null
        '''
        # ALTER TABLE    drop   ;   #SQLite   
        tbName = "tbName0"
        CloumnNameList = self.getSQLtableColumnName(tbname)
        if columnName in CloumnNameList:
            CloumnNameList.remove(columnName)
        CloumnNameList.insert(0, "ID")
        strName = ','.join(CloumnNameList)
        # create table      as select       from    where 1 = 1;
        sql = u"create table "+tbName+" as select "+strName+" from "+tbname+" where 1 = 1"
        self.que.exec_(sql)
        self.delSQLtable(tbname)
        self.removeSQLtableName(tbName, tbname)

    def delSQLtableRow(self, tbname, startRow, rowNum=1):
        '''
                   
        :param tbname:     
        :param startRow:     
        :param rowNum:       
        :return: null
        '''
        # DELETE FROM    WHERE ID =  ;
        for row in range(startRow, startRow + rowNum):
            sql = "DELETE FROM " + tbname + " WHERE ID = " + str(row) + ";"
            self.que.exec_(sql)

    def removeSQLtableName(self, tbname, newTbname):
        '''
             
        :param tbname:       
        :param newTbname:       
        :return:  null
        '''
        # alter table       rename to      ;
        sql = u"alter table "+tbname+" rename to "+newTbname+";"
        self.que.exec_(sql)

    def ClearSQLtableRowValue(self, tbname):
        '''
               
        :param tbname:   
        :return:  null
        '''
        # DELETE FROM table_name WHERE[condition];
        sql = "DELETE FROM " +tbname+ ";"
        self.que.exec_(sql)

    def delSQLtable(self, tbname):
        '''
             
        :param tbname:
        :return: null
        '''
        # drop table record;
        sql = u"drop table "+tbname+";"
        self.que.exec_(sql)

    # endregion


if __name__ == '__main__':
    print("PyQt5 SQLite  ")
    class sqliteTest():
        def __init__(self):
            self.PathFile = "./sqlite.db"
            self.tableName = "sqlite"

            #     mysql.ini   ,         
            if os.path.exists(self.PathFile):
                os.remove(self.PathFile)

            self.sqlite = SQLiteTools()
            self.sqlite.createConnection(self.PathFile)

        #   
        def createTable(self):
            if not self.sqlite.selectTableExist(self.tableName):
                self.sqlite.createSQLtable(self.tableName)

        #     
        def addData(self):
            self.sqlite.addSQLtableColumn(self.tableName, "month1", "text")
            self.sqlite.addSQLtableColumn(self.tableName, "month2", "text")
            self.sqlite.addSQLtableColumn(self.tableName, "month3", "text")
            self.sqlite.addSQLtableColumn(self.tableName, "month4", "text")

            self.sqlite.addSQLtableRow(self.tableName, 30)

            self.sqlite.setSQLtableValue(self.tableName, "month1", 0, "1-0")
            self.sqlite.setSQLtableValue(self.tableName, "month1", 1, "1-1")
            self.sqlite.setSQLtableValue(self.tableName, "month2", 2, "2-2")
            self.sqlite.setSQLtableValue(self.tableName, "month2", 3, "2-3")

        #     
        def getData(self):
            value = self.sqlite.getSQLtableRowNum(self.tableName)
            print("      :", value)

            value = self.sqlite.getSQLtableColumnName(self.tableName)
            print("      (   ID ):", value)

            value = self.sqlite.getSQLtableValue(self.tableName, "month1", 0)
            print("month1 ,     :", value)

            value  = self.sqlite.getSQLtableColumn(self.tableName, "month2")
            print("month2     :", value)

            value = self.sqlite.getSQLtableRow(self.tableName, 2)
            print("       (   ID): ", value)

        #     
        def delData(self):
            value = self.sqlite.getSQLtableRowNum(self.tableName)
            print("         :", value)
            self.sqlite.delSQLtableRow(self.tableName, 10, 5)
            value = self.sqlite.getSQLtableRowNum(self.tableName)
            print("         :", value)

            value = self.sqlite.getSQLtableColumnName(self.tableName)
            print("       (   ID):", value)
            self.sqlite.delSQLtableColumn(self.tableName, "month3")
            value = self.sqlite.getSQLtableColumnName(self.tableName)
            print("       (   ID)", value)
            self.sqlite.ClearSQLtableRowValue(self.tableName)
            self.sqlite.delSQLtable(self.tableName)

    win = sqliteTest()
    win.createTable()
    win.addData()
    win.getData()
    win.delData()



좋은 웹페이지 즐겨찾기