文章目錄
- python常用數據存儲方法
- txt
- json
- csv
- msyql
- 安裝pymysql
- 連接
- 建庫建表
- 插入
- 刪除
- 修改
- 查詢
- mongodb
- 安裝
- 連接
- 建庫建文檔
- 增
- 刪
- 改
- 查
- redis
- 安裝
- 連接
- 操作
- 公用方法
- 字符串
- 列表
- 集合
- 有序集合
- 散列
- 其他
- sqlalchemy
- postgresql
- mysql
- oracle
- Microsoft SQL Server
- sqlite
python常用數據存儲方法
txt
with
open
(
'test.txt'
,
'w'
,
encoding
=
'utf8'
)
as
f
:
f
.
write
(
'sdfasdf'
+
'\n'
)
json
object = json.loads(jsonstr) : json字符串 轉化為 python對象
str = json.dumps(objects,indent=2) :python對象 轉化為 json字符串
注意:輸出中文要設置ensure_ascii為False,還要注意編碼
with
open
(
'test.txt'
,
'w'
,
encoding
=
'utf8'
)
as
f
:
f
.
write
(
json
.
dumps
(
objects
,
indent
=
2
,
ensure_ascii
=
False
)
)
操作文件
import
json
with
open
(
'jiuba.json'
,
'w'
,
encoding
=
'utf8'
)
as
f
:
json
.
dump
(
res
.
json
(
)
,
f
,
indent
=
2
,
ensure_ascii
=
False
)
with
open
(
'jiuba.json'
,
encoding
=
'utf8'
)
as
f
:
jiuba_dict
=
json
.
load
(
f
)
csv
用csv模塊寫,用pandas.read_csv() 讀取很方便
import
csv
from
pandas
import
read_csv
with
open
(
'csv.csv'
,
'w'
,
encoding
=
'utf8'
)
as
f
:
writer
=
csv
.
writer
(
f
,
delimiter
=
','
)
writer
.
writerow
(
[
'id'
,
'name'
,
'age'
]
)
writer
.
writerows
(
(
[
'1'
,
'bob'
,
'12'
]
,
[
'2'
,
'amy'
,
'21'
]
)
)
# 讀寫字典格式
with
open
(
'dict.csv'
,
'w'
,
encoding
=
'utf8'
)
as
f
:
field_headers
=
[
'id'
,
'name'
,
'age'
]
writer
=
csv
.
DictWriter
(
f
,
fieldnames
=
field_headers
)
writer
.
writeheader
(
)
writer
.
writerow
(
{
'id'
:
3
,
'name'
:
'Mike'
,
'age'
:
22
}
)
df
=
read_csv
(
'csv.csv'
)
print
(
df
)
msyql
安裝pymysql
pip install PyMySQL
To use “sha256_password” or “caching_sha2_password” for authenticate,
you need to install additional dependency:
pip install PyMySQL[rsa]
連接
單實例連接
db = pymysql.connect(host='127.0.0.1', port=3306,
user='root', password='huajian')
cursor = db.cursor()
建庫建表
cursor
.
execute
(
f
'create database if not exists {db_name};'
)
cursor
.
execute
(
f
'use {db_name};'
)
cursor
.
execute
(
f
'create table if not exists {table_name}(id varchar(20), sex varchar(30),name varchar(20), age int, primary key (id));'
)
插入
data
=
{
'id'
:
'465674646'
,
'name'
:
'Mike'
,
'age'
:
25
,
'sex'
:
'Male'
}
# ================================================
# insert ignore 使用唯一約束情況下,存在則不插入,保持原來數據
# =================================================
table_name
=
'xxxx'
keys
=
','
.
join
(
data
.
keys
(
)
)
values
=
','
.
join
(
map
(
lambda
x
:
f
'"{x}"'
,
data
.
values
(
)
)
)
sql
=
f
'insert ignore into {table_name}({keys}) values({values});'
try
:
cursor
.
execute
(
sql
)
db
.
commit
(
)
except
:
db
.
rollback
(
)
# =================================================
# insert update 使用唯一約束情況下,存在則替換,更換為新數據
# =================================================
keys
=
','
.
join
(
data
.
keys
(
)
)
values
=
','
.
join
(
map
(
lambda
x
:
f
'"{x}"'
,
data
.
values
(
)
)
)
update_sql
=
','
.
join
(
[
f
'{k} = "{v}"'
for
k
,
v
in
data
.
items
(
)
]
)
sql
=
f
'insert into {table_name}({keys}) values({values}) on duplicate key update {update_sql};'
try
:
cursor
.
execute
(
sql
)
db
.
commit
(
)
except
:
db
.
rollback
(
)
cursor
.
close
(
)
db
.
close
(
)
刪除
修改
查詢
sql
=
'select * from xtable;'
cursor
.
execute
(
sql
)
res
=
cursor
.
fetchall
(
)
# res = cursor.fetchone()
……
# =======================================================
# 下面是逐條獲取
# =======================================================
sql
=
'select * from xtabel;'
try
:
cursor
.
execute
(
sql
)
print
(
'Count:'
,
cursor
.
rowcount
)
row
=
cursor
.
fetchone
(
)
print
(
row
)
while
row
:
row
=
cursor
.
fetchone
(
)
print
(
row
)
except
:
print
(
'error'
)
mongodb
安裝
pip install --upgrade pymongo
連接
client
=
pymongo
.
MongoClient
(
host
=
'localhost'
,
port
=
27017
)
建庫建文檔
db = client.test_db
collection = db.test_collection
增
collection.insert_one({……})
collection.insert_many({……},{……})
刪
collection.remove({……})
改
# 正常情況:查找,然后修改
# 下面這個,去重插入,
collection.update_one({'id': data1['id']}, {'$set': data1}, True)
查
collection.find()
collection.find_one()
collection.find().count()
redis
安裝
pip install redis hiredis
連接
連接池不需要管關閉的事情
import
redis
pool
=
redis
.
ConnectionPool
(
host
=
'localhost'
,
port
=
6379
,
db
=
15
,
decode_responses
=
True
)
r
=
redis
.
StrictRedis
(
connection_pool
=
pool
)
操作
公用方法
方法 | 作用 | 例子 |
---|---|---|
exists() | 判斷是否存在 | |
delete() | 刪除一個鍵 | |
type() | 判斷鍵類型 | |
keys(pattern) | 獲取所有符合規則的鍵 | keys(*) |
randomkey() | 隨機獲取一個鍵 | |
rename(src,dst) | ||
dbsize() | ||
expire(name,time) | ||
ttl() | ||
move(name) | 把鍵移動到其他數據庫 | |
flushdb() | 清空當前數據庫 | |
flushall() | 清空所有數據庫 | |
expire() |
字符串
方法 | 作用 | 例子 |
---|---|---|
set | ||
get | ||
getset() | ||
mget() | ||
mset() | ||
incr(name,amount=1) | ||
decr(name, amount=1) | ||
append(key,value) | ||
列表
方法 | 作用 | 例子 |
---|---|---|
rpush() | 末尾添加一個元素 | |
lpush() | ||
lpop() | ||
rpop() | ||
llen(name) | ||
lrange(name,start,end) | ||
lindex(name,index) | ||
lset(name,index,value) | lset(‘list’,1,5) 索引為1的元素,賦值為5 | |
lrem(name.count,value) | lrem(‘list’,2,3) 刪除兩個3 |
集合
方法 | 作用 | 例子 |
---|---|---|
sadd(setname,*values) | ||
srem(setname,*values) | ||
spop(setname) | ||
smove(src,dst,value) | 把value移動到另外一個集合中去 | |
scard(name) | 獲取元素個數 | |
sismember(setname,value) | 判斷 | |
sinter([setname,setname,……]) | 返回交集 | |
sunion([setname,setname,……]) | 返回并集 | |
sdiff([setname,setname,……]) | 返回差集 | |
smembers(setname) | 查看全部元素 | |
srandmember(setname) | 隨即返回一個元素,不刪除 | |
有序集合
方法 | 作用 | 例子 |
---|---|---|
zadd() | zadd(‘grade’,100,‘bob’,98,‘mike’) | |
zrem() | zrem(‘grade’,‘Mike’) | |
zincrby() | zincrby(‘grade’,‘bob’,-2) | |
zrank(setname,value) | 返回名次,從小到大排序 | zrank(‘grade’,‘amy’) |
zrevrank(setname,value) | 返回名詞,從大到小排序 | |
zrevrange(setname,start,end) | zrevrange(‘grade’,0,3) | |
zrangebyscore(setname,start,end) | zrangebyscore(‘grade’,85,100) | |
zcount(setname,min,max) | min->max之間的個數 | |
zcard(setname) | 返回個數 | |
zremrangebyrank(setname,min,max) | 刪除 | |
zremrangebyscore(setname,min,max) | 刪除元素 | |
散列
方法 | 作用 | 例子 |
---|---|---|
hset() | hset(‘price’,‘cake’,5) | |
hget() | ||
hmset() | ||
hmget() | ||
hincrby() | ||
hexists() | ||
hdel() | ||
hlen() | ||
hkeys() | ||
hvals() | ||
hgetall() |
其他
sqlalchemy
dialect+driver://username:password@host:port/database
例如:
from
sqlalchemy
import
create_engine
engine
=
create_engine
(
"mysql://scott:tiger@hostname/dbname"
,
encoding
=
'latin1'
,
echo
=
True
,
pool_size
=
10
)
例子:
from
sqlalchemy
import
create_engine
DB_NAME
=
'only_test'
TABLE_NAME
=
'students'
DB_URI
=
f
'mysql://root:huajian@localhost:3306/{DB_NAME}'
engine
=
create_engine
(
DB_URI
,
encoding
=
'utf8'
,
echo
=
True
)
conn
=
engine
.
connect
(
)
sql_cre_table
=
f
'''
create table if not exists {TABLE_NAME}(
id int(11) auto_increment,
name varchar(20),
age int,
grade int,
primary key (id)
)
'''
sql_insert_1
=
f
'insert ignore into {TABLE_NAME}(name,age,grade) values ("bob",21,98);'
sql_insert_2
=
f
'insert ignore into {TABLE_NAME}(name,age,grade) values ("amy",21,98);'
# 一次性commit
with
conn
.
begin
(
)
as
trans
:
conn
.
execute
(
sql_cre_table
)
conn
.
execute
(
sql_insert_1
)
conn
.
execute
(
sql_insert_2
)
res
=
conn
.
execute
(
'select count(*) from region_new;'
)
for
i
in
res
:
print
(
i
)
trans
.
commit
(
)
postgresql
# default
engine = create_engine('postgresql://scott:tiger@localhost/mydatabase')
# psycopg2
engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase')
# pg8000
engine = create_engine('postgresql+pg8000://scott:tiger@localhost/mydatabase')
mysql
# default
engine = create_engine('mysql://scott:tiger@localhost/foo')
# mysqlclient (a maintained fork of MySQL-Python)
engine = create_engine('mysql+mysqldb://scott:tiger@localhost/foo')
# PyMySQL
engine = create_engine('mysql+pymysql://scott:tiger@localhost/foo')
oracle
engine = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname')
engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname')
Microsoft SQL Server
# pyodbc
engine = create_engine('mssql+pyodbc://scott:tiger@mydsn')
# pymssql
engine = create_engine('mssql+pymssql://scott:tiger@hostname:port/dbname')
sqlite
engine = create_engine('sqlite:///foo.db')
# to use a SQLite :memory: database, specify an empty URL:
engine = create_engine('sqlite://')
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
