提醒:本文最后更新于2023-04-30 20:11,文中所关联的信息可能已发生改变,请知悉!
姊妹篇:
https://sharpgan.com/wordpress-to-django/
感兴趣的可以前往看一下这个姊妹篇,我前后左右反复横跳,哈哈~
废话少说,
首先展示一下Django的表结构:
class Category(models.Model):
category_name = models.CharField(max_length=200)
def __str__(self):
if self.category_name:
return self.category_name
class Meta:
verbose_name = "分类"
verbose_name_plural = verbose_name
class Tag(models.Model):
tag_name = models.TextField(max_length=100)
def __str__(self):
if self.tag_name:
return self.tag_name
class Meta:
verbose_name = "标签"
verbose_name_plural = verbose_name
class PostManager(models.Manager):
def distinct_date(self): # 该管理器定义了一个distinct_date方法,目的是找出所有的不同日期
distinct_date_list = [] # 建立一个列表用来存放不同的日期 年-月
date_list = self.values('post_date') # 根据文章字段date_publish找出所有文章的发布时间
for date in date_list: # 对所有日期进行遍历,当然这里会有许多日期是重复的,目的就是找出多少种日期
date = date['post_date'].strftime('%Y年%m月') # 取出一个日期改格式为 ‘xxx年/xxx月 存档’
if date not in distinct_date_list:
distinct_date_list.append(date)
return distinct_date_list
class Post(models.Model):
title = models.CharField(max_length=200)
slug = models.SlugField(max_length=255,unique=True)
author = models.ForeignKey(User, on_delete=models.CASCADE)
post_date = models.DateTimeField(auto_now_add=True)
comment_status = models.CharField(choices=COMMENT_STATUS_CHOICES,
default=('opened', 'opened'), max_length=20)
content = RichTextUploadingField(config_name='default')
tag = models.ManyToManyField('Tag', blank=True)
category = models.ForeignKey(Category, related_name='category', null=True, on_delete=models.CASCADE)
objects = PostManager()
is_wordpress = models.CharField(choices=IS_WORDPRESS, default=('yes', 'yes'), max_length=10)
price = models.FloatField(blank=True, null=True)
class Meta:
verbose_name = "文章"
verbose_name_plural = '\u200B' + verbose_name
ordering = ["-post_date"]
def __str__(self):
if self.title:
return self.title
def get_absolute_url(self):
return reverse('article_detail', args=[str(self.slug)])
class Comments(MPTTModel):
post = models.ForeignKey(Post, on_delete=models.CASCADE)
user_name = models.CharField(max_length=20)
email = models.CharField(max_length=50)
comment_date = models.DateTimeField(auto_now=True)
content = models.TextField(max_length=1000)
parent = TreeForeignKey('self', on_delete=models.CASCADE, null=True, blank=True, related_name='children')
approved = models.CharField(choices=COMMENT_APPROVED_CHOICES,
default=('refused', 'refused'), max_length=20)
def __str__(self):
if self.content:
return self.content
class Meta:
verbose_name = "评论"
verbose_name_plural = verbose_name
ordering = ["-comment_date"]
class Order(models.Model):
order_id = models.CharField(max_length=25)
order_done_datetime = models.DateTimeField(auto_now_add=True)
slug = models.CharField(max_length=200)
def __str__(self):
if self.order_id:
return self.order_id
class Meta:
verbose_name = "订单"
verbose_name_plural = verbose_name
ordering = ["-order_done_datetime"]
然后是Django迁移到WordPress的实战代码:
#!/usr/bin/env python3
# coding=utf-8
import pymysql
import sqlite3
from uuslug import slugify
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "DjangoBlog4Life.settings_dev")
django.setup()
from blog.models import Post, Tag, Comments
class MysqlConn():
def db_connector(self):
"""
功能:用于数据库初始化连接,获取db实例对象
:return: db连接对象
"""
common_db_config = {'host': '192.168.0.1',
'user': 'wp',
'port': 3306,
'passwd': 'xxxxx',
'db': 'wp_db',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor,
'autocommit': False}
try:
conn = pymysql.connect(**common_db_config)
return conn
# 防止mysql连接失败时一顿虾条,多写点提示,考虑到以后可能要在服务器命令行环境下运行会有乱码先酱紫~
except pymysql.MySQLError as e:
print(str(e))
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
sqlite3_conn=sqlite3.connect('db.sqlite3')
sqlite3_conn.row_factory = dict_factory
sqlite3_cur= sqlite3_conn.cursor()
mysql_conn = MysqlConn().db_connector()
mysql_cur = mysql_conn.cursor()
# query django's category and insert into wordpress's table of wp_terms
sql1= "SELECT id, category_name FROM blog_category"
sqlite3_cur.execute(sql1)
data1 = sqlite3_cur.fetchall()
for d in data1:
d_id = d['id']
d_name = d['category_name']
d_slug = slugify(d_name)
sql_inner = "INSERT INTO wp_terms (term_id,name,slug) VALUES ('{id}','{name}','{slug}')".format(id=d_id, name = d_name, slug=d_slug)
mysql_cur.execute(sql_inner)
mysql_conn.commit()
# query django's tag and insert into wordpress's table of wp_terms
sql2= "SELECT id, tag_name FROM blog_tag"
sqlite3_cur.execute(sql2)
data2 = sqlite3_cur.fetchall()
for d in data2:
d_id = d['id']
d_name = d['tag_name']
d_slug = slugify(d_name)
sql_inner = "INSERT INTO wp_terms (term_id,name,slug) VALUES ('{id}','{name}','{slug}')".format(id=d_id, name = d_name, slug=d_slug)
if str(d_id) == '11':
sql_inner = "INSERT INTO wp_terms (term_id,name,slug) VALUES ('{id}','{name}','{slug}')".format(id=230, name = d_name, slug=d_slug)
if str(d_id) == '12':
sql_inner = "INSERT INTO wp_terms (term_id,name,slug) VALUES ('{id}','{name}','{slug}')".format(id=231, name = d_name, slug=d_slug)
print(d_name)
mysql_cur.execute(sql_inner)
mysql_conn.commit()
# query django's category and insert into wordpress's table of wp_term_taxonomy
sql3= "SELECT id, category_name FROM blog_category"
sqlite3_cur.execute(sql3)
data3 = sqlite3_cur.fetchall()
term_taxonomy_id = 3
for d in data3:
d_id = d['id']
count = Post.objects.filter(category_id=d_id).count()
sql_inner = "INSERT INTO wp_term_taxonomy (term_taxonomy_id,term_id,description,taxonomy,count) VALUES ('{term_taxonomy_id}','{term_id}','{description}','{taxonomy}','{count}')".format(term_taxonomy_id = term_taxonomy_id, term_id=d_id,taxonomy ='category', count=count,description='')
mysql_cur.execute(sql_inner)
term_taxonomy_id = term_taxonomy_id + 1
print(d_id)
# mysql_conn.commit()
# query django's tag and insert into wordpress's table of wp_term_taxonomy
sql4= "SELECT id, tag_name FROM blog_tag"
sqlite3_cur.execute(sql4)
data4 = sqlite3_cur.fetchall()
term_taxonomy_id = 12
for d in data4:
d_id = d['id']
d_name = d['tag_name']
count = Tag.objects.get(tag_name=d_name).post_set.count()
sql_inner = "INSERT INTO wp_term_taxonomy (term_taxonomy_id,term_id,description,taxonomy,count) VALUES ('{term_taxonomy_id}','{term_id}','{description}','{taxonomy}','{count}')".format(term_taxonomy_id = term_taxonomy_id, term_id=d_id,taxonomy ='post_tag', count=count,description='')
if str(d_id) == '11':
sql_inner = "INSERT INTO wp_term_taxonomy (term_taxonomy_id,term_id,description,taxonomy,count) VALUES ('{term_taxonomy_id}','{term_id}','{description}','{taxonomy}','{count}')".format(term_taxonomy_id = term_taxonomy_id, term_id=230,taxonomy ='post_tag', count=count,description='')
if str(d_id) == '12':
sql_inner = "INSERT INTO wp_term_taxonomy (term_taxonomy_id,term_id,description,taxonomy,count) VALUES ('{term_taxonomy_id}','{term_id}','{description}','{taxonomy}','{count}')".format(term_taxonomy_id = term_taxonomy_id, term_id=231,taxonomy ='post_tag', count=count,description='')
mysql_cur.execute(sql_inner)
term_taxonomy_id = term_taxonomy_id + 1
print(str(d_id) + ': ' + d_name)
# query django's blog_post and insert into wordpress's wp_posts
sql5= "SELECT * FROM blog_post;"
sqlite3_cur.execute(sql5)
data5 = sqlite3_cur.fetchall()
# print(data)
for d in data5:
d_id = d['id']
d_title = d['title']
d_slug = d['slug']
d_post_date = d['post_date']
d_content = d['content']
post_content = d_content.replace("/media/uploads/","/uploads/").replace('<pre>',"<pre class='corepress-code-pre'>")
guid = "https://sharpgan.com/?p=" + str(d_id)
comment_count = Comments.objects.filter(post_id=d_id).count()
print(comment_count)
sql_inner = """INSERT INTO wp_posts (ID,post_author,post_date,post_date_gmt,post_content,post_title,post_excerpt,post_status,comment_status,ping_status,post_password,post_name,to_ping,pinged,post_modified,post_modified_gmt,post_content_filtered,post_parent,guid,menu_order,post_type,post_mime_type,comment_count) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
mysql_cur.execute(sql_inner, (d_id, 1,d_post_date,d_post_date,post_content,d_title,'','publish','open','open','',d_slug,'','',d_post_date,d_post_date,'',0,guid,0,'post','',comment_count))
print(d_title)
# insert the category's relationship into the wp's table of wp_term_relationships
sql6= "SELECT id,title,category_id FROM blog_post;"
sqlite3_cur.execute(sql6)
data6 = sqlite3_cur.fetchall()
# print(data)
for d in data6:
d_post_id = d['id']
d_title = d['title']
d_category_id = d['category_id']
query_term_taxonomy_id_sql = "select term_taxonomy_id from wp_term_taxonomy where term_id={}".format(d_category_id)
mysql_cur.execute(query_term_taxonomy_id_sql)
data_term_taxonomy_id = mysql_cur.fetchall()[0]['term_taxonomy_id']
insert_wp_term_relationships_sql = "insert into wp_term_relationships (object_id,term_taxonomy_id,term_order) VALUES ('{object_id}','{term_taxonomy_id}','{term_order}')".format(object_id=d_post_id, term_taxonomy_id=data_term_taxonomy_id,term_order=0)
mysql_cur.execute(insert_wp_term_relationships_sql)
print(d_title)
# insert the tag's relationship into the wp's table of wp_term_relationships
sql7= "SELECT post_id,tag_id FROM blog_post_tag;"
sqlite3_cur.execute(sql7)
data7 = sqlite3_cur.fetchall()
# print(data)
for d in data7:
d_post_id = d['post_id']
d_tag_id = d['tag_id']
query_term_taxonomy_id_sql = "select term_taxonomy_id from wp_term_taxonomy where term_id={}".format(d_tag_id)
if str(d_tag_id) == '11':
query_term_taxonomy_id_sql = "select term_taxonomy_id from wp_term_taxonomy where term_id={}".format(230)
if str(d_tag_id) == '12':
query_term_taxonomy_id_sql = "select term_taxonomy_id from wp_term_taxonomy where term_id={}".format(231)
mysql_cur.execute(query_term_taxonomy_id_sql)
data_term_taxonomy_id = mysql_cur.fetchall()[0]['term_taxonomy_id']
sql_innner = "insert into wp_term_relationships (object_id,term_taxonomy_id,term_order) VALUES ('{object_id}','{term_taxonomy_id}','{term_order}')".format(object_id=d_post_id, term_taxonomy_id=data_term_taxonomy_id,term_order=0)
mysql_cur.execute(sql_innner)
print(d_post_id)
# insert the django's comments into wp's table of wp_comments
sql8= "SELECT * FROM blog_comments"
sqlite3_cur.execute(sql8)
data8 = sqlite3_cur.fetchall()
for d in data8:
d_id = d['id']
d_name = d['user_name']
d_email = d['email']
d_comment_date = d['comment_date']
d_content = d['content']
d_post_id = d['post_id']
d_parent_id = d['parent_id']
comment_author = None
comment_author_url = None
user_id = None
comment_parent = None
if d_parent_id is None:
comment_parent = 0
else:
comment_parent = d_parent_id
if d_name == "sharp":
comment_author = 'sharp097'
user_id = 1
comment_author_url = "https://sharpgan.com/"
else:
comment_author = d_name
user_id = 0
comment_author_url = ""
sql_inner = """INSERT into wp_comments (comment_ID,comment_post_ID,comment_author,comment_author_email,comment_author_url,comment_author_IP,comment_date,comment_date_gmt,comment_content,comment_karma,comment_approved,comment_agent,comment_type,comment_parent,user_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
mysql_cur.execute(sql_inner,(d_id,d_post_id,comment_author,d_email,comment_author_url,"",d_comment_date,d_comment_date,d_content,0,1,"","comment",comment_parent,user_id))
print(d_name)
mysql_conn.commit()
mysql_conn.close()
sqlite3_conn.close()
上述代码自己替换一下mysql的ip账号密码之类的,然后执行的时候一段一段的执行,把别的注释掉。
说明:
如果你对下面的判断代码感到好奇我来解释一下:
if str(d_id) == '11':
sql_inner = "INSERT INTO wp_term_taxonomy (term_taxonomy_id,term_id,description,taxonomy,count) VALUES ('{term_taxonomy_id}','{term_id}','{description}','{taxonomy}','{count}')".format(term_taxonomy_id = term_taxonomy_id, term_id=230,taxonomy ='post_tag', count=count,description='')
if str(d_id) == '12':
sql_inner = "INSERT INTO wp_term_taxonomy (term_taxonomy_id,term_id,description,taxonomy,count) VALUES ('{term_taxonomy_id}','{term_id}','{description}','{taxonomy}','{count}')".format(term_taxonomy_id = term_taxonomy_id, term_id=231,taxonomy ='post_tag', count=count,description='')
因为我的django博客的tag表和category表是独立的两个表,而WordPress比较特殊全部放在了一个表中,上述id中的11和12就是我原先的django的独立的两个表中重复两个id,要特殊处理一下,不然会有问题。
最后展示一下迁移完的效果:
