MySQL大表归档

BASH脚本定期处理

Posted by lingjun on April 25, 2019 本文总阅读量

MySQL大表归档

Bash脚本HisDataArch.sh

#!/bin/bash
# HisDataArch.sh - archive rows older than three months from one large table.
#
# Usage: HisDataArch.sh <table_name>
#
# Steps:
#   1. validate the table name against the whitelist below
#   2. make sure the backup file system has more than ~10G available
#   3. dump the old rows to files with pt-archiver (rows are removed from the table)
#   4. tar the dump directory
#   5. rebuild the table with pt-online-schema-change
#
# ~/.mpd is sourced for the connection settings; this script reads
# $dbhost, $dbuser and $password from it.
. ~/.mpd

btime=$(date +"%Y-%m-%d %H:%M:%S")
dataDir='/data/backup/'
table=$1
# Anchor the month arithmetic on the 1st of the current month: a bare
# "N month ago" can overflow into the wrong month on the 29th-31st.
monthStart=$(date +%Y-%m-01)
lastMonth=$(date -d "$monthStart 1 month ago" +%m)
pathName=${table}_${lastMonth}
endDate=$(date -d "$monthStart 3 month ago" +%Y-%m-01)
tableList=("t_billc_bill_details" "t_billc_trade_info" "t_billc_his_bill" "t_billc_pay_wechat")

echo ".................................... Start $btime ...................................."

# Exact-match whitelist check (no regex matching, and an empty argument is rejected).
isKnownTable=0
for candidate in "${tableList[@]}"; do
    if [[ $candidate == "$table" ]]; then
        isKnownTable=1
        break
    fi
done
if [[ $isKnownTable -ne 1 ]]; then
    echo "请输入正确的参数值"
    exit 1
fi

# Measure the file system that actually receives the dump. Column 4 of
# "df -P" is the available space in 1K blocks (column 3 is the used space).
mkdir -p "$dataDir"
freeDiskSize=$(df -Pk "$dataDir" | awk 'NR==2 {print $4}')
if [[ $freeDiskSize -le 10240000 ]]; then
    echo '磁盘空间小于10G,无法运行'
    exit 1
fi

# Keep a dump directory left over from an earlier run instead of mixing files.
# The suffix has no spaces so the rename cannot be split into several words.
if [[ -d ${dataDir}${pathName} ]]; then
    mv "${dataDir}${pathName}" "${dataDir}${pathName}_bak$(date +%Y%m%d%H%M%S)"
fi
mkdir -p "${dataDir}${pathName}"

# t_billc_his_bill names its bill_date index differently from the other tables.
if [[ $table == "t_billc_his_bill" ]]; then
    indexName=index_bill_date
else
    indexName=idx_bill_date
fi

echo "$table"
pt-archiver --source "h=$dbhost,u=$dbuser,p=$password,D=hc_bill_center,t=$table,i=$indexName" \
        --file "${dataDir}${pathName}/%D.%t-%Y-%m-%d" \
        --no-version-check --charset=UTF8 --where="bill_date<'$endDate'" --ignore --txn-size=1000 --limit=1000 --bulk-delete \
        --progress=5000 --statistics --why-quit
archiveStatus=$?

# Never run the cleanup from an unexpected working directory.
cd "$dataDir" || { echo "cannot change directory to $dataDir"; exit 1; }
# Delete the dump directory only once the tarball has been written.
if tar -cvf "$pathName.tar" "$pathName"; then
    rm -rf "$pathName"
else
    echo "tar failed, keeping ${dataDir}${pathName}"
    exit 1
fi
#scp $pathName.tar $tarUser@$tarHost:$tarPath

if [[ $archiveStatus -ne 0 ]]; then
    echo "pt-archiver exited with status $archiveStatus, skipping the table rebuild"
    exit 1
fi

# Rebuild the table after the purge (binary logging is disabled for the session).
pt-online-schema-change --user="$dbuser" --password="$password" --defaults-file=/etc/my.cnf --set-vars='sql_log_bin=0' --charset=utf8 --alter "ENGINE=InnoDB" "D=hc_bill_center,t=$table" --execute --print --no-check-alter

