当前位置: 代码迷 >> 综合 >> Golang Channel和协程 实战读取10万条mysql数据到本地csv
  详细解决方案

Golang Channel和协程 实战读取10万条mysql数据到本地csv

热度:63   发布时间:2023-11-16 22:57:06.0

Go中很多的并发的特性是结合channel去实现的,并且Go的并发很强,但使用了这么久的Go开发,却没有实际开发中使用过高并发,本次呢实战一次go的channel+groutine实现1秒读取mysql数据库到本地。

项目结构:

getdata.go

package getdataimport ("encoding/csv""fmt""goPip/initConfig""os""strconv""sync"
)type Book struct {BookId   int    `json:"book_id" gorm:"book_id"`BookName string `json:"book_name" gorm:"book_name"`
}
type BookList struct {Data []*BookPage int
}
type Result struct {Page intErr  error
}type InputChan chan *BookList //设定与一个别名VideoList类型的chan为InputChan
type OutPutChan chan *Result  //设定一个别名Result类型的chan为outPutChan//返回InputChan(只针对读取数据的函数)
type DataCMD func() InputChan//传入InputChan返回OutputChan(针对管道函数,因为要写入chan,并且返回chan)
type DataPipeCMD func(input InputChan) OutPutChan//传入第一个个方法为获取数据到chan中,第二个之后的是消化chan中的内容
//模拟 linux中的|管道,
func Pipe(cmd1 DataCMD, cs ...DataPipeCMD) OutPutChan {//cmd1执行完,获取到数据data := cmd1()out := make(OutPutChan)wg := sync.WaitGroup{}for _, c := range cs {output := c(data) //遍历上面的dataPipeCMD让每个管道函数去消化拿出来的数据,有可能是很耗时的//上面消化完处理数据的同时。wg.Add(1)//是异步执行的go func(outdata OutPutChan) { //也需要开一个协程,去异步关闭defer wg.Done()for i := range outdata { //不断去获取是不是有数据,有的话就把他放入out中out <- i //把执行的结果,放入out中,并返回}}(output)}go func() {defer close(out) //在wait结束的时候,要关闭这个管道wg.Wait()        //如果放到了外面,则会等待到所有的结果出来,才会return out,外面的进程就会卡死等待,所以需要放到协程中,异步进行等待}()return out //执行的同时,需要先把out返回。因为执行是异步的,全部的通信全部在管道中}const sql = "select * from books order by book_id limit ? offset ? "//获取数据到inputchan,为处理数据做准备,并不需要全部获取完毕,再返回,而是直接先返回,把找数据的操作放到一个协程中, 把找到的数据放入channel中
func ReadData() InputChan {page := 1pagesize := 1000result := make(InputChan)go func() {defer close(result)for {bookList := &BookList{make([]*Book, 0), page}db := initConfig.GetDB().Raw(sql, pagesize, (page-1)*pagesize).Find(&bookList.Data)if db.Error != nil || db.RowsAffected == 0 {break}result <- bookListpage++}}()return result}//管道函数
func WriteData(input InputChan) OutPutChan {out := make(OutPutChan)go func() {defer close(out)for data := range input {out <- &Result{Page: data.Page, Err: SaveData(data)}}}()return out
}//保存csv文件
func SaveData(data *BookList) error {//time.Sleep(time.Millisecond * 500) //假设保存数据是一个耗时的操作file := fmt.Sprintf("csv/%d.csv", data.Page)csvFile, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)if err != nil {return err}defer csvFile.Close()w := csv.NewWriter(csvFile) //创建一个新的写入文件流header := []string{"title", "coment"}export := [][]string{header,}for _, d := range data.Data {cnt := []string{strconv.Itoa(d.BookId),d.BookName,}export = append(export, cnt)}err = w.WriteAll(export)if err != nil {return err}w.Flush()return nil}func Test() {out := Pipe(ReadData, WriteData,WriteData) //可以写多个WriteData来消化readData产生的数据for res := range out {fmt.Printf("%d.csv文件执行完成,结果%v\n", res.Page, res.Err)}
}

 Dbinit.go

package initConfigimport ("github.com/jinzhu/gorm"_ "github.com/jinzhu/gorm/dialects/mysql""log"
)var db *gorm.DBfunc init() {var err errordb, err = gorm.Open("mysql","root:@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=True&loc=Local")if err != nil {log.Fatal(err)}db.SingularTable(true)db.DB().Ping()db.DB().SetMaxIdleConns(10)db.DB().SetMaxOpenConns(20)//db.LogMode(true)
}
func GetDB() *gorm.DB {return db
}

main.go 

package mainimport ("fmt""goPip/getdata""time"
)func main() {//numbers := []int{3, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14start := time.Now().UnixNano() / 1e6getdata.Test()end := time.Now().UnixNano() / 1e6fmt.Printf("测试--用时:%d毫秒\r\n", end-start)
}

只有在保存操作耗时的时候,才能看出来他的效果,比如在保存为csv的时候,增加一个500毫秒的耗时,这样多个处理函数,就能展现出效果了。

因为我感觉,如果在保存这儿不耗时的话,一个处理函数和多个处理函数的实效是一样的。都是由查询的最终耗时,也就是ReadData的耗时决定的。

而在有500毫秒保存耗时的情况下,可以看出效果。
是因为500毫秒是这个执行链路最大的一个耗时,在一个耗时的前提下,如果是多个处理函数在同时处理,那输出的也就是在这500毫秒的时间内所处理完的任务(很快),所以就是显示就是几个几个的一块展示。

10万条数据,稍等发布

  相关解决方案