From ac888102d188af338f45e6136948364593a1a1c0 Mon Sep 17 00:00:00 2001 From: TeemoKill <1158898301@qq.com> Date: Thu, 22 Dec 2022 18:33:18 +0800 Subject: [PATCH] added live room re-open cool-down check to avoid spam messages after a live room stops streaming but bilibili api result is unstable in short period --- lsp/bilibili/concern.go | 9 ++- lsp/bilibili/concern_fresher.go | 126 +++++++++++++++++++++++--------- lsp/bilibili/model.go | 12 ++- 3 files changed, 104 insertions(+), 43 deletions(-) diff --git a/lsp/bilibili/concern.go b/lsp/bilibili/concern.go index 3a641106..c5a7b534 100644 --- a/lsp/bilibili/concern.go +++ b/lsp/bilibili/concern.go @@ -2,6 +2,10 @@ package bilibili import ( "fmt" + "strings" + "sync" + "time" + localdb "github.com/Sora233/DDBOT/lsp/buntdb" "github.com/Sora233/DDBOT/lsp/cfg" "github.com/Sora233/DDBOT/lsp/concern" @@ -9,13 +13,11 @@ import ( "github.com/Sora233/DDBOT/lsp/mmsg" localutils "github.com/Sora233/DDBOT/utils" "github.com/Sora233/DDBOT/utils/expirable" + "github.com/Sora233/MiraiGo-Template/config" "github.com/Sora233/MiraiGo-Template/utils" "github.com/tidwall/buntdb" "go.uber.org/atomic" - "strings" - "sync" - "time" ) var logger = utils.GetModuleLogger("bilibili-concern") @@ -337,6 +339,7 @@ func (c *Concern) FindUser(mid int64, load bool) (*UserInfo, error) { resp.GetData().GetLiveRoom().GetTitle(), resp.GetData().GetLiveRoom().GetCover(), resp.GetData().GetLiveRoom().GetLiveStatus(), + time.Now().Unix(), ) // AddLiveInfo 会顺便添加UserInfo err = c.StateManager.AddLiveInfo(newLiveInfo) diff --git a/lsp/bilibili/concern_fresher.go b/lsp/bilibili/concern_fresher.go index 294868e9..fd187d44 100644 --- a/lsp/bilibili/concern_fresher.go +++ b/lsp/bilibili/concern_fresher.go @@ -3,23 +3,26 @@ package bilibili import ( "context" "fmt" + "strconv" + "strings" + "time" + "github.com/Sora233/DDBOT/lsp/cfg" "github.com/Sora233/DDBOT/lsp/concern" "github.com/Sora233/DDBOT/lsp/concern_type" + "github.com/Sora233/MiraiGo-Template/config" "github.com/sirupsen/logrus" "github.com/tidwall/buntdb" "go.uber.org/atomic" "golang.org/x/sync/errgroup" - "strconv" - "strings" - "time" ) // fresh 这个fresh不能启动多个 func (c *Concern) fresh() concern.FreshFunc { return func(ctx context.Context, eventChan chan<- concern.Event) { - t := time.NewTimer(time.Second * 3) + timer := time.NewTimer(time.Second * 3) + var interval time.Duration if config.GlobalConfig != nil { interval = config.GlobalConfig.GetDuration("bilibili.interval") @@ -27,16 +30,19 @@ func (c *Concern) fresh() concern.FreshFunc { if interval == 0 { interval = time.Second * 20 } + var freshCount atomic.Int32 if !cfg.GetBilibiliOnlyOnlineNotify() { freshCount.Store(1000) } + for { select { - case <-t.C: + case <-timer.C: case <-ctx.Done(): return } + start := time.Now() var errGroup errgroup.Group @@ -67,6 +73,7 @@ func (c *Concern) fresh() concern.FreshFunc { logger.Errorf("freshLive error %v", err) return err } + // liveInfoMap内是所有正在直播的列表,没有直播的不应该放进去 var liveInfoMap = make(map[int64]*LiveInfo) for _, info := range liveInfo { @@ -81,6 +88,7 @@ func (c *Concern) fresh() concern.FreshFunc { logger.Errorf("ListConcernState error %v", err) return err } + ids, types, err = c.GroupTypeById(ids, types) if err != nil { logger.Errorf("GroupTypeById error %v", err) @@ -91,7 +99,7 @@ func (c *Concern) fresh() concern.FreshFunc { addLiveInfoErr := c.AddLiveInfo(info) if addLiveInfoErr != nil { // 如果因为系统原因add失败,会造成重复推送 - // 按照ddbot的原则,选择不推送,而非重复推送 + // 按照ddbot 的原则,选择不推送,而非重复推送 logger.WithField("mid", info.Mid).Errorf("add live info error %v", err) return } @@ -104,6 +112,7 @@ func (c *Concern) fresh() concern.FreshFunc { selfUid := accountUid.Load() for _, id := range ids { mid := id.(int64) + if selfUid != 0 && selfUid == mid { // 特殊处理下关注自己 accResp, err := XSpaceAccInfo(selfUid) @@ -117,30 +126,43 @@ func (c *Concern) fresh() concern.FreshFunc { liveRoom.GetTitle(), liveRoom.GetCover(), liveRoom.GetLiveStatus(), + time.Now().Unix(), ) if selfLiveInfo.Living() { liveInfoMap[selfUid] = selfLiveInfo } } + oldInfo, _ := c.GetLiveInfo(mid) if oldInfo == nil { // first live info - if newInfo, found := liveInfoMap[mid]; found { + newInfo, found := liveInfoMap[mid] + if found { newInfo.liveStatusChanged = true sendLiveInfo(newInfo) } continue } + if oldInfo.Status == LiveStatus_NoLiving { - if newInfo, found := liveInfoMap[mid]; found { + newInfo, found := liveInfoMap[mid] + if found { // notliving -> living + if time.Duration(newInfo.TimeStamp-oldInfo.TimeStamp) < time.Minute { + // to avoid bilibili live status api unstable issue + // we assume a live can only re-open after closed for at least 1 minute + continue + } + newInfo.liveStatusChanged = true sendLiveInfo(newInfo) } } else if oldInfo.Status == LiveStatus_Living { - if newInfo, found := liveInfoMap[mid]; !found { + newInfo, found := liveInfoMap[mid] + if !found { // living -> notliving - if count := c.IncNotLiveCount(mid); count < 3 { + count := c.IncNotLiveCount(mid) + if count < 3 { logger.WithField("uid", mid).WithField("name", oldInfo.UserInfo.Name). WithField("notlive_count", count). Trace("notlive counting") @@ -149,28 +171,38 @@ func (c *Concern) fresh() concern.FreshFunc { logger.WithField("uid", mid).WithField("name", oldInfo.UserInfo.Name). Debug("notlive count done, notlive confirmed") } - if err := c.ClearNotLiveCount(mid); err != nil { + err := c.ClearNotLiveCount(mid) + if err != nil { logger.WithField("uid", mid).WithField("name", oldInfo.UserInfo.Name). Errorf("clear notlive count error %v", err) } + resp, err := XSpaceAccInfo(mid) if err != nil { logger.WithField("uid", mid).WithField("name", oldInfo.UserInfo.Name). Errorf("XSpaceAccInfo error %v", err) continue } + if resp.GetData().GetLiveRoom().GetLiveStatus() == LiveStatus_Living { continue - } else { - logger.WithField("uid", mid).WithField("name", oldInfo.UserInfo.Name). - Debug("XSpaceAccInfo notlive confirmed") } - newInfo = NewLiveInfo(&oldInfo.UserInfo, resp.GetData().GetLiveRoom().GetTitle(), - resp.GetData().GetLiveRoom().GetCover(), LiveStatus_NoLiving) + logger.WithField("uid", mid). + WithField("name", oldInfo.UserInfo.Name). + Debug("XSpaceAccInfo notlive confirmed") + + newInfo = NewLiveInfo( + &oldInfo.UserInfo, + resp.GetData().GetLiveRoom().GetTitle(), + resp.GetData().GetLiveRoom().GetCover(), + LiveStatus_NoLiving, + time.Now().Unix(), + ) newInfo.Name = resp.GetData().GetName() newInfo.liveStatusChanged = true sendLiveInfo(newInfo) } else { + // still living but title changed if newInfo.LiveTitle == "bilibili主播的直播间" { newInfo.LiveTitle = oldInfo.LiveTitle } @@ -197,18 +229,20 @@ func (c *Concern) fresh() concern.FreshFunc { } else { logger.WithField("cost", end.Sub(start)).Errorf("watchCore error %v", err) } - t.Reset(interval) + timer.Reset(interval) } } } func (c *Concern) freshDynamicNew() ([]*NewsInfo, error) { var start = time.Now() + resp, err := DynamicSvrDynamicNew() if err != nil { logger.Errorf("DynamicSvrDynamicNew error %v", err) return nil, err } + var newsMap = make(map[int64][]*Card) if resp.GetCode() != 0 { logger.WithField("RespCode", resp.GetCode()). @@ -216,6 +250,7 @@ func (c *Concern) freshDynamicNew() ([]*NewsInfo, error) { Errorf("DynamicSvrDynamicNew failed") return nil, fmt.Errorf("DynamicSvrDynamicNew failed %v - %v", resp.GetCode(), resp.GetMessage()) } + var cards []*Card cards = append(cards, resp.GetData().GetCards()...) // 尝试刷一下历史动态,看看能不能捞一下被审核的动态 @@ -289,8 +324,9 @@ func (c *Concern) freshDynamicNew() ([]*NewsInfo, error) { // return all LiveInfo in LiveStatus_Living func (c *Concern) freshLive() ([]*LiveInfo, error) { var start = time.Now() + var liveInfo []*LiveInfo - var infoSet = make(map[int64]bool) + var visited = make(map[int64]bool) var page = 1 var maxPage int32 = 1 var zeroCount = 0 @@ -299,24 +335,31 @@ func (c *Concern) freshLive() ([]*LiveInfo, error) { if err != nil { logger.Errorf("freshLive FeedList error %v", err) return nil, err - } else if resp.GetCode() != 0 { - if resp.GetCode() == -101 && strings.Contains(resp.GetMessage(), "未登录") { - logger.Errorf("刷新直播列表失败,可能是cookie失效,将尝试重新获取cookie") - ClearCookieInfo(username) - atomicVerifyInfo.Store(new(VerifyInfo)) - } else if resp.GetCode() == -400 { - logger.Errorf("刷新直播列表失败,可能是自动登陆失败,请查看文档尝试手动设置b站cookie") - } else { - logger.Errorf("freshLive FeedList code %v msg %v", resp.GetCode(), resp.GetMessage()) - } + } + + switch { + case resp.GetCode() == 0: + // no error, do nothing + case resp.GetCode() == -101 && strings.Contains(resp.GetMessage(), "未登录"): + logger.Errorf("刷新直播列表失败,可能是cookie失效,将尝试重新获取cookie") + ClearCookieInfo(username) + atomicVerifyInfo.Store(new(VerifyInfo)) + return nil, fmt.Errorf("freshLive FeedList error code %v msg %v", resp.GetCode(), resp.GetMessage()) + case resp.GetCode() == -400: + logger.Errorf("刷新直播列表失败,可能是自动登陆失败,请查看文档尝试手动设置b站cookie") + return nil, fmt.Errorf("freshLive FeedList error code %v msg %v", resp.GetCode(), resp.GetMessage()) + default: + logger.Errorf("freshLive FeedList code %v msg %v", resp.GetCode(), resp.GetMessage()) return nil, fmt.Errorf("freshLive FeedList error code %v msg %v", resp.GetCode(), resp.GetMessage()) } + var ( dataSize = len(resp.GetData().GetList()) pageSize, _ = strconv.ParseInt(resp.GetData().GetPagesize(), 10, 32) curTotal = resp.GetData().GetResults() curMaxPage = (curTotal-1)/int32(pageSize) + 1 ) + logger.WithFields(logrus.Fields{ "CurTotal": curTotal, "PageSize": pageSize, @@ -324,37 +367,45 @@ func (c *Concern) freshLive() ([]*LiveInfo, error) { "maxPage": maxPage, "page": page, }).Trace("freshLive debug") + if curMaxPage > maxPage { maxPage = curMaxPage } - for _, l := range resp.GetData().GetList() { - if infoSet[l.GetUid()] { + + for _, liveData := range resp.GetData().GetList() { + if visited[liveData.GetUid()] { continue } - infoSet[l.GetUid()] = true + visited[liveData.GetUid()] = true + info := NewLiveInfo( - NewUserInfo(l.GetUid(), l.GetRoomid(), l.GetUname(), l.GetLink()), - l.GetTitle(), - l.GetPic(), + NewUserInfo(liveData.GetUid(), liveData.GetRoomid(), liveData.GetUname(), liveData.GetLink()), + liveData.GetTitle(), + liveData.GetPic(), LiveStatus_Living, + time.Now().Unix(), ) if info.Cover == "" { - info.Cover = l.GetCover() + info.Cover = liveData.GetCover() } if info.Cover == "" { - info.Cover = l.GetFace() + info.Cover = liveData.GetFace() } + liveInfo = append(liveInfo, info) } + if dataSize != 0 { zeroCount = 0 page++ } else { zeroCount += 1 } + if int32(page) > maxPage { break } + if zeroCount >= 3 { // 认为是真的无人在直播,可能是关注比较少 if maxPage > 1 { @@ -367,15 +418,18 @@ func (c *Concern) freshLive() ([]*LiveInfo, error) { break } } + ts := time.Now().Unix() for _, info := range liveInfo { _ = c.MarkLatestActive(info.Mid, ts) } + logger.WithFields(logrus.Fields{ "cost": time.Since(start), "Page": page, "MaxPage": maxPage, "LiveInfo Size": len(liveInfo), }).Tracef("freshLive done") + return liveInfo, nil } diff --git a/lsp/bilibili/model.go b/lsp/bilibili/model.go index 9910a484..8e4dd3ed 100644 --- a/lsp/bilibili/model.go +++ b/lsp/bilibili/model.go @@ -1,16 +1,18 @@ package bilibili import ( - "github.com/Mrs4s/MiraiGo/message" + "strings" + "sync" + "github.com/Sora233/DDBOT/lsp/concern_type" "github.com/Sora233/DDBOT/lsp/mmsg" "github.com/Sora233/DDBOT/lsp/template" localutils "github.com/Sora233/DDBOT/utils" "github.com/Sora233/DDBOT/utils/blockCache" + + "github.com/Mrs4s/MiraiGo/message" "github.com/Sora233/MiraiGo-Template/config" "github.com/sirupsen/logrus" - "strings" - "sync" ) type NewsInfo struct { @@ -95,6 +97,7 @@ type LiveInfo struct { Status LiveStatus `json:"status"` LiveTitle string `json:"live_title"` Cover string `json:"cover"` + TimeStamp int64 `json:"time_stamp"` once sync.Once msgCache *mmsg.MSG @@ -187,7 +190,7 @@ func NewUserInfo(mid, roomId int64, name, url string) *UserInfo { } } -func NewLiveInfo(userInfo *UserInfo, liveTitle string, cover string, status LiveStatus) *LiveInfo { +func NewLiveInfo(userInfo *UserInfo, liveTitle string, cover string, status LiveStatus, timeStamp int64) *LiveInfo { if userInfo == nil { return nil } @@ -196,6 +199,7 @@ func NewLiveInfo(userInfo *UserInfo, liveTitle string, cover string, status Live Status: status, LiveTitle: liveTitle, Cover: cover, + TimeStamp: timeStamp, } }