echo ".................................... End $(date +"%Y-%m-%d %H:%M:%S") ...................................."
将脚本部署到crontab中定时执行即可:
0 3 1 * * /bin/bash /data/scripts/HisDataArch.sh t_billc_bill_details >>/data/scripts/logs/HisDataArch.log
0 3 2 * * /bin/bash /data/scripts/HisDataArch.sh t_billc_trade_info >>/data/scripts/logs/HisDataArch.log
0 3 3 * * /bin/bash /data/scripts/HisDataArch.sh t_billc_his_bill >>/data/scripts/logs/HisDataArch.log
0 3 4 * * /bin/bash /data/scripts/HisDataArch.sh t_billc_pay_wechat >>/data/scripts/logs/HisDataArch.log

Python脚本HisDataArch.py

#!/usr/bin/env python  
# -*- coding: utf-8 -*-  
# Date: 2019/9/5/  
# Created by 灵均  
import MySQLdb
import sys
import base64
import commands
import os
import logging
from datetime import datetime
from dateutil.relativedelta import relativedelta
# Python 2 idiom: switch the interpreter-wide default string encoding to UTF-8.
reload(sys)
sys.setdefaultencoding('utf8')
 
# Append records of level DEBUG and above to the archive log file; every line
# carries a timestamp, the source file name, the line number and the level.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S',
                    filename='/data/scripts/logs/his_data_arch.log',
                    filemode='a')
# Module-level logger used by main().
logger = logging.getLogger(__name__)
 
 
def connDatabase(db_host):
    """Open a MySQLdb connection to the ``hc_bill_center`` database.

    db_host -- host name or address of the MySQL server (port 3306 is used).

    Returns the open connection object; closing it is left to the caller.
    """
    # NOTE(review): the encoded password literal looks like a redacted
    # placeholder in this copy of the script -- confirm before use.
    secret = base64.decodestring('**********')
    settings = {
        'host': db_host,
        'user': 'dba',
        'passwd': secret,
        'port': 3306,
        'db': 'hc_bill_center',
        'charset': 'utf8',
    }
    return MySQLdb.connect(**settings)
 
def outFile(endDate,endMonth,table):
    """Export the rows of ``table`` older than ``endDate`` to a tab-separated file.

    The export runs ``SELECT ... INTO OUTFILE`` on the local MySQL server and
    writes ``/var/lib/mysql-files/<table>_<endMonth>.sql``.

    endDate  -- exclusive upper bound compared against ``bill_date``.
    endMonth -- month tag used in the output file name.
    table    -- table name; it is interpolated into the statement as-is, so
                only trusted, fixed names may be passed.

    Any exception raised by the database driver propagates to the caller.
    """
    outFileSql='''select * from %s where bill_date<\'%s\' into outfile "/var/lib/mysql-files/%s_%s.sql" fields terminated by '\\t';'''\
        %(table,endDate,table,endMonth)
    conn=connDatabase('localhost')
    # Release the cursor and the connection even when the export fails, so a
    # failing statement does not leak a server session.
    try:
        cur = conn.cursor()
        try:
            cur.execute(outFileSql)
        finally:
            cur.close()
    finally:
        conn.close()
 
def scpData(table,endMonth,tarHost,tarPath='/data/archdata/hc_bill_center/'):
    """Copy the exported dump file to the archive host with ``scp``.

    table    -- table name, first part of the dump file name.
    endMonth -- month tag, second part of the dump file name.
    tarHost  -- destination host; the copy is made as user ``root``.
    tarPath  -- destination directory on ``tarHost``. Defaults to the
                directory that used to be hard-coded, so three-argument
                callers keep working while main() can pass its own path.

    Returns the ``(exit_status, output)`` tuple of the ``scp`` command.
    """
    scpCmd="scp /var/lib/mysql-files/%s_%s.sql root@%s:%s"%(table,endMonth,tarHost,tarPath)
    return commands.getstatusoutput(scpCmd)
 
def rmFile(table,endMonth):
    """Remove the local dump file ``/var/lib/mysql-files/<table>_<endMonth>.sql``.

    Returns the ``(exit_status, output)`` tuple of the ``rm -rf`` command.
    """
    dumpFile = '/var/lib/mysql-files/%s_%s.sql' % (table, endMonth)
    return commands.getstatusoutput('rm -rf ' + dumpFile)
 
