當前位置:首頁 » 編程語言 » python解析sql
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

python解析sql

發布時間: 2022-10-18 22:44:11

① python如何操作sql語句

這里有個比較清楚的解答:
http://..com/question/262503775.html
但是你的是一個文本的話,就要稍微改一下咯
如果改成cx_Oracle的話,就是這樣的:
import
sys
import
cx_Oracle
import
os
class
handleDataBase:
def
__init__(self,user,passwd,server,sql):
self.user=user
self.passwd=passwd
self.server=server
self.sql=sql
self.conn
=
cx_Oracle.connect("%s/%s@%s"%(self.user,self.passwd,self.server))
def
selectDB(self):
cursor
=
self.conn.cursor()
cursor.execute("select
count(1)
from
search_item_08")
ret
=
cursor.fetchall()
cursor.close()
print
ret
return
ret
def
closeDB(self):
self.conn.close()
if
__name__
==
"__main__":
if
len(sys.argv)
<
4:
print
"Need
Arguments:
user
passwd
server"
sys.exit(1)
user=sys.argv[1]
passwd=sys.argv[2]
server=sys.argv[3]
#sql='select
count(1)
from
search_item_08;'
#注意這里要改
sql
=
open('a.sql','r').read()
#改成從文件讀取
#接下來就訪問資料庫
handleDB
=
handleDataBase(user,passwd,server,sql)
handleDB.selectDB()
handleDB.closeDB()

② python對資料庫表格裡面的內容增刪查改怎麼寫

本文主要給大家介紹了關於python模擬sql語句對員工表格進行增刪改查的相關內容,分享出來供大家參考學習,下面來一起看看詳細的介紹:
具體需求:
員工信息表程序,實現增刪改查操作:
可進行模糊查詢,語法支持下面3種:
select name,age from staff_data where age > 22 多個查詢參數name,age 用','分割
select * from staff_data where dept = 人事
select * from staff_data where enroll_date like 2013
查到的信息,列印後,最後面還要顯示查到的條數
可創建新員工紀錄,以phone做唯一鍵,phone存在即提示,staff_id需自增,添加多個記錄record1/record2中間用'/'分割
insert into staff_data values record1/record2
可刪除指定員工信息紀錄,輸入員工id,即可刪除
delete from staff_data where staff_id>=5andstaff_id<=10
可修改員工信息,語法如下:
update staff_table set dept=Market,phone=13566677787 where dept = 運維 多個set值用','分割
使用re模塊,os模塊,充分使用函數精簡代碼,熟練使用 str.split()來解析格式化字元串
由於,sql命令中的幾個關鍵字元串有一定規律,只出現一次,並且有順序!!!
按照key_lis = ['select', 'insert', 'delete', 'update', 'from', 'into', 'set', 'values', 'where', 'limit']的元素順序分割sql.
分割元素作為sql_dic字典的key放進字典中.分割後的列表為b,如果len(b)>1,說明sql字元串中含有分割元素,同時b[0]對應上一個分割元素的值,b[-1]為下一次分割對象!
這樣不斷迭代直到把sql按出現的所有分割元素分割完畢,但注意這里每次循環都是先分割後賦值!!!當前分割元素比如'select'對應的值,需要等到下一個分割元素
比如'from'執行分割後的列表b,其中b[0]的值才會賦值給sql_dic['select'] ,所以最後一個分割元素的值,不能通過上述循環來完成,必須先處理可能是最後一個分割元素,再正常循環!!
在這sql語句中,有可能成為最後一個分割元素的 'limit' ,'values', 'where', 按優先順序別,先處理'limit' ,再處理'values'或 'where'.....
處理完得到sql_dic後,就是你按不同命令執行,對數據文件的增刪改查,最後返回處理結果!!
示例代碼# _*_coding:utf-8_*_# Author:Jaye Heimport reimport os def sql_parse(sql, key_lis): ''' 解析sql命令字元串,按照key_lis列表裡的元素分割sql得到字典形式的命令sql_dic :param sql: :param key_lis: :return: ''' sql_list = [] sql_dic = {} for i in key_lis: b = [j.strip() for j in sql.split(i)] if len(b) > 1: if len(sql.split('limit')) > 1: sql_dic['limit'] = sql.split('limit')[-1] if i == 'where' or i == 'values': sql_dic[i] = b[-1] if sql_list: sql_dic[sql_list[-1]] = b[0] sql_list.append(i) sql = b[-1] else: sql = b[0] if sql_dic.get('select'): if not sql_dic.get('from') and not sql_dic.get('where'): sql_dic['from'] = b[-1] if sql_dic.get('select'): sql_dic['select'] = sql_dic.get('select').split(',') if sql_dic.get('where'): sql_dic['where'] = where_parse(sql_dic.get('where')) return sql_dic def where_parse(where): ''' 格式化where字元串為列表where_list,用'and', 'or', 'not'分割字元串 :param where: :return: ''' casual_l = [where] logic_key = ['and', 'or', 'not'] for j in logic_key: for i in casual_l: if i not in logic_key: if len(i.split(j)) > 1: ele = i.split(j) index = casual_l.index(i) casual_l.pop(index) casual_l.insert(index, ele[0]) casual_l.insert(index+1, j) casual_l.insert(index+2, ele[1]) casual_l = [k for k in casual_l if k] where_list = three_parse(casual_l, logic_key) return where_list def three_parse(casual_l, logic_key): ''' 處理臨時列表casual_l中具體的條件,'staff_id>5'-->['staff_id','>','5'] :param casual_l: :param logic_key: :return: ''' where_list = [] for i in casual_l: if i not in logic_key: b = i.split('like') if len(b) > 1: b.insert(1, 'like') where_list.append(b) else: key = ['<', '=', '>'] new_lis = [] opt = '' lis = [j for j in re.split('([=<>])', i) if j] for k in lis: if k in key: opt += k else: new_lis.append(k) new_lis.insert(1, opt) where_list.append(new_lis) else: where_list.append(i) return where_list def sql_action(sql_dic, title): ''' 把解析好的sql_dic分發給相應函數執行處理 :param sql_dic: :param title: :return: ''' key = {'select': select, 'insert': insert, 'delete': delete, 'update': update} res = [] for i in sql_dic: if i in key: res = key[i](sql_dic, title) return res def select(sql_dic, title): ''' 處理select語句命令 :param sql_dic: :param title: :return: ''' with open('staff_data', 'r', encoding='utf-8') as fh: filter_res = where_action(fh, sql_dic.get('where'), title) limit_res = limit_action(filter_res, sql_dic.get('limit')) search_res = search_action(limit_res, sql_dic.get('select'), title) return search_res def insert(sql_dic, title): ''' 處理insert語句命令 :param sql_dic: :param title: :return: ''' with open('staff_data', 'r+', encoding='utf-8') as f: data = f.readlines() phone_list = [i.strip().split(',')[4] for i in data] ins_count = 0 if not data: new_id = 1 else: last = data[-1] last_id = int(last.split(',')[0]) new_id = last_id+1 record = sql_dic.get('values').split('/') for i in record: if i.split(',')[3] in phone_list: print('\033[1;31m%s 手機號已存在\033[0m' % i) else: new_record = '%s,%s\n' % (str(new_id), i) f.write(new_record) new_id += 1 ins_count += 1 f.flush() return ['insert successful'], [str(ins_count)] def delete(sql_dic, title): ''' 處理delete語句命令 :param sql_dic: :param title: :return: ''' with open('staff_data', 'r', encoding='utf-8') as r_file,\ open('staff_data_bak', 'w', encoding='utf-8') as w_file: del_count = 0 for line in r_file: dic = dict(zip(title.split(','), line.split(','))) filter_res = logic_action(dic, sql_dic.get('where')) if not filter_res: w_file.write(line) else: del_count += 1 w_file.flush() os.remove('staff_data') os.rename('staff_data_bak', 'staff_data') return ['delete successful'], [str(del_count)] def update(sql_dic, title): ''' 處理update語句命令 :param sql_dic: :param title: :return: ''' set_l = sql_dic.get('set').strip().split(',') set_list = [i.split('=') for i in set_l] update_count = 0 with open('staff_data', 'r', encoding='utf-8') as r_file,\ open('staff_data_bak', 'w', encoding='utf-8') as w_file: for line in r_file: dic = dict(zip(title.split(','), line.strip().split(','))) filter_res = logic_action(dic, sql_dic.get('where')) if filter_res: for i in set_list: k = i[0] v = i[-1] dic[k] = v line = [dic[i] for i in title.split(',')] update_count += 1 line = ','.join(line)+'\n' w_file.write(line) w_file.flush() os.remove('staff_data') os.rename('staff_data_bak', 'staff_data') return ['update successful'], [str(update_count)] def where_action(fh, where_list, title): ''' 具體處理where_list里的所有條件 :param fh: :param where_list: :param title: :return: ''' res = [] if len(where_list) != 0: for line in fh: dic = dict(zip(title.split(','), line.strip().split(','))) if dic['name'] != 'name': logic_res = logic_action(dic, where_list) if logic_res: res.append(line.strip().split(',')) else: res = [i.split(',') for i in fh.readlines()] return res pass def logic_action(dic, where_list): ''' 判斷數據文件中每一條是否符合where_list條件 :param dic: :param where_list: :return: ''' logic = [] for exp in where_list: if type(exp) is list: exp_k, opt, exp_v = exp if exp[1] == '=': opt = '==' logical_char = "'%s'%s'%s'" % (dic[exp_k], opt, exp_v) if opt != 'like': exp = str(eval(logical_char)) else: if exp_v in dic[exp_k]: exp = 'True' else: exp = 'False' logic.append(exp) res = eval(' '.join(logic)) return res def limit_action(filter_res, limit_l): ''' 用列表切分處理顯示符合條件的數量 :param filter_res: :param limit_l: :return: ''' if limit_l: index = int(limit_l[0]) res = filter_res[:index] else: res = filter_res return res def search_action(limit_res, select_list, title): ''' 處理需要查詢並顯示的title和相應數據 :param limit_res: :param select_list: :param title: :return: ''' res = [] fields_list = title.split(',') if select_list[0] == '*': res = limit_res else: fields_list = select_list for data in limit_res: dic = dict(zip(title.split(','), data)) r_l = [] for i in fields_list: r_l.append((dic[i].strip())) res.append(r_l) return fields_list, res if __name__ == '__main__': with open('staff_data', 'r', encoding='utf-8') as f: title = f.readline().strip() key_lis = ['select', 'insert', 'delete', 'update', 'from', 'into', 'set', 'values', 'where', 'limit'] while True: sql = input('請輸入sql命令,退出請輸入exit:').strip() sql = re.sub(' ', '', sql) if len(sql) == 0:continue if sql == 'exit':break sql_dict = sql_parse(sql, key_lis) fields_list, fields_data = sql_action(sql_dict, title) print('\033[1;33m結果如下:\033[0m') print('-'.join(fields_list)) for data in fields_data: print('-'.join(data))

總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。

③ Python中sql 的 select語句中的雙引號」「怎麼處理

python 字元串可以用單引號'',雙引號「」,或者三個單引號''''''表示。如果雙引號表示的字元串裡面有雙引號那就用轉義字元\就可以了

④ python中的sql參數化,想向sql中傳入一組參數,要怎麼寫

一般的處理思路是將資料庫操作的方法放在一個模塊中,比如connectsql.py:
import mysqldb
def execnonquery(sql):
conn = mysqldb.connect(host='xxxx',user='xxxx',passwd='xxxx',db='xxxx')
cur = conn.cursor()
cur.execute(sql)
conn.commit()
conn.close()那麼你的a.py代碼為:
from connectsql import *
def mysql_insert(i,data):
try:
execnonquery('insert into mytest values(%s,%s)' % (i,data))
except:
return 0你的b.py代碼不變。

⑤ python 怎樣使外部.sql腳本執行

直接調命令行工具導入啊。 否則就麻煩 了。需要解析主要的語句,比如create table和insert。最怕碰到復雜的引號,嵌套之類的。經常搞不定啊。

代碼挺復雜的。

⑥ Python/PHP MySQL語句解析器解決業務分表

自己曾經做過一個網盤項目。剛開始由於需要快速地從0到1建設上線,所以沒有對核心文檔表進行分表。當然我的架構理念也是「按需架構設計」。產品需求在沒有明確的長遠計劃的情況下以「小步快跑,趕超競品」為主。後期由於產品功能觸達目標用戶群需求點、產品用戶體驗不斷提升、產品多方位導流、加強產品推廣文檔表每天有百萬數據增長量。不得不對文檔表進行按用戶id分表。當時產品功能已全覆蓋文檔的生命周期。產品功能已豐富多彩。修改所有關聯文檔表的業務代碼為按用戶id分表開發測試成本非常高。上線後線上問題不可控。經過考慮在業務代碼最底層DB層進行SQL語句解析來進行用戶id分表處理。這樣的話開發測試成本都非常低。上線後有問題方便回滾和追查原因。

今天為大家介紹Python/PHP兩種MySQL語句解析器。當時網盤項目用的是PHP編程語言開發。

Python的SQL語句解析器 。個人推薦使用moz_sql_parser庫。經調研官方的sqlparse庫解析出來的語句段無法滿足需求也很難理解。

1、Python moz_sql_parser庫安裝

2、Python moz_sql_parser SQL語句解析

3、Python moz_sql_parser總結

PHP的SQL語句解析器。 個人推薦使用PhpMyAdmin的sql-parser組件。PhpMyAdmin是經過 歷史 檢驗可信賴的。

1、PHP PhpMyAdmin/sql-parser安裝

2、PHP PhpMyAdmin/sql-parser SQL語句解析

3、PHP PhpMyAdmin/sql-parser總結

大家有什麼問題可以發評論溝通。