threaded update post relationships

This commit is contained in:
Ross Trottier 2024-06-03 16:28:17 -06:00
parent cb6829f175
commit f13c2c939c
6 changed files with 115 additions and 43 deletions

View File

@ -7,15 +7,15 @@ import (
) )
// WP Config // WP Config
const baseUrl = "http://go-api-playground.local/" const baseUrl = "https://slowtwitch.cloud/"
const wordpressKey = "admin" const wordpressKey = "admin@slowtwitch.cloud"
const wordpressSecret = "7ZeY ZCwp 3DwL EBAK vGtf NW9J" const wordpressSecret = "3SrI MysB NREZ 40OD PLAP uk1W"
// DB Config // DB Config
const slowtwitchAdminUser = "admin" const slowtwitchAdminUser = "admin"
const slowtwitchAdminPass = "yxnh93Ybbz2Nm8#mp28zCVv" const slowtwitchAdminPass = "yxnh93Ybbz2Nm8#mp28zCVv"
const slowtwitchDbName = "slowtwitch" const slowtwitchDbName = "slowtwitch"
const migrationDbName = "slowtwitch_transfer_threaded_test" const migrationDbName = "slowtwitch_transfer"
const federatedDbUrl = "slowtwitch.northend.network" const federatedDbUrl = "slowtwitch.northend.network"
const federatedDbPort = "3306" const federatedDbPort = "3306"

View File