def deleteData(endDate,table):
    """Purge the rows of ``table`` older than ``endDate`` with pt-archiver.

    endDate -- exclusive upper bound compared against ``bill_date``.
    table   -- table in ``hc_bill_center`` to purge.

    Returns the ``(exit_status, output)`` tuple of the pt-archiver command.
    """
    # t_billc_his_bill names its bill_date index differently from the other tables.
    if table == 't_billc_his_bill':
        indexName = 'index_bill_date'
    else:
        indexName = 'idx_bill_date'
    purgeCmd = (
        "/usr/bin/pt-archiver --source h=localhost,u=dba,p='%s',D=hc_bill_center,t=%s,i=%s "
        "        --purge --no-version-check --charset=UTF8 --where=\"bill_date<'%s'\" --ignore --txn-size=1000 --limit=1000 "
        "        --bulk-delete --progress=5000 --statistics --why-quit"
    ) % (base64.decodestring('TXlzcWxfNzg5IQ=='), table, indexName, endDate)
    return commands.getstatusoutput(purgeCmd)
 
def analyzeTable(table):
    """Rebuild ``table`` with pt-online-schema-change (``ALTER ... ENGINE=InnoDB``).

    table -- table in ``hc_bill_center`` to rebuild.

    Returns the ``(exit_status, output)`` tuple of the command.
    """
    rebuildTemplate = (
        '/usr/bin/pt-online-schema-change --user=dba --password=%s --defaults-file=/etc/my.cnf '
        '    --charset=utf8 --alter "ENGINE=InnoDB" D=hc_bill_center,t=%s --execute --print '
        '    --no-check-alter'
    )
    rebuildCmd = rebuildTemplate % (base64.decodestring('TXlzcWxfNzg5IQ=='), table)
    return commands.getstatusoutput(rebuildCmd)
 
def getFileSize(table,endMonth):
    """Return the size in bytes of the local dump file for ``table``.

    The file inspected is ``/var/lib/mysql-files/<table>_<endMonth>.sql``;
    the size is passed through ``round(..., 2)`` before being returned.
    """
    dumpPath = unicode('/var/lib/mysql-files/%s_%s.sql' % (table, endMonth), 'utf8')
    sizeInBytes = os.path.getsize(dumpPath)
    return round(sizeInBytes, 2)
 
def main():
    """Archive every configured table, one after another.

    For each table: check free space on ``/``, export the old rows to a
    local file, copy the file to the archive host, remove the local file,
    purge the exported rows and finally rebuild the table. A failing step is
    logged and the remaining steps for that table are skipped; the loop then
    moves on to the next table.
    """
    logger.info('.................................... Start ....................................')
    tableList=['t_billc_bill_details','t_billc_trade_info','t_billc_his_bill','t_billc_pay_wechat']
    tarHost='172.16.5.50'
    tarPath='/data/archdata/hc_bill_center/'
    # Cut-off: midnight on the 1st of the month, three months back.
    monthStart = datetime.now().replace(day=1,hour=0,minute=0,second=0)
    endDate = (monthStart - relativedelta(months=+3)).strftime('%Y-%m-%d %H:%M:%S')
    endMonth = (datetime.now() - relativedelta(months=+3)).strftime('%Y%m')

    for table in tableList:
        # Free space on the root file system, in GiB.
        fsStat = os.statvfs('/')
        freeDisk = fsStat.f_bavail * fsStat.f_frsize/1024/1024/1024
        if freeDisk <= 20:
            logger.warning('磁盘空间小于20G,请清理空间再进行操作...')
            continue

        logger.info('正在导出文件...')
        outFile(endDate,endMonth,table)
        if not getFileSize(table,endMonth) > 0:
            logger.error('文件导出失败')
            continue

        logger.info('正在传输数据文件到归档库...')
        scpStatus = scpData(table,endMonth,tarHost,tarPath)
        if scpStatus[0] != 0:
            logger.error(scpStatus[1])
            continue

        rmStatus = rmFile(table,endMonth)
        if rmStatus[0] != 0:
            logger.error(rmStatus[1])
            continue

        logger.info('已删除本地数据文件')
        logger.info('正在删除需要归档的数据...')
        deleteStatus = deleteData(endDate,table)
        if deleteStatus[0] != 0:
            logger.error(deleteStatus[1])
            continue

        logger.info('已归档的数据已删除,正在分析表...')
        analyzeStatus = analyzeTable(table)
        if analyzeStatus[0] != 0:
            logger.error(analyzeStatus[1])
            continue

        logger.info('本次数据归档已完成')

    logger.info('.................................... End ....................................')
 
# Run the archive job only when this file is executed as a script.
if __name__ == "__main__":
    main()
  • 备注:脚本执行过程中会产生大量binlog日志,需要确保磁盘有足够的可用空间