go动态创建/增加channel并处理数据

背景描述

有一个需求,大概可以描述为:有多个websocket连接,因此消息会并发地发送过来,这些消息中有一个标志可以表明是哪个连接发来的消息,但只有收到消息后才能建立channel或写入已有channel,在收消息前无法预先创建channel

解决过程(可直接阅读最终版)

初版:直接写入

因为对数据量错误预估(以为数据量不大),一开始我是用的mysql直接写入,每次收到ws消息立即处理,可测试中发现因数据量过多且都会操作同一行数据,出现了资源竞争,导致死锁。

第二版:增加锁

在发现出现数据竞争后,我第一反应是增加读写锁。读写锁的代码类似以下示例:

package main

import (
	"database/sql"
	"fmt"
	"sync"

	_ "github.com/go-sql-driver/mysql"
)

var (
	db *sql.DB
	mu sync.RWMutex
)

func init() {
	var err error
	db, err = sql.Open("mysql", "username:password@tcp(localhost:3306)/dbname")
	if err != nil {
		panic(err)
	}
}

func main() {
	defer db.Close()

	// 读取数据
	go readData()

	// 写入数据
	go writeData()

	// 保持主线程运行
	select {}
}

func readData() {
	for {
		mu.RLock()
		rows, err := db.Query("SELECT * FROM table_name")
		mu.RUnlock()
		if err != nil {
			fmt.Println("Error reading data:", err)
			continue
		}
		defer rows.Close()

		// 处理查询结果
		// ...

		// 睡眠一段时间,模拟读操作的持续性
		// 请注意,这是一个简单示例,实际应用中可能需要更复杂的逻辑
		// 或使用定时器进行控制
	}
}

func writeData() {
	for {
		mu.Lock()
		_, err := db.Exec("INSERT INTO table_name (column1, column2) VALUES (?, ?)", value1, value2)
		mu.Unlock()
		if err != nil {
			fmt.Println("Error writing data:", err)
			continue
		}

		// 睡眠一段时间,模拟写操作的持续性
		// 请注意,这是一个简单示例,实际应用中可能需要更复杂的逻辑
		// 或使用定时器进行控制
	}
}

但是代码里对数据库的操作非常频繁且混乱,加了读写锁后经常出现请求很慢的情况,考虑其他方案

第三版 使用事务

使用事务代码忽略,最终发现,因为事务过长,导致出现了重复写的问题,考虑其他方案

第四版 map

通过一个二维的map来存储数据,每当数据存满10条就处理,当然毫不意外的,出现了map的竞争。map也是可以用锁的,但是这里是二维的map,加上两层锁之后使得效率极低,而且依旧有概率出现map竞争导致报错

此外,还可以考虑使用redis设置锁,直接set就行了,但是因为环境不支持redis,此方案弃用

最终版 动态channel

出现以上问题的根本原因是消费太快,其实完全可以把每个ws连接的数据都写到各自的channel里,同时设置每个channel都累积10条再消费,当然还需要一个处理机制,如果超过10s也消费一次。

启动"生产者"、“消费者”

在当前环境中,生产者就是每次从ws中读到数据往动态channel中写入,消费者就是不断获取有哪些channel,以及从channel中读数据,在ws写入时的处理逻辑大概可以简化为如下demo:

package test

import (
	"context"
	"encoding/json"
	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	log "github.com/sirupsen/logrus"
	"net/http"
	"sync"
)

// RequestTemplate 请求模板
type RequestTemplate struct {
	Op   string               `json:"op"`   // 操作
	Id   int                  `json:"id"`   // 唯一id标识
	Time string               `json:"time"` // 时间,用秒级时间戳,字符串包裹
	Data *RequestTemplateData `json:"data"` // 请求数据
	Code int                  `json:"code"` // 状态码
}

// RequestTemplateData 请求中data包含的部分,实际这里是很复杂的结构,之前超时/死锁也是因为这里处理逻辑比较复杂,但是这篇博客的演示重点不是这个,因此简略为id和请求ip
type RequestTemplateData struct {
	ConnIp string `json:"conn_ip"` // 请求ip
	Id     int    `json:"id"`      // 唯一id标识
}

