当前位置: 首页 > news >正文

专题网站建设方案百度图片识别在线识图

专题网站建设方案,百度图片识别在线识图,企业网站组网方案,东莞市专注网站建设怎么样ConcurrentQueue开源库介绍 ConcurrentQueue是一个高性能的、线程安全的并发队列库。它旨在提供高效、无锁的数据结构,适用于多线程环境中的数据交换。concurrentqueue 支持多个生产者和多个消费者,并且提供了多种配置选项来优化性能和内存使用。 Conc…

ConcurrentQueue开源库介绍

ConcurrentQueue是一个高性能的、线程安全的并发队列库。它旨在提供高效、无锁的数据结构,适用于多线程环境中的数据交换。concurrentqueue 支持多个生产者和多个消费者,并且提供了多种配置选项来优化性能和内存使用。

ConcurrentQueue使用

0x01 使用场景说明

我的数据平台在接收到四种不同的业务数据时,需要按数据分类写进RocketMQ。

0x02 自定义类用来存放和区分数据流

  • 设计BusinessFlowMsg
  1. 该类有定义消息的类型
  2. 该类中设计ST_BusinessSign结构体消息头,用来区分消息和获取消息体的长度
  3. BusinessFlowMsg类可以存放的数据长度为512KB
#ifndef BUSINESSFLOWMSG_HPP
#define BUSINESSFLOWMSG_HPP#include <string.h>#define MSG_ROCKETMQ_PNG 0x01
#define MSG_ROCKETMQ_AIS 0x02
#define MSG_ROCKETMQ_ROUTE 0x03
#define MSG_ROCKETMQ_VOYAGE 0x04#pragma pack(push)
#pragma pack(1)// 消息头
typedef struct s_BusinessSign
{int sign; // 业务标识unsigned int length; // 消息体的长度
}ST_BusinessSign;#pragma pack(pop)class BusinessFlowMsg
{
public:BusinessFlowMsg() = default;~BusinessFlowMsg() = default;char* get_data(){return _data;}int data_size(){return businessSign.length;}char* get_body(){return _data + sizeof(ST_BusinessSign);}int body_size(){return data_size() - sizeof(ST_BusinessSign);}ST_BusinessSign* header(){return &businessSign;}bool set_data(const char* data, int length, int sign){if(length > (max_body_len + sizeof(ST_BusinessSign))){return false;}businessSign.sign = sign;businessSign.length = length;memcpy(_data + sizeof(ST_BusinessSign), data, length);return true;}private:enum{max_body_len = 512 * 1024 // 512KB};ST_BusinessSign businessSign;char _data[max_body_len];
};#endif // BUSINESSFLOWMSG_HPP

0x03 创建PngUnit类模拟接到不同的业务数据

  • PngUnit类型创建了四个线程来模拟不同的数据流。
  • PngUnit类多线程中并未使用互斥锁,因为ConcurrentQueue是一个线程安全的并发队列库,事实证明确实如此。
#ifndef PNGUNIT_H
#define PNGUNIT_H#include <thread>
#include <mutex>class PngUnit
{
public:PngUnit();~PngUnit() = default;void start();void sendPNG(int sign);void sendAIS(int sign);void sendRoute(int sign);void sendVoyage(int sign);private:std::thread m_th_png;std::thread m_th_ais;std::thread m_th_route;std::thread m_th_voyage;// std::mutex queue_mutex;  // 互斥锁
};#endif // PNGUNIT_H

#include "pngunit.h"
#include <unistd.h>
#include "rocketmqutils.h"
#include "BusinessFlowMsg.hpp"
#include "json11/json11.hpp"PngUnit::PngUnit()
{}void PngUnit::start()
{// m_th = std::thread([this](){//     sendPNG(100);// });if(m_th_png.joinable()){printf("[%s:%d] %s\n", __FILE__, __LINE__, "m_th_png is running");return;}m_th_png = std::thread(std::bind(&PngUnit::sendPNG, this, MSG_ROCKETMQ_PNG));m_th_ais= std::thread(std::bind(&PngUnit::sendAIS, this, MSG_ROCKETMQ_AIS));m_th_route= std::thread(std::bind(&PngUnit::sendRoute, this, MSG_ROCKETMQ_ROUTE));m_th_voyage= std::thread(std::bind(&PngUnit::sendVoyage, this, MSG_ROCKETMQ_VOYAGE));
}void PngUnit::sendPNG(int sign)
{while (true){BusinessFlowMsg pngMsg;const char* pngFile = "1234567890ABCDEF";int fileLen = strlen(pngFile) + 1;pngMsg.set_data(pngFile, fileLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(pngMsg)){printf("Failed to set PNG message data");}}sleep(1);}
}void PngUnit::sendAIS(int sign)
{json11::Json::object obj = {{"message", "AIS"},{"response", "success"}};std::string jsonStr = json11::Json(obj).dump();while (true){BusinessFlowMsg aisMsg;int jsonStrLen = jsonStr.size();aisMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(aisMsg)){printf("Failed to set AIS message data");}}sleep(2);}
}void PngUnit::sendRoute(int sign)
{json11::Json::object obj = {{"message", "Route"},{"response", "success"}};std::string jsonStr = json11::Json(obj).dump();while (true){BusinessFlowMsg routeMsg;int jsonStrLen = jsonStr.size();routeMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(routeMsg)){printf("Failed to set ROUTE message data");}}sleep(3);}
}void PngUnit::sendVoyage(int sign)
{json11::Json::object obj = {{"message", "Voyage"},{"response", "success"}};std::string jsonStr = json11::Json(obj).dump();while (true){BusinessFlowMsg voyageMsg;int jsonStrLen = jsonStr.size();voyageMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(voyageMsg)){printf("Failed to set VOYAGE message data");}}sleep(4);}
}

