IE盒子

搜索
查看: 126|回复: 1

Mysql_简易操作工具类

[复制链接]

3

主题

3

帖子

9

积分

新手上路

Rank: 1

积分
9
发表于 2022-12-30 16:33:50 | 显示全部楼层 |阅读模式
数据表[data_test]结构:
CREATE TABLE `data_test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `datestr` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
  `count` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;工具类:
# coding:utf-8
# @Time    : 2022/9/23 10:47
# @Author  : KeenLeung
# @Remark  :

import arrow
import pandas as pd
from sqlalchemy import create_engine

class DBHandleTool(object):
    """
    数据库操作类
    SQLAlchemy==1.4.22
    """
    def __init__(self, db_config):
        self.db_config = db_config
        mysql_url = "mysql://{user}:{password}@{hostname}:{port}/{dbname}?charset=utf8".format(**self.db_config)
        self.engine = create_engine(mysql_url, encoding='utf-8')

    # ================================================== 原生sql ==================================================

    def execute(self, sql):
        """
        执行sql
        :param sql:
        :return:
        """
        with self.engine.begin() as conn:
            conn.execute(sql)

    def fetchcolumn(self, sql):
        """
        获取某一个值
        :param sql:
        :return:
        """
        with self.engine.begin() as conn:
            result = conn.execute(sql)
            result = result.fetchone()
            if result:
                return result[0]
            else:
                return None

    def fetchone(self, sql):
        """
        获取一行数据
        :param sql:
        :return:
        """
        with self.engine.begin() as conn:
            result = conn.execute(sql)
            result = result.fetchone()
            return result

    def fetchall(self, sql):
        """
        获取多行数据
        :param sql:
        :return:
        """
        with self.engine.begin() as conn:
            result = conn.execute(sql)
            result = result.fetchall()
            return result

    # ================================================== 非原生sql ==================================================
    def __correct_table__(self, table):
        """
        获取正确的table名称
        :param table:表名称
        :return:
        """
        if ',' in table:
            return table.split(',')[0]
        elif ';' in table:
            return table.split(';')[0]
        else:
            return table

    def __build_query_condition__(self, filter_dict):
        """
        构建查询条件
        :param filter_dict: 查询条件(dict),比如:{"datestr": "2022-09-16"}
        :return:
        """
        keys = list(filter_dict.keys())
        values = list(filter_dict.values())
        placestr = ""
        if keys:
            placestr = map(lambda x: x + '=%s', keys)
            placestr = ' and '.join(placestr)
            placestr = f"where {placestr}"
        params = tuple(values)
        return placestr, params

    def __build_update_condition__(self, value_dict, filter_dict):
        """
        构建更新语句条件
        :param filter_dict:
        :return:
        """
        keys1 = list(value_dict.keys())
        values1 = list(value_dict.values())
        placestr1 = map(lambda x: x + ' = %s', keys1)
        placestr1 = ', '.join(placestr1)

        keys2 = list(filter_dict.keys())
        values2 = list(filter_dict.values())
        placestr2 = ""
        if keys2:
            placestr2 = map(lambda x: x + ' = %s', keys2)
            placestr2 = ' and '.join(placestr2)
            placestr2 = f"where {placestr2}"

        values = values1 + values2
        params = tuple(values)

        return placestr1, placestr2, params

    def row_exists(self, table, filter_dict={}):
        """
        数据行是否存在
        :param table:       表名称
        :param filter_dict: 查询条件(dict),比如:{"datestr": "2022-09-16"}
        :return:
        """
        table = self.__correct_table__(table)
        placestr, params = self.__build_query_condition__(filter_dict)
        sql = f"select * from {table} {placestr}"
        with self.engine.begin() as conn:
            result = conn.execute(sql, params)
            result = result.fetchone()
            if result:
                return True
            else:
                return False

    def insert_row(self, table, value_dict):
        """
        插入数据
        :param table:       表名称
        :param value_dict:  字段、数值数据字典(dict)
        :return:
        """
        table = self.__correct_table__(table)
        keys = list(value_dict.keys())
        values = list(value_dict.values())
        fields_str = ','.join(keys)
        placestr = ['%s' for x in values]
        placestr = ','.join(placestr)
        params = tuple(values)
        sql = f"insert into {table}({fields_str}) values({placestr})"
        with self.engine.begin() as conn:
            conn.execute(sql, params)

    def insert_many(self, table, value_dicts):
        """
        插入多行数据
        :param table:       表名称
        :param value_dicts: 字段、数值数据字典列表(list)
        :return:
        """
        table = self.__correct_table__(table)
        keys = list(value_dicts[0].keys())
        params = [tuple(x.values()) for x in value_dicts]
        fields_str = ','.join(keys)
        placestr = ['%s' for x in keys]
        placestr = ','.join(placestr)
        sql = f"insert into {table}({fields_str}) values({placestr})"
        with self.engine.begin() as conn:
            conn.execute(sql, params)

    def delete(self, table, filter_dict={}):
        """
        删除数据
        :param table:       数据表
        :param filter_dict: 查询条件(dict)
        :return:
        """
        table = self.__correct_table__(table)
        placestr, params = self.__build_query_condition__(filter_dict)
        sql = f"delete from {table} {placestr}"
        with self.engine.begin() as conn:
            conn.execute(sql, params)

    def count(self, table, filter_dict={}, column=None):
        """
        查询条数
        :param table:       数据表
        :param filter_dict: 查询条件(dict)
        :param column:      指定列名(str)
        :return:
        """
        if not column:
            column = "*"
        else:
            column = str(column)
        table = self.__correct_table__(table)
        placestr, params = self.__build_query_condition__(filter_dict)
        sql = f"select count({column}) from {table} {placestr}"
        with self.engine.begin() as conn:
            result = conn.execute(sql, params)
            result = result.fetchone()
            return result[0]

    def get_column(self, table, column, filter_dict={}):
        """
        获取某一列的数值
        :param table:       数据表
        :param column:      指定列名(str)
        :param filter_dict: 查询条件(dict)
        :return:
        """
        table = self.__correct_table__(table)
        placestr, params = self.__build_query_condition__(filter_dict)
        sql = f"select {column} from {table} {placestr}"
        with self.engine.begin() as conn:
            result = conn.execute(sql, params)
            result = result.fetchone()
            return result[0]

    def update(self, table, value_dict, filter_dict={}):
        """
        更新
        :param table:       数据表
        :param value_dict:  更新值数据字典(dict)
        :param filter_dict: 查询条件(dict)
        :return:
        """
        table = self.__correct_table__(table)
        placestr1, placestr2, params = self.__build_update_condition__(value_dict, filter_dict)
        sql = f"update {table} set {placestr1} {placestr2}"
        with self.engine.begin() as conn:
            conn.execute(sql, params)

    def set_column_zero(self, table, column, filter_dict={}):
        """
        设置某一列的值为0
        :param table:       数据表
        :param column:      列名称
        :param filter_dict: 查询条件(dict)
        :return:
        """
        value_dict = {
            column: 0
        }
        self.update(
            table=table,
            value_dict=value_dict,
            filter_dict=filter_dict
        )

    def clear(self, table):
        """
        清空数据表
        :param table:数据表
        :return:
        """
        table = self.__correct_table__(table)
        sql = "truncate table %s" % table
        with self.engine.begin() as conn:
            conn.execute(sql)

    # ================================================== DataFrame ==================================================
    def get_df(self, sql):
        """
        获取DataFrame结果
        :param sql:
        :return:
        """
        with self.engine.begin() as conn:
            df = pd.read_sql(sql, con=conn)
            return df

    def save_df(self, df, tablename, if_exists='append', index=False):
            """
            保存DataFrame到数据库
            :param df:          要保存的DataFrame数据
            :param tablename:   表名称
            :param if_exists:
            :param index:
            :return:
            """
            with self.engine.begin() as conn:
                df.to_sql(name=tablename, con=conn, if_exists=if_exists, index=index)
                return True案例:
if __name__ == '__main__':
    db_config = {
        "user": "***", #用户名
        "password": "***", #密码
        "hostname": "***.***.***.***", #ip地址
        "port": "3306", #端口
        "dbname": "***" #数据库名称
    }
    db = DBHandleTool(db_config=db_config)
    todaystr = arrow.now().format('YYYY-MM-DD')
    tb_name = 'data_test'

    # ================================================== 原生sql ==================================================

    # 插入
    db.execute(f"insert into {tb_name}(`datestr`, `count`) values('{todaystr}', 1)")

    # 查询
    # 某列数据
    result = db.fetchcolumn(f"select count from {tb_name} where datestr = '{todaystr}'")
    print(result)

    # 某行
    row = db.fetchone(f"select * from {tb_name} where datestr = '{todaystr}'")
    print(row)

    # 所有行
    rows = db.fetchall(f"select * from {tb_name} where datestr = '{todaystr}'")
    print(rows)

    # ================================================== 非原生 ==================================================

    # 行是否存在
    exists = db.row_exists(
        table=tb_name,
        filter_dict={"datestr": todaystr}
    )
    print(exists)

    # 插入一行
    db.insert_row(
        table=tb_name,
        value_dict={"datestr": arrow.get(todaystr).shift(days=+1).format('YYYY-MM-DD'), "count": 2}
    )

    # 插入多行
    db.insert_many(
        table=tb_name,
        value_dicts=[
            {"datestr": arrow.get(todaystr).shift(days=+2).format('YYYY-MM-DD'), "count": 3},
            {"datestr": arrow.get(todaystr).shift(days=+3).format('YYYY-MM-DD'), "count": 4},
        ]
    )

    # 删除数据
    db.delete(
        table=tb_name,
        filter_dict={"datestr": arrow.get(todaystr).shift(days=+3).format('YYYY-MM-DD')}
    )

    # 查询条数
    row_nums = db.count(
        table=tb_name
    )
    print(row_nums)

    # 获取某列的值
    count_num = db.get_column(
        table=tb_name,
        column="count",
        filter_dict={"datestr": arrow.get(todaystr).shift(days=+2).format('YYYY-MM-DD')},
    )
    print(count_num)

    # 更新数据
    db.update(
        table=tb_name,
        value_dict={"count": 4},
        filter_dict={"datestr": arrow.get(todaystr).shift(days=+2).format('YYYY-MM-DD')}
    )

    # ================================================== DataFrame ==================================================
    # 获取df
    df = db.get_df(f"select * from {tb_name}")
    print(df)测试通过!
回复

使用道具 举报

4

主题

9

帖子

19

积分

新手上路

Rank: 1

积分
19
发表于 2025-4-8 12:25:37 | 显示全部楼层
秀起来~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表