// ConnInfo 具体的连接信息
type ConnInfo struct {
	Conn      *websocket.Conn    `json:"conn"`   // websocket连接
	Ctx       context.Context    `json:"ctx"`    // 连接上下文
	CtxCancel context.CancelFunc `json:"cancel"` // 连接上下文cancel function
	Ip        string             `json:"ip"`     // 连接的手机端ip
	Id        int                `json:"id"`     // 唯一id标识
}

var AllConns = make(map[string]*ConnInfo) //创建字典集合存储连接信息

// Start 启动
func Start() {
	//处理ws的连接
	http.HandleFunc("/ws", HandleMsg)

	// //监听7001端口号,作为websocket连接的服务
	log.Info("Server started on :7001")
	log.Fatal(http.ListenAndServe(":7001", nil))
}

// ChannelStorage channel数据
type ChannelStorage struct {
	sync.RWMutex
	channels map[string]chan *RequestTemplateData
}

var ConnRequestData map[int]*RequestTemplateData

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

// HandleMsg 处理ws连接,每来一个新客户端请求就建立一个新连接
func HandleMsg(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil) // 协议升级,这里也可以直连
	if err != nil {
		log.Error(err)
		return
	}
	//获取连接ip,这里是为了区分每个连接
	connIp := conn.RemoteAddr().String()
	// 这里是为了后续关闭channel
	rootCtx := context.Background()
	ctx, cancel := context.WithCancel(rootCtx)
	//加入连接
	AllConns[connIp] = &ConnInfo{
		Conn:      conn,   // 客户端ws链接对象
		Ctx:       ctx,    // 连接上下文
		CtxCancel: cancel, // 取消连接上下文
	}

	defer func() {
		// 如果断开连接,删除数据
		if AllConns[connIp] != nil {
			AllConns[connIp].CtxCancel()
			delete(ConnRequestData, AllConns[connIp].Id)
			go SetDoneData(AllConns[connIp].Id, conn) // 这里对结束做处理
		}
		delete(AllConns, conn.RemoteAddr().String())
		err = conn.Close()
		if err != nil {
			return
		}
		log.Error("HandleMsg异常,开始defer处理:", err)
		if err := recover(); err != nil {
			log.Error("websocket连接异常,已断开:", err)
		}
	}()

	log.WithFields(log.Fields{
		"connIp": connIp,
	}).Info("沙箱已连接")
	reqCh := &ChannelStorage{}
	go reqCh.ResultConsumer(ctx) // 这里是消费者
	//循环读取ws客户端的消息
	for {
		// 读取消息
		_, msg, err := conn.ReadMessage()
		if err != nil {
			log.WithFields(log.Fields{
				"connIp": connIp,
			}).WithError(err).Error("读取websocket的消息失败")
			if AllConns[connIp] != nil {
				delete(ConnRequestData, AllConns[connIp].Id)
				go SetDoneData(AllConns[connIp].Id, conn) // 连接断开设置状态为结束
			}
			// 断开ws连接
			conn.Close()
			delete(AllConns, conn.RemoteAddr().String())
			return
		}

		//msg []byte转string
		msgStr := string(msg)
		log.Info("收到消息为:", msgStr)

		//反序列化消息为结构体
		requestData := RequestTemplate{}
		if err := json.Unmarshal(msg, &requestData); err != nil {
			conn.WriteJSON(gin.H{"sandbox_id": "位置", "cmd": "未知", "error": "cmd通信的请求参数有误,无法json decode"})
			log.Error("json_decode cmd命令的请求参数时出错:", err)
			continue
		}
		dataInfo := requestData.Data
		// 这里实际上有很多操作,简写为两种
		if requestData.Op != "" {
			switch requestData.Op {
			// 收到报告
			case "report":
				go reqCh.Produce(dataInfo) // "生产者",发送一条消息
			// 已完成
			case "done":
				go CheckDone(dataInfo, conn) // 做完成的处理
			default:
				log.Error("未识别的命令:", msgStr)
			}
		}
	}
}

有一个for循环在持续监听ws消息,消费者只启动一次,这里重点就是生产和消费如何实现

“生产者”

“生产者”要做的事就是:
1 每当收到ws消息后,解析,拿到唯一id(这个唯一是指这个连接下的所有上报消息的id都是相同的)
2 判断这个“唯一id”是否已经创建了channel,若创建了则不需要创建,直接写入channel,若未创建则新建channel
以下是生产者的demo:

// GetChannel 获取通道
func (cs *ChannelStorage) GetChannel(key string) chan *RequestTemplateData {
	cs.RLock()
	defer cs.RUnlock()
	return cs.channels[key]
}

// CreateChannel 创建通道并存储到 map 中
func (cs *ChannelStorage) CreateChannel(key string) chan *RequestTemplateData {
	cs.Lock()
	defer cs.Unlock()
	if cs.channels == nil {
		cs.channels = make(map[string]chan *RequestTemplateData, 800)
	}
	ch := make(chan *RequestTemplateData, 10)
	cs.channels[key] = ch
	return ch
}

// Produce 往上报channel中写数据
func (cs *ChannelStorage) Produce(requestData *RequestTemplateData) {
	defer func() {
		if err := recover(); err != nil {
			log.Info("_____________recover CaseResultAdd error________: ", err)
		}
	}()
	// 创建存储通道的结构体实例
	chanelKey := strconv.Itoa(requestData.Id)
	channel := cs.GetChannel(chanelKey)
	if channel == nil {
		channel = cs.CreateChannel(chanelKey)
	}
	// 直接往channel里面塞
	if channel != nil {
		channel <- requestData
	}
}
消费者

消费者由于只启动一次,但后续可能会有新的channel,因此需要增加一个获取所有连接的方法:
消费者demo:

func (cs *ChannelStorage) ResultConsumer(ctx context.Context) {
	defer func() {
		if err := recover(); err != nil {
			log.Info("_____________recover CaseResultConsumer error________: ", err)
		}
	}()

	for {
		select {
		case <-ctx.Done():
			log.Info("websocket断开连接,消费者协程退出...")
			return
		default:
			cs.processAllChannels(ctx)  // 传入 context.Context
			time.Sleep(2 * time.Second) // 控制处理频率
		}
	}
}

// processAllChannels 获取所有channel
func (cs *ChannelStorage) processAllChannels(ctx context.Context) {
	cs.RLock()
	defer cs.RUnlock()

	var wg sync.WaitGroup // 用于等待所有通道处理完毕
	for chName, channel := range cs.channels {
		wg.Add(1)
		go func(chName string, channel chan *RequestTemplateData) {
			defer wg.Done()
			cs.processChannel(chName, channel, ctx)
		}(chName, channel)
	}
	wg.Wait() // 等待所有通道处理完毕
}
func (cs *ChannelStorage) processChannel(chName string, channel chan *RequestTemplateData, ctx context.Context) {
	const batchSize = 10 // 每次处理的数据量

	var messages []*RequestTemplateData
	targetMsgOverTime := 10 * time.Second // 超时时间
	for {
		select {
		case caseMsg := <-channel:
			messages = append(messages, caseMsg) // 将接收到的消息放入 messages 切片中
			if len(messages) == batchSize {
				tmpMessages := messages
				messages = nil
				processMessages(tmpMessages)
			}
		case <-time.After(targetMsgOverTime):
			log.Info("Timeout reached. Processing...")
			if len(messages) > 0 {
				tmpMessages := messages
				messages = nil
				log.Info("Processing remaining messages for channel:", chName)
				processMessages(tmpMessages)
			}
		case <-ctx.Done(): // 如果收到上下文取消信号,退出函数
			log.Info("______________________error__________cancel______")
			return
		}
	}
}

