checkpoint, adding done channels for threads

This commit is contained in:
Ross Trottier 2024-06-04 09:17:31 -06:00
parent e02a35069a
commit 7f73716b9a
2 changed files with 48 additions and 252 deletions

View File

@ -7,15 +7,15 @@ import (
) )
// WP Config // WP Config
const baseUrl = "https://slowtwitch.cloud/" const baseUrl = "http://go-api-playground.local/"
const wordpressKey = "admin@slowtwitch.cloud" const wordpressKey = "admin"
const wordpressSecret = "3SrI MysB NREZ 40OD PLAP uk1W" const wordpressSecret = "JXDP 3EgE h1RF jhln b8F5 A1Xk"
// DB Config // DB Config
const slowtwitchAdminUser = "admin" const slowtwitchAdminUser = "admin"
const slowtwitchAdminPass = "yxnh93Ybbz2Nm8#mp28zCVv" const slowtwitchAdminPass = "yxnh93Ybbz2Nm8#mp28zCVv"
const slowtwitchDbName = "slowtwitch" const slowtwitchDbName = "slowtwitch"
const migrationDbName = "slowtwitch_transfer" const migrationDbName = "slowtwitch_transfer_threaded_test"
const federatedDbUrl = "slowtwitch.northend.network" const federatedDbUrl = "slowtwitch.northend.network"
const federatedDbPort = "3306" const federatedDbPort = "3306"

View File

@ -37,256 +37,50 @@ func (migration MigratePosts) Execute() []PostResult {
panic("Could not migrate posts due to tags not found:" + wpTagErr.Error()) panic("Could not migrate posts due to tags not found:" + wpTagErr.Error())
} }
//TODO Re-thread using a work input channel, work output channel, and waitgroup
var postResults []PostResult var postResults []PostResult
batchSize := 5 batchSize := 5
var postBatch []int workInputChannel := make(chan int)
workOutputChannel := make(chan PostResult)
var wg sync.WaitGroup
wg.Add(1)
wg.Add(len(slowtwitchPostIdsForMigration))
for i, postId := range slowtwitchPostIdsForMigration { for i := 0; i < batchSize; i++ {
//anonymous func will take in post IDs: the rest it can access from scope go func() {
postBatch = append(postBatch, postId) for postId := range workInputChannel {
result := createPost(postId, migration, wpTagData)
if len(postBatch) == batchSize || i == len(slowtwitchPostIdsForMigration)-1 { workOutputChannel <- result
postResultsChannel := make(chan PostResult) wg.Done()
for _, batchId := range postBatch {
go func(id int) {
errorMessage := ""
//migrate
postBase, postBaseErr := slowtwitch.GetPostBase(batchId, migration.SlowtwitchDatabase)
if postBaseErr != nil {
errorMessage = errorMessage + postBaseErr.Error()
failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase)
postResultsChannel <- failedPostResult
return
}
createWordpressPost := wordpress.CreatePost{
Title: postBase.Title,
Excerpt: postBase.Description,
Status: "publish",
}
if postBase.DatePublished.Valid {
t := postBase.DatePublished.Time
timeString := fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02d",
t.Year(), t.Month(), t.Day(),
t.Hour(), t.Minute(), t.Second())
createWordpressPost.Date = timeString
} else {
errorMessage = errorMessage + "Invalid Date Published"
failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase)
postResultsChannel <- failedPostResult
return
}
for _, tag := range wpTagData {
if postBase.Bike == true && tag.Name == "bike" {
createWordpressPost.Tags = append(createWordpressPost.Tags, tag.Id)
}
if postBase.Swim == true && tag.Name == "swim" {
createWordpressPost.Tags = append(createWordpressPost.Tags, tag.Id)
}
if postBase.Run == true && tag.Name == "run" {
createWordpressPost.Tags = append(createWordpressPost.Tags, tag.Id)
}
}
var wordPressCategoryIds []int
var firstCategoryResult CategoryResult
for _, slowtwitchCategoryId := range postBase.CategoryIds {
categoryResult, err := GetSlowtwitchCategoryResult(slowtwitchCategoryId, migration.ResultsDatabase)
if err != nil {
errorMessage = errorMessage + err.Error()
failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase)
postResultsChannel <- failedPostResult
return
}
wordPressCategoryIds = append(wordPressCategoryIds, categoryResult.WordpressId)
firstCategoryResult = categoryResult
}
if len(wordPressCategoryIds) == 0 {
errorMessage = "This post has no categories and is broken on the production site."
failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase)
postResultsChannel <- failedPostResult
return
}
createWordpressPost.Categories = wordPressCategoryIds
//Get Author ID
editor, findEditorErr := GetEditor(postBase.Author, postBase.AuthorEmail, migration.ResultsDatabase)
if findEditorErr != nil {
editor, catchAllEditorErr := GetEditor("admin", "slowman2@slowtwitch.com", migration.ResultsDatabase)
if catchAllEditorErr != nil {
errorMessage = errorMessage + findEditorErr.Error()
failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase)
postResultsChannel <- failedPostResult
return
}
createWordpressPost.Author = editor.WordpressId
}
createWordpressPost.Author = editor.WordpressId
//Get old link
oldLink := strings.ReplaceAll(firstCategoryResult.OldUrl, "index.html", "") + slowtwitch.ConvertPostTitleToPath(postBase.Title, postBase.Id)
linkStatus := slowtwitch.GetPageStatus(oldLink)
if linkStatus == 404 {
errorMessage = errorMessage + "Page not found on Slowtwitch"
failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase)
postResultsChannel <- failedPostResult
return
}
//Get page, parse out post data and images
//Upload images to wordpress, swap out with new image urls
//Submit
imagePaths, html, retreiveHtmlErr := slowtwitch.GetImagesAndPostHtml(oldLink)
if retreiveHtmlErr != nil {
errorMessage = errorMessage + retreiveHtmlErr.Error()
failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase)
postResultsChannel <- failedPostResult
return
}
//Create images from the image paths
var imageResults []ImageResult
for i, imagePath := range imagePaths {
//construct URL
imageUrl := "https://www.slowtwitch.com" + imagePath
createWordpressImage := wordpress.CreateImage{
Url: imageUrl,
}
//submit image
wordpressImage, wordpressImageErr := createWordpressImage.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword)
if wordpressImageErr != nil {
errorMessage = errorMessage + wordpressImageErr.Error()
imageFailureResult := ImageResult{
OldUrl: imageUrl,
NewUrl: "",
WordpressId: 0,
IsSuccess: false,
}
imageResults = append(imageResults, imageFailureResult)
continue
}
//first photo is the featured photo
if i == 0 {
createWordpressPost.FeaturedMedia = wordpressImage.Id
}
//begin process of recording result
imageResult := ImageResult{
OldUrl: imageUrl,
NewUrl: wordpressImage.Link,
WordpressId: wordpressImage.Id,
IsSuccess: true,
}
imageResults = append(imageResults, imageResult)
//replace old links with new in post html
newImagePath := "/wp-content/uploads/" + wordpressImage.MediaDetails.File
html = strings.ReplaceAll(html, imagePath, newImagePath)
//create redirect
imageRedirect := wordpress.CreateRedirect{
Title: postBase.Title + "image-" + string((i + 1)),
Url: imagePath,
MatchType: "page",
ActionType: "url",
ActionCode: 301,
GroupId: 1,
ActionData: wordpress.ActionData{
Url: "/" + wordpressImage.Slug,
},
}
_, imageRedirectErr := imageRedirect.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword)
if imageRedirectErr != nil {
fmt.Println("Failed to create image redirect:", imageUrl, ":", imageRedirectErr.Error())
}
}
createWordpressPost.Content = html
post, createWordpressPostErr := createWordpressPost.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword)
if createWordpressPostErr != nil {
errorMessage = errorMessage + createWordpressPostErr.Error()
postFailureResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase)
postResultsChannel <- postFailureResult
return
}
//set up post result here to create
//truncate error message for db
if len(errorMessage) > 1450 {
errorMessage = errorMessage[:1450]
}
postResult := PostResult{
SlowtwitchId: batchId,
WordpressId: post.Id,
OldUrl: oldLink,
OldUrlStatus: linkStatus,
NewUrl: post.Link,
IsSuccess: true,
ErrorMessage: errorMessage,
}
postResultId, createPostResultErr := CreatePostResult(postResult, migration.ResultsDatabase)
if createPostResultErr != nil {
fmt.Println("Could not record post result for Slowtwitch post:" + strconv.Itoa(batchId) + createPostResultErr.Error())
}
for _, imageResult := range imageResults {
imageResult.PostId = postResultId
createImageResultErr := CreateImageResult(imageResult, migration.ResultsDatabase)
if createImageResultErr != nil {
fmt.Println("Error recording image result")
}
}
oldPath := strings.ReplaceAll(oldLink, "https://www.slowtwitch.com", "")
postRedirect := wordpress.CreateRedirect{
Title: "Article Redirect" + postBase.Title,
Url: oldPath,
MatchType: "page",
ActionType: "url",
ActionCode: 301,
GroupId: 1,
ActionData: wordpress.ActionData{
Url: "/" + strings.ReplaceAll(postResult.NewUrl, migration.WordpressBaseUrl, ""),
},
}
_, postRedirectErr := postRedirect.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword)
if postRedirectErr != nil {
fmt.Println("Error creating redirect for", batchId, ":"+postRedirectErr.Error())
}
fmt.Println("Successfully created post and result for", batchId)
updateAcfImages(imageResults, post.Id, migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword)
postResults = append(postResults, postResult)
postResultsChannel <- postResult
}(batchId)
} }
//Listen to channel and record results }()
var batchResults []PostResult
for result := range postResultsChannel {
batchResults = append(batchResults, result)
if len(batchResults) == batchSize {
close(postResultsChannel)
}
}
postBatch = nil
postResults = append(postResults, batchResults...)
}
} }
// Update related posts once work is done -- Do for all posts?
allPostResults := GetAllPostResults(migration.ResultsDatabase) go func() {
updatePostRelationships(allPostResults, migration) defer close(workInputChannel)
for _, postId := range slowtwitchPostIdsForMigration {
workInputChannel <- postId
}
wg.Done()
}()
go func() {
for postResult := range workOutputChannel {
postResults = append(postResults, postResult)
}
}()
wg.Wait()
close(workOutputChannel)
updatePostRelationships(postResults, migration)
return postResults return postResults
} }
func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagData) PostResult { func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagData) PostResult {
fmt.Println("Creating post " + strconv.Itoa(postId))
errorMessage := "" errorMessage := ""
//migrate //migrate
postBase, postBaseErr := slowtwitch.GetPostBase(postId, migration.SlowtwitchDatabase) postBase, postBaseErr := slowtwitch.GetPostBase(postId, migration.SlowtwitchDatabase)
@ -302,7 +96,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat
Excerpt: postBase.Description, Excerpt: postBase.Description,
Status: "publish", Status: "publish",
} }
fmt.Println("Checking date: " + strconv.Itoa(postId))
if postBase.DatePublished.Valid { if postBase.DatePublished.Valid {
t := postBase.DatePublished.Time t := postBase.DatePublished.Time
timeString := fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02d", timeString := fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02d",
@ -314,7 +108,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat
failedPostResult := createPostFailureResult(postId, errorMessage, migration.ResultsDatabase) failedPostResult := createPostFailureResult(postId, errorMessage, migration.ResultsDatabase)
return failedPostResult return failedPostResult
} }
fmt.Println("Checking tags: " + strconv.Itoa(postId))
for _, tag := range wpTagData { for _, tag := range wpTagData {
if postBase.Bike == true && tag.Name == "bike" { if postBase.Bike == true && tag.Name == "bike" {
createWordpressPost.Tags = append(createWordpressPost.Tags, tag.Id) createWordpressPost.Tags = append(createWordpressPost.Tags, tag.Id)
@ -326,7 +120,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat
createWordpressPost.Tags = append(createWordpressPost.Tags, tag.Id) createWordpressPost.Tags = append(createWordpressPost.Tags, tag.Id)
} }
} }
fmt.Println("Checking categories: " + strconv.Itoa(postId))
var wordPressCategoryIds []int var wordPressCategoryIds []int
var firstCategoryResult CategoryResult var firstCategoryResult CategoryResult
for _, slowtwitchCategoryId := range postBase.CategoryIds { for _, slowtwitchCategoryId := range postBase.CategoryIds {
@ -349,7 +143,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat
createWordpressPost.Categories = wordPressCategoryIds createWordpressPost.Categories = wordPressCategoryIds
//Get Author ID //Get Author ID
editor, findEditorErr := GetEditor(postBase.Author, postBase.AuthorEmail, migration.ResultsDatabase) editor, findEditorErr := GetEditor(postBase.Author, postBase.AuthorEmail, migration.ResultsDatabase)
fmt.Println("Finding editor: " + strconv.Itoa(postId))
if findEditorErr != nil { if findEditorErr != nil {
editor, catchAllEditorErr := GetEditor("admin", "slowman2@slowtwitch.com", migration.ResultsDatabase) editor, catchAllEditorErr := GetEditor("admin", "slowman2@slowtwitch.com", migration.ResultsDatabase)
@ -364,7 +158,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat
//Get old link //Get old link
oldLink := strings.ReplaceAll(firstCategoryResult.OldUrl, "index.html", "") + slowtwitch.ConvertPostTitleToPath(postBase.Title, postBase.Id) oldLink := strings.ReplaceAll(firstCategoryResult.OldUrl, "index.html", "") + slowtwitch.ConvertPostTitleToPath(postBase.Title, postBase.Id)
linkStatus := slowtwitch.GetPageStatus(oldLink) linkStatus := slowtwitch.GetPageStatus(oldLink)
fmt.Println("Checking link: " + strconv.Itoa(postId))
if linkStatus == 404 { if linkStatus == 404 {
errorMessage = errorMessage + "Page not found on Slowtwitch" errorMessage = errorMessage + "Page not found on Slowtwitch"
failedPostResult := createPostFailureResult(postId, errorMessage, migration.ResultsDatabase) failedPostResult := createPostFailureResult(postId, errorMessage, migration.ResultsDatabase)
@ -374,6 +168,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat
//Get page, parse out post data and images //Get page, parse out post data and images
//Upload images to wordpress, swap out with new image urls //Upload images to wordpress, swap out with new image urls
//Submit //Submit
fmt.Println("Checking images: " + strconv.Itoa(postId))
imagePaths, html, retreiveHtmlErr := slowtwitch.GetImagesAndPostHtml(oldLink) imagePaths, html, retreiveHtmlErr := slowtwitch.GetImagesAndPostHtml(oldLink)
if retreiveHtmlErr != nil { if retreiveHtmlErr != nil {
errorMessage = errorMessage + retreiveHtmlErr.Error() errorMessage = errorMessage + retreiveHtmlErr.Error()
@ -437,7 +232,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat
fmt.Println("Failed to create image redirect:", imageUrl, ":", imageRedirectErr.Error()) fmt.Println("Failed to create image redirect:", imageUrl, ":", imageRedirectErr.Error())
} }
} }
fmt.Println("Checking html: " + strconv.Itoa(postId))
createWordpressPost.Content = html createWordpressPost.Content = html
post, createWordpressPostErr := createWordpressPost.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword) post, createWordpressPostErr := createWordpressPost.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword)
if createWordpressPostErr != nil { if createWordpressPostErr != nil {
@ -460,7 +255,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat
IsSuccess: true, IsSuccess: true,
ErrorMessage: errorMessage, ErrorMessage: errorMessage,
} }
fmt.Println("Recording result: " + strconv.Itoa(postId))
postResultId, createPostResultErr := CreatePostResult(postResult, migration.ResultsDatabase) postResultId, createPostResultErr := CreatePostResult(postResult, migration.ResultsDatabase)
if createPostResultErr != nil { if createPostResultErr != nil {
fmt.Println("Could not record post result for Slowtwitch post:" + strconv.Itoa(postId) + createPostResultErr.Error()) fmt.Println("Could not record post result for Slowtwitch post:" + strconv.Itoa(postId) + createPostResultErr.Error())
@ -517,11 +312,12 @@ func updatePostRelationships(postResults []PostResult, migration MigratePosts) {
//as the workers take work from the channel. This will be blocked while //as the workers take work from the channel. This will be blocked while
//any task is waiting to be picked up from the channel //any task is waiting to be picked up from the channel
go func() { go func() {
defer close(postResultWorkInput)
for _, result := range postResults { for _, result := range postResults {
postResultWorkInput <- result postResultWorkInput <- result
} }
close(postResultWorkInput)
wg.Done() wg.Done()
}() }()