machine golang 并发工具包使用
  TEZNKK3IfmPf 2023年11月14日 17 0

以前有简单介绍过machine,以下是一个简单的试用(集成了gorunine 以及cron任务)

项目结构

  • 代码结构
├── docker-compose.yaml
├── go.mod
├── go.sum
├── main.go
├── pkg
│   ├── filesync.go
│   └── updatefile.go
└── sql
    └── init.sql
  • 代码说明
    以上是一个简单的基于machine发布订阅以及定时任务能力实现的一个同时数据写入db
  • main.go
    基于machine实现goruntine 的管理,支持PanicRecover,发布订阅以及定时任务
 
package main
import (
    "context"
    "demoapp/pkg"
    "fmt"
    "log"
    "net/http"
    _ "net/http/pprof"
    "time"
    "github.com/autom8ter/machine"
    _ "github.com/go-sql-driver/mysql"
    "github.com/jmoiron/sqlx"
)
func main() {
    db, err := sqlx.Connect("mysql", "root:appdemo@tcp(127.0.0.1:3306)/app?charset=utf8")
    if err != nil {
        log.Fatalln(err)
    }
    defer db.Close()
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    m := machine.New(ctx, machine.WithTags("demoapp"), machine.WithMaxRoutines(10),
        machine.WithMiddlewares(machine.PanicRecover()))
    m.Go(func(routine machine.Routine) {
        fmt.Printf("%v |  stats = %s\n", routine.PID(), routine.Machine().Stats().Tags)
        // publish message to channel
        sameFiles, err := pkg.FetchSameFile(db)
        if err != nil {
            log.Println("fetch sameFiles error", err.Error())
            routine.Publish("demo", err)
        }
        if len(sameFiles)%2 == 0 {
            // normal datas
            routine.Publish("demo", sameFiles)
        } else {
            routine.Publish("demo", map[string]interface{}{
                "name": "dlaong",
            })
        }
       // 支持定时任务以及tag 的中间件扩展
    }, machine.GoWithTags("publish"),
        machine.GoWithPID("demoapp"),
        machine.GoWithMiddlewares(
            // run every second until context cancels
            machine.Cron(time.NewTicker(1*time.Second)),
        ))
    m.Go(func(routine machine.Routine) {
        fmt.Printf("one: %v |  stats = %s\n", routine.PID(), routine.Machine().Stats().Tags)
         // 订阅
        routine.Subscribe("demo", func(msg interface{}) {
            switch msg.(type) {
            case error:
                log.Println("print error message")
            case []pkg.SameFile:
                pkg.Update2(db, msg.([]pkg.SameFile))
            default:
                log.Println("don't know", msg)
            }
        })
    })
    m.Go(func(routine machine.Routine) {
         // http seerver 同时进行消息发布
        http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
            routine.Publish("demo", map[string]interface{}{
                "name": "dlaong",
                "type": "from web server",
            })
            fmt.Fprintf(w, "%s userinfo", "dalong")
        })
        http.ListenAndServe(":8080", nil)
    })
    m.Wait()
}

db 操作,很简单,主要是基于sqlx进行的mysql 操作,参考代码
filesync.go:

 
package pkg
import "github.com/jmoiron/sqlx"
// User for db user
type User struct {
    // UID
    UID int `json:"uid" db:"uid"`
}
// SameFile for db SameFile
type SameFile struct {
    // UID
    Parent int64  `json:"parent" db:"parent"`
    Name   string `json:"name" db:"name"`
}
// FetchUser fetch users
func FetchUser(con *sqlx.DB) (users []User, err error) {
    selectSQL := `select distinct uid from user`
    myusers := []User{}
    err = con.Select(&myusers, selectSQL)
    if err != nil {
        return nil, err
    }
    return myusers, nil
}
// FetchSameFile fetch samefile
func FetchSameFile(con *sqlx.DB) (sameFiles []SameFile, err error) {
    selectSQL := `select distinct parent,name from file where size >0 group by parent,name having count(*)>1`
    mysameFile := []SameFile{}
    err = con.Select(&mysameFile, selectSQL)
    if err != nil {
        return nil, err
    }
    return mysameFile, nil
}

updatefile.go

package pkg
import (
    "log"
    "github.com/jmoiron/sqlx"
)
// Update2 update  samefile
func Update2(con *sqlx.DB, sameFiles []SameFile) {
    log.Println("fetch same file counts:", len(sameFiles))
    for _, SameFile := range sameFiles {
        updateSQL := `insert into log2(id,content) values(:id,:content)`
        log.Println("name,parent", SameFile.Name, SameFile.Parent)
        result, err := con.NamedExec(updateSQL, map[string]interface{}{
            "id":      SameFile.Parent,
            "content": SameFile.Name,
        })
        if err != nil {
            log.Println("some wrong:", err.Error())
        }
        effectCount, err := result.RowsAffected()
        if err != nil {
            log.Println("exec  wrong:", err.Error())
        }
        log.Println("exec result:", effectCount)
    }
}
  • 参考火焰图(结合系统查看)

可以方便分析goruntine
machine golang 并发工具包使用

 

 

说明

以上代码比较简单,同时也集成了pprof,我们可以观测goruntine的 行为

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月14日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2024年05月17日   44   0   0 golanghttps
  TEZNKK3IfmPf   2023年11月15日   33   0   0 golang
  TEZNKK3IfmPf   2024年05月31日   24   0   0 后端golang开发语言
  TEZNKK3IfmPf   2023年11月15日   32   0   0 golang
  TEZNKK3IfmPf   2024年05月17日   52   0   0 算法javagolang
TEZNKK3IfmPf