首先展示一下Django的表结构:
class Category(models.Model):
category_name = models.CharField(max_length=200)
def __str__(self):
if self.category_name:
return self.category_name
class Meta:
verbose_name = "分类"
verbose_name_plural = verbose_name
class Tag(models.Model):
tag_name = models.TextField(max_length=100)
def __str__(self):
if self.tag_name:
return self.tag_name
class Meta:
verbose_name = "标签"
verbose_name_plural = verbose_name
class PostManager(models.Manager):
def distinct_date(self): # 该管理器定义了一个distinct_date方法,目的是找出所有的不同日期
distinct_date_list = [] # 建立一个列表用来存放不同的日期 年-月
date_list = self.values('post_date') # 根据文章字段date_publish找出所有文章的发布时间
for date in date_list: # 对所有日期进行遍历,当然这里会有许多日期是重复的,目的就是找出多少种日期
date = date['post_date'].strftime('%Y年%m月') # 取出一个日期改格式为 ‘xxx年/xxx月 存档’
if date not in distinct_date_list:
distinct_date_list.append(date)
return distinct_date_list
class Post(models.Model):
title = models.CharField(max_length=200)
slug = models.SlugField(max_length=255,unique=True)
author = models.ForeignKey(User, on_delete=models.CASCADE)
post_date = models.DateTimeField(auto_now_add=True)
comment_status = models.CharField(choices=COMMENT_STATUS_CHOICES,
default=('opened', 'opened'), max_length=20)
content = RichTextUploadingField(config_name='default')
tag = models.ManyToManyField('Tag', blank=True)
category = models.ForeignKey(Category, related_name='category', null=True, on_delete=models.CASCADE)
objects = PostManager()
is_wordpress = models.CharField(choices=IS_WORDPRESS, default=('yes', 'yes'), max_length=10)
price = models.FloatField(blank=True, null=True)
class Meta:
verbose_name = "文章"
verbose_name_plural = '\u200B' + verbose_name
ordering = ["-post_date"]
def __str__(self):
if self.title:
return self.title
def get_absolute_url(self):
return reverse('article_detail', args=[str(self.slug)])
class Comments(MPTTModel):
post = models.ForeignKey(Post, on_delete=models.CASCADE)
user_name = models.CharField(max_length=20)
email = models.CharField(max_length=50)
comment_date = models.DateTimeField(auto_now=True)
content = models.TextField(max_length=1000)
parent = TreeForeignKey('self', on_delete=models.CASCADE, null=True, blank=True, related_name='children')
approved = models.CharField(choices=COMMENT_APPROVED_CHOICES,
default=('refused', 'refused'), max_length=20)
def __str__(self):
if self.content:
return self.content
class Meta:
verbose_name = "评论"
verbose_name_plural = verbose_name
ordering = ["-comment_date"]
class Order(models.Model):
order_id = models.CharField(max_length=25)
order_done_datetime = models.DateTimeField(auto_now_add=True)
slug = models.CharField(max_length=200)
def __str__(self):
if self.order_id:
return self.order_id
class Meta:
verbose_name = "订单"
verbose_name_plural = verbose_name
ordering = ["-order_done_datetime"]
然后是WordPress迁移到Django的实战代码:
#!/usr/bin/env python3
# coding=utf-8
import pymysql
import sqlite3
from pymysql.connections import Connection
from pymysql.cursors import Cursor
from pymysql.connections import Connection as MyConnection
class MysqlConn():
def db_connector(self) -> Connection:
"""
功能:用于数据库初始化连接,获取db实例对象
:return: db连接对象
"""
common_db_config = {'host': '192.168.0.1',
'user': 'root',
'port': 3306,
'passwd': '123456root',
'db': 'wordpress',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor,
'autocommit': False}
try:
conn: Connection = pymysql.connect(**common_db_config)
return conn
# 防止mysql连接失败时一顿虾条,多写点提示,考虑到以后可能要在服务器命令行环境下运行会有乱码先酱紫~
except pymysql.MySQLError as e:
print(str(e))
class DBConnector(object):
def __init__(self):
self.db: MyConnection = MysqlConn().db_connector()
self.cur: Cursor = self.db.cursor()
db=DBConnector()
sqlite3_conn=sqlite3.connect('db.sqlite3')
sqlite3_cur= sqlite3_conn.cursor()
# find duplicated post
"""
data_list = [d['object_id'] for d in data]
import collections
print([item for item, count in collections.Counter(data_list).items() if count > 1])]
"""
# query category
sql1= "select term_id,name from wp_terms where term_id>=4 and term_id<=10"
# insert category
db.cur.execute(sql1)
data1 = db.cur.fetchall()
for d in data1:
sqlite3_sql = "INSERT INTO BLOG_CATEGORY (ID,CATEGORY_NAME) VALUES ({id},'{name}')".format(id=d['term_id'],name=d['name'])
sqlite3_cur.execute(sqlite3_sql)
# query tag
sql2 = "select wp_term_taxonomy.term_id,wp_terms.name from wp_term_taxonomy,wp_terms where wp_term_taxonomy.taxonomy = 'post_tag' and wp_term_taxonomy.term_id=wp_terms.term_id"
# insert tag
db.cur.execute(sql2)
data2 = db.cur.fetchall()
for d in data2:
sqlite3_sql = "INSERT INTO BLOG_TAG (ID,TAG_NAME) VALUES ({id},'{name}')".format(id=d['term_id'],name=d['name'])
sqlite3_cur.execute(sqlite3_sql)
# query post
sql3="select ID,post_date,post_content,post_title,post_name,comment_status from wp_posts where post_status='publish' and post_type='post';"
# insert post
db.cur.execute(sql3)
data3 = db.cur.fetchall()
for d in data3:
sqlite3_sql = "INSERT INTO BLOG_POST (ID,TITLE,SLUG,POST_DATE,CONTENT,AUTHOR_ID,COMMENT_STATUS) VALUES (?,?,?,?,?,?,?)"
sqlite3_cur.execute(sqlite3_sql,(d['ID'],d['post_title'],d['post_name'],d['post_date'],d['post_content'],1,'open'))
# query category relation
sql4= "SELECT t.term_id, tr.object_id FROM wp_terms AS t INNER JOIN wp_term_taxonomy AS tt ON t.term_id = tt.term_id INNER JOIN wp_term_relationships AS tr ON tr.term_taxonomy_id = tt.term_taxonomy_id INNER JOIN wp_posts ON tr.object_id=wp_posts.ID WHERE tt.taxonomy='category'"
# insert category relation
db.cur.execute(sql4)
data4 = db.cur.fetchall()
sqlite3_query_posts_sql = "select id from blog_post"
sqlite3_cur.execute(sqlite3_query_posts_sql)
sqlite3_query_posts_results =[data[0] for data in sqlite3_cur.fetchall()]
for q_p_result in sqlite3_query_posts_results:
for cate_relation in data4:
if q_p_result == cate_relation['object_id']:
sql = "update blog_post set category_id={cate_id} where id={post_id}".format(cate_id=cate_relation['term_id'],
post_id=q_p_result)
print(sql)
sqlite3_cur.execute(sql)
# query tag relation
sql5= "SELECT t.term_id, tr.object_id FROM wp_terms AS t INNER JOIN wp_term_taxonomy AS tt ON t.term_id = tt.term_id INNER JOIN wp_term_relationships AS tr ON tr.term_taxonomy_id = tt.term_taxonomy_id INNER JOIN wp_posts ON tr.object_id=wp_posts.ID WHERE tt.taxonomy='post_tag'"
# insert tag relation
db.cur.execute(sql5)
data5 = db.cur.fetchall()
for d in data5:
sql = "insert into blog_post_tag (post_id, tag_id) values ({post_id},{tag_id})".format(post_id=d['object_id'],
tag_id=d['term_id'])
print(sql)
sqlite3_cur.execute(sql)
# query comments
sql6="select comment_ID,comment_post_ID,comment_author,comment_author_email,comment_date,comment_content,comment_parent from wp_comments;"
# insert comment
db.cur.execute(sql6)
data6 = db.cur.fetchall()
for d in data6:
sql6 = "insert into blog_comments (id,post_id,user_name,email,comment_date,content,comment_parent,approved) values ('{id}','{post_id}','{user_name}','{email}','{date}','{content}','{parent}','{approved}')".format(
id=d['comment_ID'],post_id=d['comment_post_ID'],user_name=d['comment_author'],email=d['comment_author_email'],
date=d['comment_date'],content=d['comment_content'],parent=d['comment_parent'],approved='approved'
)
print(sql)
sqlite3_cur.execute(sql)
sqlite3_conn.commit()
print("Records created successfully")
sqlite3_conn.close()
上述代码自己替换一下mysql的ip账号密码之类的,然后执行的时候一段一段的执行,把别的注释掉。
最后展示一下迁移完的效果:
