diff --git a/main.go b/main.go index 9a5271d..2012da3 100644 --- a/main.go +++ b/main.go @@ -7,15 +7,15 @@ import ( ) // WP Config -const baseUrl = "https://slowtwitch.cloud/" -const wordpressKey = "admin@slowtwitch.cloud" -const wordpressSecret = "3SrI MysB NREZ 40OD PLAP uk1W" +const baseUrl = "http://go-api-playground.local/" +const wordpressKey = "admin" +const wordpressSecret = "JXDP 3EgE h1RF jhln b8F5 A1Xk" // DB Config const slowtwitchAdminUser = "admin" const slowtwitchAdminPass = "yxnh93Ybbz2Nm8#mp28zCVv" const slowtwitchDbName = "slowtwitch" -const migrationDbName = "slowtwitch_transfer" +const migrationDbName = "slowtwitch_transfer_threaded_test" const federatedDbUrl = "slowtwitch.northend.network" const federatedDbPort = "3306" diff --git a/services/migration/migrate-posts.go b/services/migration/migrate-posts.go index cb49976..f05eb52 100644 --- a/services/migration/migrate-posts.go +++ b/services/migration/migrate-posts.go @@ -37,256 +37,50 @@ func (migration MigratePosts) Execute() []PostResult { panic("Could not migrate posts due to tags not found:" + wpTagErr.Error()) } - //TODO Re-thread using a work input channel, work output channel, and waitgroup var postResults []PostResult batchSize := 5 - var postBatch []int + workInputChannel := make(chan int) + workOutputChannel := make(chan PostResult) + var wg sync.WaitGroup + wg.Add(1) + wg.Add(len(slowtwitchPostIdsForMigration)) - for i, postId := range slowtwitchPostIdsForMigration { - //anonymous func will take in post IDs: the rest it can access from scope - postBatch = append(postBatch, postId) - - if len(postBatch) == batchSize || i == len(slowtwitchPostIdsForMigration)-1 { - postResultsChannel := make(chan PostResult) - - for _, batchId := range postBatch { - go func(id int) { - errorMessage := "" - //migrate - postBase, postBaseErr := slowtwitch.GetPostBase(batchId, migration.SlowtwitchDatabase) - - if postBaseErr != nil { - errorMessage = errorMessage + postBaseErr.Error() - failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase) - postResultsChannel <- failedPostResult - return - } - - createWordpressPost := wordpress.CreatePost{ - Title: postBase.Title, - Excerpt: postBase.Description, - Status: "publish", - } - - if postBase.DatePublished.Valid { - t := postBase.DatePublished.Time - timeString := fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02d", - t.Year(), t.Month(), t.Day(), - t.Hour(), t.Minute(), t.Second()) - createWordpressPost.Date = timeString - } else { - errorMessage = errorMessage + "Invalid Date Published" - failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase) - postResultsChannel <- failedPostResult - return - } - - for _, tag := range wpTagData { - if postBase.Bike == true && tag.Name == "bike" { - createWordpressPost.Tags = append(createWordpressPost.Tags, tag.Id) - } - if postBase.Swim == true && tag.Name == "swim" { - createWordpressPost.Tags = append(createWordpressPost.Tags, tag.Id) - } - if postBase.Run == true && tag.Name == "run" { - createWordpressPost.Tags = append(createWordpressPost.Tags, tag.Id) - } - } - - var wordPressCategoryIds []int - var firstCategoryResult CategoryResult - for _, slowtwitchCategoryId := range postBase.CategoryIds { - categoryResult, err := GetSlowtwitchCategoryResult(slowtwitchCategoryId, migration.ResultsDatabase) - if err != nil { - errorMessage = errorMessage + err.Error() - failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase) - postResultsChannel <- failedPostResult - return - } - wordPressCategoryIds = append(wordPressCategoryIds, categoryResult.WordpressId) - firstCategoryResult = categoryResult - } - - if len(wordPressCategoryIds) == 0 { - errorMessage = "This post has no categories and is broken on the production site." - failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase) - postResultsChannel <- failedPostResult - return - } - - createWordpressPost.Categories = wordPressCategoryIds - //Get Author ID - editor, findEditorErr := GetEditor(postBase.Author, postBase.AuthorEmail, migration.ResultsDatabase) - - if findEditorErr != nil { - editor, catchAllEditorErr := GetEditor("admin", "slowman2@slowtwitch.com", migration.ResultsDatabase) - - if catchAllEditorErr != nil { - errorMessage = errorMessage + findEditorErr.Error() - failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase) - postResultsChannel <- failedPostResult - return - } - createWordpressPost.Author = editor.WordpressId - } - createWordpressPost.Author = editor.WordpressId - //Get old link - oldLink := strings.ReplaceAll(firstCategoryResult.OldUrl, "index.html", "") + slowtwitch.ConvertPostTitleToPath(postBase.Title, postBase.Id) - linkStatus := slowtwitch.GetPageStatus(oldLink) - - if linkStatus == 404 { - errorMessage = errorMessage + "Page not found on Slowtwitch" - failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase) - postResultsChannel <- failedPostResult - return - } - - //Get page, parse out post data and images - //Upload images to wordpress, swap out with new image urls - //Submit - imagePaths, html, retreiveHtmlErr := slowtwitch.GetImagesAndPostHtml(oldLink) - if retreiveHtmlErr != nil { - errorMessage = errorMessage + retreiveHtmlErr.Error() - failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase) - postResultsChannel <- failedPostResult - return - } - //Create images from the image paths - var imageResults []ImageResult - - for i, imagePath := range imagePaths { - //construct URL - imageUrl := "https://www.slowtwitch.com" + imagePath - createWordpressImage := wordpress.CreateImage{ - Url: imageUrl, - } - //submit image - wordpressImage, wordpressImageErr := createWordpressImage.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword) - - if wordpressImageErr != nil { - errorMessage = errorMessage + wordpressImageErr.Error() - imageFailureResult := ImageResult{ - OldUrl: imageUrl, - NewUrl: "", - WordpressId: 0, - IsSuccess: false, - } - imageResults = append(imageResults, imageFailureResult) - continue - } - //first photo is the featured photo - if i == 0 { - createWordpressPost.FeaturedMedia = wordpressImage.Id - } - //begin process of recording result - imageResult := ImageResult{ - OldUrl: imageUrl, - NewUrl: wordpressImage.Link, - WordpressId: wordpressImage.Id, - IsSuccess: true, - } - imageResults = append(imageResults, imageResult) - //replace old links with new in post html - newImagePath := "/wp-content/uploads/" + wordpressImage.MediaDetails.File - html = strings.ReplaceAll(html, imagePath, newImagePath) - //create redirect - imageRedirect := wordpress.CreateRedirect{ - Title: postBase.Title + "image-" + string((i + 1)), - Url: imagePath, - MatchType: "page", - ActionType: "url", - ActionCode: 301, - GroupId: 1, - ActionData: wordpress.ActionData{ - Url: "/" + wordpressImage.Slug, - }, - } - - _, imageRedirectErr := imageRedirect.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword) - - if imageRedirectErr != nil { - fmt.Println("Failed to create image redirect:", imageUrl, ":", imageRedirectErr.Error()) - } - } - - createWordpressPost.Content = html - post, createWordpressPostErr := createWordpressPost.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword) - if createWordpressPostErr != nil { - errorMessage = errorMessage + createWordpressPostErr.Error() - postFailureResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase) - postResultsChannel <- postFailureResult - return - } - //set up post result here to create - //truncate error message for db - if len(errorMessage) > 1450 { - errorMessage = errorMessage[:1450] - } - - postResult := PostResult{ - SlowtwitchId: batchId, - WordpressId: post.Id, - OldUrl: oldLink, - OldUrlStatus: linkStatus, - NewUrl: post.Link, - IsSuccess: true, - ErrorMessage: errorMessage, - } - - postResultId, createPostResultErr := CreatePostResult(postResult, migration.ResultsDatabase) - if createPostResultErr != nil { - fmt.Println("Could not record post result for Slowtwitch post:" + strconv.Itoa(batchId) + createPostResultErr.Error()) - } - for _, imageResult := range imageResults { - imageResult.PostId = postResultId - createImageResultErr := CreateImageResult(imageResult, migration.ResultsDatabase) - if createImageResultErr != nil { - fmt.Println("Error recording image result") - } - } - oldPath := strings.ReplaceAll(oldLink, "https://www.slowtwitch.com", "") - postRedirect := wordpress.CreateRedirect{ - Title: "Article Redirect" + postBase.Title, - Url: oldPath, - MatchType: "page", - ActionType: "url", - ActionCode: 301, - GroupId: 1, - ActionData: wordpress.ActionData{ - Url: "/" + strings.ReplaceAll(postResult.NewUrl, migration.WordpressBaseUrl, ""), - }, - } - _, postRedirectErr := postRedirect.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword) - if postRedirectErr != nil { - fmt.Println("Error creating redirect for", batchId, ":"+postRedirectErr.Error()) - } - fmt.Println("Successfully created post and result for", batchId) - updateAcfImages(imageResults, post.Id, migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword) - postResults = append(postResults, postResult) - postResultsChannel <- postResult - }(batchId) + for i := 0; i < batchSize; i++ { + go func() { + for postId := range workInputChannel { + result := createPost(postId, migration, wpTagData) + workOutputChannel <- result + wg.Done() } - //Listen to channel and record results - var batchResults []PostResult - for result := range postResultsChannel { - batchResults = append(batchResults, result) - if len(batchResults) == batchSize { - close(postResultsChannel) - } - } - - postBatch = nil - postResults = append(postResults, batchResults...) - } + }() } - // Update related posts once work is done -- Do for all posts? - allPostResults := GetAllPostResults(migration.ResultsDatabase) - updatePostRelationships(allPostResults, migration) + + go func() { + defer close(workInputChannel) + + for _, postId := range slowtwitchPostIdsForMigration { + workInputChannel <- postId + } + + wg.Done() + }() + + go func() { + for postResult := range workOutputChannel { + postResults = append(postResults, postResult) + } + }() + + wg.Wait() + close(workOutputChannel) + + updatePostRelationships(postResults, migration) return postResults } func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagData) PostResult { + fmt.Println("Creating post " + strconv.Itoa(postId)) errorMessage := "" //migrate postBase, postBaseErr := slowtwitch.GetPostBase(postId, migration.SlowtwitchDatabase) @@ -302,7 +96,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat Excerpt: postBase.Description, Status: "publish", } - + fmt.Println("Checking date: " + strconv.Itoa(postId)) if postBase.DatePublished.Valid { t := postBase.DatePublished.Time timeString := fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02d", @@ -314,7 +108,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat failedPostResult := createPostFailureResult(postId, errorMessage, migration.ResultsDatabase) return failedPostResult } - + fmt.Println("Checking tags: " + strconv.Itoa(postId)) for _, tag := range wpTagData { if postBase.Bike == true && tag.Name == "bike" { createWordpressPost.Tags = append(createWordpressPost.Tags, tag.Id) @@ -326,7 +120,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat createWordpressPost.Tags = append(createWordpressPost.Tags, tag.Id) } } - + fmt.Println("Checking categories: " + strconv.Itoa(postId)) var wordPressCategoryIds []int var firstCategoryResult CategoryResult for _, slowtwitchCategoryId := range postBase.CategoryIds { @@ -349,7 +143,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat createWordpressPost.Categories = wordPressCategoryIds //Get Author ID editor, findEditorErr := GetEditor(postBase.Author, postBase.AuthorEmail, migration.ResultsDatabase) - + fmt.Println("Finding editor: " + strconv.Itoa(postId)) if findEditorErr != nil { editor, catchAllEditorErr := GetEditor("admin", "slowman2@slowtwitch.com", migration.ResultsDatabase) @@ -364,7 +158,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat //Get old link oldLink := strings.ReplaceAll(firstCategoryResult.OldUrl, "index.html", "") + slowtwitch.ConvertPostTitleToPath(postBase.Title, postBase.Id) linkStatus := slowtwitch.GetPageStatus(oldLink) - + fmt.Println("Checking link: " + strconv.Itoa(postId)) if linkStatus == 404 { errorMessage = errorMessage + "Page not found on Slowtwitch" failedPostResult := createPostFailureResult(postId, errorMessage, migration.ResultsDatabase) @@ -374,6 +168,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat //Get page, parse out post data and images //Upload images to wordpress, swap out with new image urls //Submit + fmt.Println("Checking images: " + strconv.Itoa(postId)) imagePaths, html, retreiveHtmlErr := slowtwitch.GetImagesAndPostHtml(oldLink) if retreiveHtmlErr != nil { errorMessage = errorMessage + retreiveHtmlErr.Error() @@ -437,7 +232,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat fmt.Println("Failed to create image redirect:", imageUrl, ":", imageRedirectErr.Error()) } } - + fmt.Println("Checking html: " + strconv.Itoa(postId)) createWordpressPost.Content = html post, createWordpressPostErr := createWordpressPost.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword) if createWordpressPostErr != nil { @@ -460,7 +255,7 @@ func createPost(postId int, migration MigratePosts, wpTagData []wordpress.TagDat IsSuccess: true, ErrorMessage: errorMessage, } - + fmt.Println("Recording result: " + strconv.Itoa(postId)) postResultId, createPostResultErr := CreatePostResult(postResult, migration.ResultsDatabase) if createPostResultErr != nil { fmt.Println("Could not record post result for Slowtwitch post:" + strconv.Itoa(postId) + createPostResultErr.Error()) @@ -517,11 +312,12 @@ func updatePostRelationships(postResults []PostResult, migration MigratePosts) { //as the workers take work from the channel. This will be blocked while //any task is waiting to be picked up from the channel go func() { + defer close(postResultWorkInput) + for _, result := range postResults { postResultWorkInput <- result } - close(postResultWorkInput) wg.Done() }()