0x04 创建RocketMQUtils类,在ConcurrentQueue队列中获取数据写进RocketMQ

  • RocketMQUtils类是一个单例类
#ifndef ROCKETMQUTILS_H
#define ROCKETMQUTILS_H#include <thread>
#include <concurrentqueue/moodycamel/concurrentqueue.h>
#include "BusinessFlowMsg.hpp"class RocketMQUtils
{
public:static RocketMQUtils* Instance();private:RocketMQUtils();~RocketMQUtils()=default;RocketMQUtils(const RocketMQUtils &) = delete;RocketMQUtils& operator=(const RocketMQUtils &) = delete;RocketMQUtils(RocketMQUtils &&) = delete;RocketMQUtils& operator=(RocketMQUtils &&) = delete;public:void start();void push();void poll();bool write(char *data, int len, int sign);public:moodycamel::ConcurrentQueue<BusinessFlowMsg> g_businessQueue;private:static RocketMQUtils* _instance;std::thread _pushThread;
};#endif // ROCKETMQUTILS_H

#include "rocketmqutils.h"
#include <unistd.h>RocketMQUtils * RocketMQUtils::_instance = nullptr;RocketMQUtils *RocketMQUtils::Instance()
{if(_instance == nullptr){_instance = new RocketMQUtils();}return _instance;
}RocketMQUtils::RocketMQUtils()
{}void RocketMQUtils::start()
{if(_pushThread.joinable()){return;}_pushThread = std::thread(&RocketMQUtils::push, this);
}void RocketMQUtils::push()
{while (true){BusinessFlowMsg busiMsg;if(g_businessQueue.try_dequeue(busiMsg)){write(busiMsg.get_body(), busiMsg.header()->length, busiMsg.header()->sign);}else{printf("[%s:%d] %s\n", __FILE__, __LINE__, "g_businessQueue is empty");sleep(2);}}
}void RocketMQUtils::poll()
{}bool RocketMQUtils::write(char *data, int len, int sign)
{std::string msg(data, len);if (sign == MSG_ROCKETMQ_PNG){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else if (sign == MSG_ROCKETMQ_AIS){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else if (sign == MSG_ROCKETMQ_ROUTE){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else if (sign == MSG_ROCKETMQ_VOYAGE){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else{printf("[%s:%d] data sign error\n", __FILE__, __LINE__);}
}

0x05 使用演示

#include <iostream>
#include <memory>
#include "rocketmqutils.h"
#include "pngunit.h"using namespace std;int main()
{cout << "==Start==" << endl;RocketMQUtils* rocketmq = RocketMQUtils::Instance();rocketmq->start();std::shared_ptr<PngUnit> ptrPngUnit = std::make_shared<PngUnit>();ptrPngUnit->start();getchar();cout << "==Over==" << endl;return 0;
}

在这里插入图片描述

http://www.mmbaike.com/news/96559.html

相关文章:

  • 被禁止访问网站怎么办郑州品牌网站建设
  • 仅对wordpress自带主题有效白杨seo博客
  • 专注河南网站建设网页设计制作网站素材
  • 旅游预定型网站建设网站优化及推广方案
  • 国外做外汇网站交流抖音seo查询工具
  • 广州微信网站建设价格网络推广员一个月多少钱
  • 个人工作室如何纳税广州seo团队
  • 长春网站制作网页seo网络推广技术员招聘
  • 福步外贸宁波seo深度优化平台
  • 网站建设全套流程外链网站推荐
  • 哈尔滨网站关键词优化排名亚洲足球最新排名
  • 有什么网站可以免费做四六级模拟题湖北百度推广公司
  • 专业做网站方案ppt关键词搜索挖掘爱网站
  • 做字画的网站北京seo关键词排名优化软件
  • 仙桃网站制作seo是什么意思中文
  • 汉川市建设局网站长春网站seo公司
  • 邢台做网站的价格究竟多少钱?南通seo
  • 人妖手术是怎么做的视频网站创意营销
  • 购物网站模块是什么意思网络平台怎么创建
  • 网站建设与实训站长之家怎么找网址
  • 大连做公司网站哪家好网络热词有哪些
  • 备案网站可以做卡盟么交换链接营销
  • 美国购买网站空间产品推广计划
  • 做网站 报价seo实战密码第三版
  • asp网站怎么验证到百度站长网站排名优化化快排优化
  • 交互做的好的网站网络推广方案的内容
  • 铁岭网络推广网站建设免费推广引流软件
  • 淮安做网站洛阳seo网络推广
  • 有什么好的免费网站做教育宣传语网络营销推广方式
  • 重庆做网站电话朝阳区seo