@ -18,8 +18,8 @@ func CreateEditorResult(result EditorResult, db *sql.DB) error {
return err return err
} }
func GetEditor(username string, db *sql.DB) (EditorResult, error) { func GetEditor(username, email string, db *sql.DB) (EditorResult, error) {
rows, err := db.Query("select WordpressId, Username, Email, (IsSuccess = b'1') from EditorsResults where Username = ?", username) rows, err := db.Query("select WordpressId, Username, Email, (IsSuccess = b'1') from EditorsResults where Username = ? OR Email = ?", username, email)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
@ -35,7 +35,7 @@ func GetEditor(username string, db *sql.DB) (EditorResult, error) {
fmt.Println(err) fmt.Println(err)
} }
if editor.Username == username { if editor.Username == username || editor.Email == email {
return editor, nil return editor, nil
} }
} }
@ -43,8 +43,8 @@ func GetEditor(username string, db *sql.DB) (EditorResult, error) {
return EditorResult{}, errors.New("Could not find editor.") return EditorResult{}, errors.New("Could not find editor.")
} }
func EditorHasBeenMigrated(username string, db *sql.DB) bool { func EditorHasBeenMigrated(username, email string, db *sql.DB) bool {
_, err := GetEditor(username, db) _, err := GetEditor(username, email, db)
if err == nil { if err == nil {
return true return true

View File

@ -22,7 +22,7 @@ func (migration *MigrateAuthors) Execute() []EditorResult {
var output []EditorResult var output []EditorResult
for _, editor := range editors { for _, editor := range editors {
hasBeenMigrated := EditorHasBeenMigrated(editor.Username, migration.ResultsDatabase) hasBeenMigrated := EditorHasBeenMigrated(editor.Username, editor.Email, migration.ResultsDatabase)
if hasBeenMigrated == false { if hasBeenMigrated == false {
firstName, lastName := getFirstAndLastName(editor.Name) firstName, lastName := getFirstAndLastName(editor.Name)

View File

@ -8,6 +8,7 @@ import (
"slices" "slices"
"strconv" "strconv"
"strings" "strings"
"sync"
) )
type MigratePosts struct { type MigratePosts struct {
@ -114,13 +115,18 @@ func (migration MigratePosts) Execute() []PostResult {
createWordpressPost.Categories = wordPressCategoryIds createWordpressPost.Categories = wordPressCategoryIds
//Get Author ID //Get Author ID
editor, findEditorErr := GetEditor(postBase.Author, migration.ResultsDatabase) editor, findEditorErr := GetEditor(postBase.Author, postBase.AuthorEmail, migration.ResultsDatabase)
if findEditorErr != nil { if findEditorErr != nil {
errorMessage = errorMessage + findEditorErr.Error() editor, catchAllEditorErr := GetEditor("admin", "slowman2@slowtwitch.com", migration.ResultsDatabase)
failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase)
postResultsChannel <- failedPostResult if catchAllEditorErr != nil {
return errorMessage = errorMessage + findEditorErr.Error()
failedPostResult := createPostFailureResult(batchId, errorMessage, migration.ResultsDatabase)
postResultsChannel <- failedPostResult
return
}
createWordpressPost.Author = editor.WordpressId
} }
createWordpressPost.Author = editor.WordpressId createWordpressPost.Author = editor.WordpressId
//Get old link //Get old link
@ -272,45 +278,82 @@ func (migration MigratePosts) Execute() []PostResult {
postResults = append(postResults, batchResults...) postResults = append(postResults, batchResults...)
} }
} }
// Update related posts once work is done -- Why are there duplicates? // Update related posts once work is done -- Do for all posts?
updatePostRelationships(postResults, migration) allPostResults := GetAllPostResults(migration.ResultsDatabase)
updatePostRelationships(allPostResults, migration)
return postResults return postResults
} }
func updatePostRelationships(postResults []PostResult, migration MigratePosts) { func updatePostRelationships(postResults []PostResult, migration MigratePosts) {
fmt.Println("Updating post relationships") fmt.Println("Updating post relationships")
for i, postResult := range postResults { batchSize := 5
fmt.Println("Updating post", i+1, "/", len(postResults)) postResultWorkInput := make(chan PostResult)
if postResult.IsSuccess { //Set up a wait group to wait for the input channel to close after execution
var relatedWordpressIds []int var wg sync.WaitGroup
wg.Add(1)
wg.Add(len(postResults))
relatedSlowtwitchIds, slowtwitchIdsErr := slowtwitch.GetRelatedArticleIds(postResult.SlowtwitchId, migration.SlowtwitchDatabase) //Launch go routines that will read from the input channel,
//maxed out at the batch size
if slowtwitchIdsErr != nil || len(relatedSlowtwitchIds) == 0 { for i := 0; i < batchSize; i++ {
continue go func() {
} for result := range postResultWorkInput {
for _, slowtwitchRelatedId := range relatedSlowtwitchIds { updatePostRelationship(result, migration)
wordpressRelatedId, wordpressIdErr := GetWordpressPostIdBySlowtwitchPostId(slowtwitchRelatedId, migration.ResultsDatabase) wg.Done()
if wordpressIdErr != nil {
continue
}
relatedWordpressIds = append(relatedWordpressIds, wordpressRelatedId)
} }
}()
}
if len(relatedWordpressIds) > 0 { //Launch an unbuffered channel that will fill the channel with work
updateWordpressRelatedPosts := wordpress.UpdateAcfRelatedPosts{ //as the workers take work from the channel. This will be blocked while
Acf: wordpress.AcfRelatedPosts{ //any task is waiting to be picked up from the channel
PostIds: relatedWordpressIds, go func() {
}, for _, result := range postResults {
} postResultWorkInput <- result
updateRelatedPostErr := updateWordpressRelatedPosts.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword, postResult.WordpressId) }
if updateRelatedPostErr != nil {
fmt.Println("Error updating wordpressRelatedPosts", updateRelatedPostErr.Error()) close(postResultWorkInput)
} wg.Done()
}()
wg.Wait()
fmt.Println("Done with post relationships")
}
func updatePostRelationship(postResult PostResult, migration MigratePosts) bool {
if postResult.IsSuccess {
var relatedWordpressIds []int
relatedSlowtwitchIds, slowtwitchIdsErr := slowtwitch.GetRelatedArticleIds(postResult.SlowtwitchId, migration.SlowtwitchDatabase)
if slowtwitchIdsErr != nil || len(relatedSlowtwitchIds) == 0 {
return false
}
for _, slowtwitchRelatedId := range relatedSlowtwitchIds {
wordpressRelatedId, wordpressIdErr := GetWordpressPostIdBySlowtwitchPostId(slowtwitchRelatedId, migration.ResultsDatabase)
if wordpressIdErr != nil {
return false
}
relatedWordpressIds = append(relatedWordpressIds, wordpressRelatedId)
}
if len(relatedWordpressIds) > 0 {
fmt.Println("Updating post in WP", postResult.WordpressId)
updateWordpressRelatedPosts := wordpress.UpdateAcfRelatedPosts{
Acf: wordpress.AcfRelatedPosts{
PostIds: relatedWordpressIds,
},
}
updateRelatedPostErr := updateWordpressRelatedPosts.Execute(migration.WordpressBaseUrl, migration.WordpressUser, migration.WordpressPassword, postResult.WordpressId)
if updateRelatedPostErr != nil {
fmt.Println("Error updating wordpressRelatedPosts", updateRelatedPostErr.Error())
} else {
return true
} }
} }
} }
return false
} }
func getPostIdsThatNeedMigration(slowtwitchPostIds, migratedPostIds []int) []int { func getPostIdsThatNeedMigration(slowtwitchPostIds, migratedPostIds []int) []int {

View File

@ -1,6 +1,9 @@
package migration package migration
import "database/sql" import (
"database/sql"
"log"
)
type PostResult struct { type PostResult struct {
WordpressId int WordpressId int
@ -12,6 +15,28 @@ type PostResult struct {
ErrorMessage string ErrorMessage string
} }
func GetAllPostResults(db *sql.DB) []PostResult {
var results []PostResult
rows, err := db.Query("select WordpressId, SlowtwitchId, OldUrl, OldUrlStatus, NewUrl, (IsSuccess = b'1'), ErrorMessage from PostResults")
if err != nil {
log.Fatal(err)
}
defer rows.Close()
for rows.Next() {
result := PostResult{}
err = rows.Scan(&result.WordpressId, &result.SlowtwitchId, &result.OldUrl, &result.OldUrlStatus, &result.NewUrl, &result.IsSuccess, &result.ErrorMessage)
if err != nil {
log.Fatal(err)
}
results = append(results, result)
}
return results
}
func CreatePostResult(parameters PostResult, db *sql.DB) (int, error) { func CreatePostResult(parameters PostResult, db *sql.DB) (int, error) {
result, err := db.Exec("insert into PostResults (WordpressId, SlowtwitchId, OldUrl, OldUrlStatus, NewUrl, IsSuccess, ErrorMessage) values (?, ?, ?, ?, ?, ?, ?)", parameters.WordpressId, parameters.SlowtwitchId, parameters.OldUrl, parameters.OldUrlStatus, parameters.NewUrl, parameters.IsSuccess, parameters.ErrorMessage) result, err := db.Exec("insert into PostResults (WordpressId, SlowtwitchId, OldUrl, OldUrlStatus, NewUrl, IsSuccess, ErrorMessage) values (?, ?, ?, ?, ?, ?, ?)", parameters.WordpressId, parameters.SlowtwitchId, parameters.OldUrl, parameters.OldUrlStatus, parameters.NewUrl, parameters.IsSuccess, parameters.ErrorMessage)
if err != nil { if err != nil {

View File

@ -28,5 +28,9 @@ func convert(path string) string {
output = strings.ReplaceAll(output, "?", "") output = strings.ReplaceAll(output, "?", "")
output = strings.ReplaceAll(output, ",", "") output = strings.ReplaceAll(output, ",", "")
output = strings.ReplaceAll(output, "%", "%25") output = strings.ReplaceAll(output, "%", "%25")
output = strings.ReplaceAll(output, "_&", "")
output = strings.ReplaceAll(output, "&", "")
output = strings.ReplaceAll(output, "#", "")
output = strings.ReplaceAll(output, ";", "")
return output return output
} }