func processMessages(messages []*RequestTemplateData) {
	// 在这里处理消息就是批量的了
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/596145.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java新手必看:快速上手FileOutPutStream类

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。运营社区&#xff1a;C站/掘金/腾讯云&#xff1b;欢迎大家常来逛逛 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一…

OpenHarmony实战开发-应用侧调用前端页面函数

应用侧可以通过runJavaScript()方法调用前端页面的JavaScript相关函数。 在下面的示例中&#xff0c;点击应用侧的“runJavaScript”按钮时&#xff0c;来触发前端页面的htmlTest()方法。 前端页面代码。 <!-- index.html --> <!DOCTYPE html> <html> <…

58行代码把Llama 3扩展到100万上下文,任何微调版都适用 | 最新快讯

量子位公众号 QbitAI 堂堂开源之王 Llama 3&#xff0c;原版上下文窗口居然只有……8k&#xff0c;让到嘴边的一句“真香”又咽回去了。 在 32k 起步&#xff0c;100k 寻常的今天&#xff0c;这是故意要给开源社区留做贡献的空间吗&#xff1f; 开源社区当然不会放过这个机会&a…

Llama3-Tutorial之LMDeploy高效部署Llama3实践

Llama3-Tutorial之LMDeploy高效部署Llama3实践 Llama 3 近期重磅发布&#xff0c;发布了 8B 和 70B 参数量的模型&#xff0c;lmdeploy团队对 Llama 3 部署进行了光速支持&#xff01;&#xff01;&#xff01; 书生浦语和机智流社区同学光速投稿了 LMDeploy 高效量化部署 Llam…

对于子数组问题的动态规划

前言 先讲讲我对于这个问题的理解吧 当谈到解决子数组问题时&#xff0c;动态规划(DP)是一个强大的工具&#xff0c;它在处理各种算法挑战时发挥着重要作用。动态规划是一种思想&#xff0c;它通过将问题分解成更小的子问题并以一种递归的方式解决它们&#xff0c;然后利用这些…

【华为】IPSec VPN手动配置

【华为】IPSec VPN手动配置 拓扑配置ISP - 2AR1NAT - Easy IPIPSec VPN AR3NATIPsec VPN PC检验 配置文档AR1AR2 拓扑 配置 配置步骤 1、配置IP地址&#xff0c;ISP 路由器用 Lo0 模拟互联网 2、漳州和福州两个出口路由器配置默认路由指向ISP路由器 3、进行 IPsec VPN配置&…

Redission分布式锁 watch dog 看门狗机制

为了避免Redis实现的分布式锁超时&#xff0c;Redisson中引入了watch dog的机制&#xff0c;他可以帮助我们在Redisson实例被关闭前&#xff0c;不断的延长锁的有效期。 自动续租&#xff1a;当一个Redisson客户端实例获取到一个分布式锁时&#xff0c;如果没有指定锁的超时时…

笔记86:关于【#ifndef + #define + #endif】的用法

当你在编写一个头文件&#xff08;例如 pid_controller.h&#xff09;时&#xff0c;你可能会在多个源文件中包含它&#xff0c;以便在这些源文件中使用该头文件定义的函数、类或其他声明。如果你在多个源文件中都包含了同一个头文件&#xff0c;那么当你将整个工程统一编译&am…

银行卡实名认证API接口快速对接

银行卡实名认证API接口又叫银行卡核验类API接口、银行卡验证类API接口、银联核验类API接口,根据入参字段不同&#xff0c;分银行卡二要素验证API接口&#xff0c;银行卡三要素验证API接口&#xff0c;银行卡四要素验证API接口。其中&#xff0c;银行卡二要素验证API接口是验证开…

锂电池SOH估计 | Matlab实现基于ALO-SVR模型的锂电池SOH估计

目录 预测效果基本介绍程序设计参考资料 预测效果 基本介绍 锂电池SOH估计 | Matlab实现基于ALO-SVR模型的锂电池SOH估计 蚁狮优化支持向量机锂电池健康状态SOH估计&#xff1b; 具体流程如下&#xff1b; 1、分析锂离子电池老化数据集&#xff0c;从中选取具有代表电池性能衰减…

【自用】了解移动存储卡的基本信息

前言 本文是看B站视频做的一个简单笔记&#xff0c;方便日后自己快速回顾&#xff0c;内容主要介绍了存储卡基本参数&#xff0c;了解卡面上的数字、图标代表的含义。对于日后如何挑选判断一张存储卡的好坏、判别一张存储卡是否合格有一定帮助。 视频参考链接&#xff1a;【硬…

深入剖析Tomcat(六) Tomcat各组件的生命周期控制

Catalina中有很多组件&#xff0c;像上一章提到的四种容器&#xff0c;载入器&#xff0c;映射器等都是一种组件。每个组件在对外提供服务之前都需要有个启动过程&#xff1b;组件在销毁之前&#xff0c;也需要有个关闭过程&#xff1b;例如servlet容器关闭时&#xff0c;需要调…

OpenNJet应用引擎——云原生时代的Web服务新选择

文章目录 OpenNJet应用引擎——云原生时代的Web服务新选择引言&#xff1a;数字化转型的推动力&#xff1a;OpenNJet应用引擎为什么选择OpenNJet&#xff1f; OpenNJet的核心优势1. 云原生功能增强2. 安全加固3. 代码重构与性能优化4. 动态加载机制5. 多样化的产品形态6. 易于集…

产业空间集聚DO指数计算

1.前言 创始人 :Duranton and Overman&#xff08;2005&#xff09; 目前应用较多的产业集聚度量指数主要基于两类&#xff0c;一是根据不同空间地理单元中产业经济规模的均衡性进行构造&#xff0c;如空间基尼系数与EG指数&#xff1b;二是基于微观企业地理位置信息形成的产业…

嵌入式系统应用-拓展-FLASH之操作 SFUD (Serial Flash Universal Driver)之KEIL应用

这里已经假设SFUD代码已经移植到工程下面成功了&#xff0c;如果读者对SFUD移植还不了解。可以参考笔者这篇文章&#xff1a;SFUD (Serial Flash Universal Driver)之KEIL移植 这里主要介绍测试和应用 1 硬件设计 这里采用windbond 的W25Q32这款芯片用于SFUD测试。 W25Q32是…

LLM⊗KG范式下的知识图谱问答实现框架思想阅读

分享一张有趣的图&#xff0c;意思是在分类场景下&#xff0c;使用大模型和fasttext的效果&#xff0c;评论也很逗。 这其实背后的逻辑是&#xff0c;在类别众多的分类场景下&#xff0c;尤其是在标注数据量不缺的情况下&#xff0c;大模型的收益是否能够比有监督模型的收益更多…

[渗透利器]全能工具=信息收集->漏洞扫描->EXP调用

前言 hxd开发的工具&#xff0c;大致模块有&#xff08;信息收集&#xff0c;漏洞扫描&#xff0c;暴力破解&#xff0c;POC/EXP&#xff0c;常用编码&#xff09; 工具使用 下载后解压 安装环境 pip install -r requirements.txt 注意&#xff0c;该工具继承了两种不同的使…

HTML_CSS学习:定位

一、相对定位 相关代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>相对定位</title><style>.outer{width: 500px;background-color: #999ff0;border: 1px solid #000;p…

OpenHarmony实战开发-上传文件

Web组件支持前端页面选择文件上传功能&#xff0c;应用开发者可以使用onShowFileSelector()接口来处理前端页面文件上传的请求。 下面的示例中&#xff0c;当用户在前端页面点击文件上传按钮&#xff0c;应用侧在onShowFileSelector()接口中收到文件上传请求&#xff0c;在此接…

不考408的985,不想考408的有福了!吉林大学计算机考研考情分析

吉林大学&#xff08;Jilin University&#xff09;简称吉大&#xff0c;位于吉林长春&#xff0c;始建于1946年&#xff0c;是中华人民共和国教育部直属的综合性全国重点大学&#xff0c;国家“双一流”、“211工程”、“985工程”、“2011计划”重点建设的著名学府&#xff0…
最新文章