当前位置: 首页 > news >正文

保山市建设局网站登录肇庆seo外包公司

保山市建设局网站登录,肇庆seo外包公司,拉萨建设网站,黄山市建设工程造价管理站网站【Mongoose笔记】MQTT 服务器 简介 Mongoose 笔记系列用于记录学习 Mongoose 的一些内容。 Mongoose 是一个 C/C 的网络库。它为 TCP、UDP、HTTP、WebSocket、MQTT 实现了事件驱动的、非阻塞的 API。 项目地址: https://github.com/cesanta/mongoose学习 下面…

【Mongoose笔记】MQTT 服务器

简介

Mongoose 笔记系列用于记录学习 Mongoose 的一些内容。

Mongoose 是一个 C/C++ 的网络库。它为 TCP、UDP、HTTP、WebSocket、MQTT 实现了事件驱动的、非阻塞的 API。

项目地址:

https://github.com/cesanta/mongoose

学习

下面通过学习 Mongoose 项目代码中的 mqtt-server 示例程序 ,来学习如何使用 Mongoose 实现一个简单的 MQTT 服务器。使用树莓派平台进行开发验证。

mqtt-server 的示例程序不长,代码如下:

// Copyright (c) 2020 Cesanta Software Limited
// All rights reserved
//
// Example MQTT server. Usage:
//  1. Start this server, type `make`
//  2. Install mosquitto MQTT client
//  3. In one terminal, run:   mosquitto_sub -h localhost -t foo -t bar
//  4. In another, run:        mosquitto_pub -h localhost -t foo -m hi#include "mongoose.h"static const char *s_listen_on = "mqtt://0.0.0.0:1883";// A list of subscription, held in memory
struct sub {struct sub *next;struct mg_connection *c;struct mg_str topic;uint8_t qos;
};
static struct sub *s_subs = NULL;// Handle interrupts, like Ctrl-C
static int s_signo;
static void signal_handler(int signo) {s_signo = signo;
}static size_t mg_mqtt_next_topic(struct mg_mqtt_message *msg,struct mg_str *topic, uint8_t *qos,size_t pos) {unsigned char *buf = (unsigned char *) msg->dgram.ptr + pos;size_t new_pos;if (pos >= msg->dgram.len) return 0;topic->len = (size_t) (((unsigned) buf[0]) << 8 | buf[1]);topic->ptr = (char *) buf + 2;new_pos = pos + 2 + topic->len + (qos == NULL ? 0 : 1);if ((size_t) new_pos > msg->dgram.len) return 0;if (qos != NULL) *qos = buf[2 + topic->len];return new_pos;
}size_t mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,uint8_t *qos, size_t pos) {uint8_t tmp;return mg_mqtt_next_topic(msg, topic, qos == NULL ? &tmp : qos, pos);
}size_t mg_mqtt_next_unsub(struct mg_mqtt_message *msg, struct mg_str *topic,size_t pos) {return mg_mqtt_next_topic(msg, topic, NULL, pos);
}// Event handler function
static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {if (ev == MG_EV_MQTT_CMD) {struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;MG_DEBUG(("cmd %d qos %d", mm->cmd, mm->qos));switch (mm->cmd) {case MQTT_CMD_CONNECT: {// Client connectsif (mm->dgram.len < 9) {mg_error(c, "Malformed MQTT frame");} else if (mm->dgram.ptr[8] != 4) {mg_error(c, "Unsupported MQTT version %d", mm->dgram.ptr[8]);} else {uint8_t response[] = {0, 0};mg_mqtt_send_header(c, MQTT_CMD_CONNACK, 0, sizeof(response));mg_send(c, response, sizeof(response));}break;}case MQTT_CMD_SUBSCRIBE: {// Client subscribessize_t pos = 4;  // Initial topic offset, where ID endsuint8_t qos, resp[256];struct mg_str topic;int num_topics = 0;while ((pos = mg_mqtt_next_sub(mm, &topic, &qos, pos)) > 0) {struct sub *sub = calloc(1, sizeof(*sub));sub->c = c;sub->topic = mg_strdup(topic);sub->qos = qos;LIST_ADD_HEAD(struct sub, &s_subs, sub);MG_INFO(("SUB %p [%.*s]", c->fd, (int) sub->topic.len, sub->topic.ptr));// Change '+' to '*' for topic matching using mg_matchfor (size_t i = 0; i < sub->topic.len; i++) {if (sub->topic.ptr[i] == '+') ((char *) sub->topic.ptr)[i] = '*';}resp[num_topics++] = qos;}mg_mqtt_send_header(c, MQTT_CMD_SUBACK, 0, num_topics + 2);uint16_t id = mg_htons(mm->id);mg_send(c, &id, 2);mg_send(c, resp, num_topics);break;}case MQTT_CMD_PUBLISH: {// Client published message. Push to all subscribed channelsMG_INFO(("PUB %p [%.*s] -> [%.*s]", c->fd, (int) mm->data.len,mm->data.ptr, (int) mm->topic.len, mm->topic.ptr));for (struct sub *sub = s_subs; sub != NULL; sub = sub->next) {if (mg_match(mm->topic, sub->topic, NULL)) {mg_mqtt_pub(sub->c, mm->topic, mm->data, 1, false);}}break;}case MQTT_CMD_PINGREQ: {// The server must send a PINGRESP packet in response to a PINGREQ packet [MQTT-3.12.4-1]MG_INFO(("PINGREQ %p -> PINGRESP", c->fd));mg_mqtt_send_header(c, MQTT_CMD_PINGRESP, 0, 0);break;}}} else if (ev == MG_EV_ACCEPT) {// c->is_hexdumping = 1;} else if (ev == MG_EV_CLOSE) {// Client disconnects. Remove from the subscription listfor (struct sub *next, *sub = s_subs; sub != NULL; sub = next) {next = sub->next;if (c != sub->c) continue;MG_INFO(("UNSUB %p [%.*s]", c->fd, (int) sub->topic.len, sub->topic.ptr));LIST_DELETE(struct sub, &s_subs, sub);}}(void) fn_data;
}int main(void) {struct mg_mgr mgr;                // Event managersignal(SIGINT, signal_handler);   // Setup signal handlers - exist eventsignal(SIGTERM, signal_handler);  // manager loop on SIGINT and SIGTERMmg_mgr_init(&mgr);                // Initialise event managerMG_INFO(("Starting on %s", s_listen_on));      // Inform that we're startingmg_mqtt_listen(&mgr, s_listen_on, fn, NULL);   // Create MQTT listenerwhile (s_signo == 0) mg_mgr_poll(&mgr, 1000);  // Event loop, 1s timeoutmg_mgr_free(&mgr);                             // Cleanupreturn 0;
}

下面从main函数开始分析代码。

首先是变量定义。struct mg_mgr是用于保存所有活动连接的事件管理器。

  struct mg_mgr mgr;                // Event manager

设置 signal 函数捕获 SIGINT 信号和 SIGTERM 信号。

  signal(SIGINT, signal_handler);   // Setup signal handlers - exist eventsignal(SIGTERM, signal_handler);  // manager loop on SIGINT and SIGTERM

下面是对应的信号处理函数,当 SIGINT 信号和 SIGTERM 信号到达时,修改 s_signo 的值,使其值不为 0,然后会让主事件循环退出。当用户通过 Ctrl-C 结束进程是会发送 SIGINT 信号,通过 kill 命令不带参数时会发送 SIGTERM 信号。当通过以上两种操作时,都能让主事件循环正常退出。

// Handle interrupts, like Ctrl-C
static int s_signo;
static void signal_handler(int signo) {s_signo = signo;
}

初始化一个事件管理器,也就是将最开始定义的struct mg_mgr变量 mgr 中的数据进行初始化。

  mg_mgr_init(&mgr);                // Initialise event manager

打印出接下来要监听的本地IP地址和端口s_listen_on

  MG_INFO(("Starting on %s", s_listen_on));      // Inform that we're starting

s_listen_on是一个全局变量,默认值为mqtt://0.0.0.0:1883

static const char *s_listen_on = "mqtt://0.0.0.0:1883";

使用mg_mqtt_listen创建一个 MQTT 监听器。s_listen_on是指定要侦听的本地IP地址和端口,fn是事件处理函数。

  mg_mqtt_listen(&mgr, s_listen_on, fn, NULL);   // Create MQTT listener

进行事件循环,mg_mgr_poll 遍历所有连接,接受新连接,发送和接收数据,关闭连接,并为各个事件调用事件处理函数。

  while (s_signo == 0) mg_mgr_poll(&mgr, 1000);  // Event loop, 1s timeout

s_signo 不为 0 时,也就是接收到了退出信号,则结束无限循环,调用 mg_mgr_free 关闭所有连接,释放所有资源。

  mg_mgr_free(&mgr);                             // Cleanup

分析完main函数后,我们看下事件处理函数fn的代码。

判断是否接收到 MG_EV_MQTT_CMD 事件,表示收到 MQTT 命令。

  if (ev == MG_EV_MQTT_CMD) {

将函数参数ev_data转换为 struct mg_mqtt_message,这个结构体用于表示 MQTT 消息。

    struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;

打印收到的命令cmd和服务质量 qos

    MG_DEBUG(("cmd %d qos %d", mm->cmd, mm->qos));

使用switch判断收到的命令cmd是什么。

    switch (mm->cmd) {

如果收到的是MQTT_CMD_CONNECT命令,表示 MQTT 客户端连接到服务器。MQTT 客户端到服务端的网络连接建立后,客户端发送给服务端的第一个报文必须是 CONNECT 报文。

      case MQTT_CMD_CONNECT: {// Client connects

判断 MQTT 帧长度是否正确,如果长度小于 9,表示是 MQTT 帧格式不正确。

        if (mm->dgram.len < 9) {mg_error(c, "Malformed MQTT frame");

判断 MQTT 帧头第 8 Byte的数据是否等于 4,这是一个协议级别字节(Protocol Level byte)。对于 3.1.1 版协议,协议级别字段的值是 4(0x04)。如果不等于 4,表示这是一个不支持的 MQTT 版本。

        } else if (mm->dgram.ptr[8] != 4) {mg_error(c, "Unsupported MQTT version %d", mm->dgram.ptr[8]);}

如果 MQTT 的帧正常,则回复 MQTT 客户端。MQTT_CMD_CONNACK确认连接请求,服务端发送 CONNACK 报文响应从客户端收到的 CONNECT 报文。服务端发送给客户端的第一个报文必须是 CONNACK。调用mg_mqtt_send_header发送 MQTT 命令头,固定报头(Fixed header)部分,剩余长度字段为 2。调用mg_send发送可变报头(Variable header)部分,共 2 个 Byte,分别为连接确认标志和连接返回码。连接返回码的值为 0x00 表示连接已接受 。

        } else {uint8_t response[] = {0, 0};mg_mqtt_send_header(c, MQTT_CMD_CONNACK, 0, sizeof(response));mg_send(c, response, sizeof(response));}break;}

如果收到的是MQTT_CMD_SUBSCRIBE命令,表示客户端订阅主题。MQTT 客户端向服务端发送 SUBSCRIBE 报文用于创建一个或多个订阅。

      case MQTT_CMD_SUBSCRIBE: {// Client subscribes

首先定义了一些变量。pos用于指向下一个主题过滤器(Topic Filter)在数据报文中的偏移,初始化为 4 是因为 SUBSCRIBE 报文的固定报头(Fixed header)和可变报头(Variable header)一共占 4 个字节,所以第一个主题过滤器在报文的偏移为 4。qostopic用于从下面的函数mg_mqtt_next_sub中获取服务质量(quality of service)和主题。resp用于后续记录每个主题的服务质量,num_topics用于后续记录主题数量。

        size_t pos = 4;  // Initial topic offset, where ID endsuint8_t qos, resp[256];struct mg_str topic;int num_topics = 0;

通过函数mg_mqtt_next_sub遍历所有的主题。这个函数是在示例程序中实现的。

        while ((pos = mg_mqtt_next_sub(mm, &topic, &qos, pos)) > 0) {

接下来为每个请求的主题创建一个struct sub订阅描述符。

          struct sub *sub = calloc(1, sizeof(*sub));sub->c = c;sub->topic = mg_strdup(topic);sub->qos = qos;

然后将创建的订阅描述符sub添加到订阅列表s_subs中。LIST_ADD_HEAD是一个链表管理宏,用于将sub加入到s_subs中。

          LIST_ADD_HEAD(struct sub, &s_subs, sub);

将所添加的主题打印出来。

          MG_INFO(("SUB %p [%.*s]", c->fd, (int) sub->topic.len, sub->topic.ptr));

将主题中的+改为*,这是为了后续可以使用mg_match进行主题匹配。

          // Change '+' to '*' for topic matching using mg_matchfor (size_t i = 0; i < sub->topic.len; i++) {if (sub->topic.ptr[i] == '+') ((char *) sub->topic.ptr)[i] = '*';}

记录当前主题的服务质量(quality of service)。num_topics记录了主题数量,resp记录了每个主题的服务质量,用于下面回复消息。

          resp[num_topics++] = qos;}

在遍历完了所有主题后,开始回复消息给客户端。服务端发送 SUBACK 报文给客户端,用于确认它已收到并且正在处理 SUBSCRIBE 报文。

使用mg_mqtt_send_header发送 MQTT 命令头,也就是固定报头(Fixed header)部分,报文类型为 SUBACK。然后可变报头为 2 Byte的报文标识符,有效载荷(Payload)部分包含一个返回码(Return Code)列表,每个返回码对应等待确认的 SUBSCRIBE 报文中的一个主题过滤器(Topic Filter),所以命令头后续的数据长度为num_topics + 2,报文标识符使用idmg_htons用于将uint16_t类型的值转换为网络字节序,返回码(Return Code)部分为resp,长度为num_topics

        mg_mqtt_send_header(c, MQTT_CMD_SUBACK, 0, num_topics + 2);uint16_t id = mg_htons(mm->id);mg_send(c, &id, 2);mg_send(c, resp, num_topics);break;}

接下来看下上面使用的mg_mqtt_next_sub函数是如何实现的。

在函数mg_mqtt_next_sub里面又调用了mg_mqtt_next_topic函数。

size_t mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,uint8_t *qos, size_t pos) {uint8_t tmp;return mg_mqtt_next_topic(msg, topic, qos == NULL ? &tmp : qos, pos);
}

接下来看下mg_mqtt_next_topic函数是如何实现的。

buf是指向下一个主题过滤器(Topic Filter)的位置,其中dgram.ptr表示数据报文,pos是指向下一个主题的偏移。如果pos大于等于数据报文的长度,表示已没有下一个主题了,返回 0。主题过滤器部分,前两个字节表示主题名的长度,然后是主题名,主题名后的一个字节是服务质量要求(Requested QoS)。最后返回下一个题过滤器的偏移。

static size_t mg_mqtt_next_topic(struct mg_mqtt_message *msg,struct mg_str *topic, uint8_t *qos,size_t pos) {unsigned char *buf = (unsigned char *) msg->dgram.ptr + pos;size_t new_pos;if (pos >= msg->dgram.len) return 0;topic->len = (size_t) (((unsigned) buf[0]) << 8 | buf[1]);topic->ptr = (char *) buf + 2;new_pos = pos + 2 + topic->len + (qos == NULL ? 0 : 1);if ((size_t) new_pos > msg->dgram.len) return 0;if (qos != NULL) *qos = buf[2 + topic->len];return new_pos;
}

接下来回到事件处理函数中,来看下一个判断的 MQTT 命令。

如果收到的是MQTT_CMD_PUBLISH命令,表示有客户端发布消息。PUBLISH 控制报文是指从 MQTT 客户端向服务端或者服务端向客户端传输一个应用消息。下面需要将消息推送到所有订阅频道。

      case MQTT_CMD_PUBLISH: {// Client published message. Push to all subscribed channels

将发布的消息和主题打印出来。

        MG_INFO(("PUB %p [%.*s] -> [%.*s]", c->fd, (int) mm->data.len,mm->data.ptr, (int) mm->topic.len, mm->topic.ptr));

遍历整个订阅列表s_subs,通过mg_match比较主题名称。如果主题匹配,则通过函数mg_mqtt_pub发布消息,将消息发送到订阅主题的连接。

        for (struct sub *sub = s_subs; sub != NULL; sub = sub->next) {if (mg_match(mm->topic, sub->topic, NULL)) {mg_mqtt_pub(sub->c, mm->topic, mm->data, 1, false);}}break;}

收到MQTT_CMD_PINGREQ表示有客户端发送心跳请求。客户端发送 PINGREQ 报文给服务端的。用于: 1. 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着。 2. 请求服务端发送 响应确认它还活着。 3. 使用网络以确认网络连接没有断开。

      case MQTT_CMD_PINGREQ: {

服务端必须发送 PINGRESP 报文响应客户端的 PINGREQ 报文。使用mg_mqtt_send_header发送 MQTT 命令头,也就是固定报头部分,报文类型为 PINGRESP。

        // The server must send a PINGRESP packet in response to a PINGREQ packet [MQTT-3.12.4-1]MG_INFO(("PINGREQ %p -> PINGRESP", c->fd));mg_mqtt_send_header(c, MQTT_CMD_PINGRESP, 0, 0);break;}

到这里结束MG_EV_MQTT_CMD事件处理的部分,接下来看其他的事件处理。

判断是否接收到 MG_EV_ACCEPT 事件,这表示已接受连接。

  } else if (ev == MG_EV_ACCEPT) {// c->is_hexdumping = 1;}

判断是否接收到 MG_EV_CLOSE 事件,表示客户端连接已关闭。

当客户端断开连接时,遍历整个订阅列表s_subs,将该客户端的所有订阅删除。其中LIST_DELETE是一个链表管理宏,用于将subs_subs中删除。

  } else if (ev == MG_EV_CLOSE) {// Client disconnects. Remove from the subscription listfor (struct sub *next, *sub = s_subs; sub != NULL; sub = next) {next = sub->next;if (c != sub->c) continue;MG_INFO(("UNSUB %p [%.*s]", c->fd, (int) sub->topic.len, sub->topic.ptr));LIST_DELETE(struct sub, &s_subs, sub);}}

mqtt-server 的示例程序代码就都解析完了,下面实际运行一下 mqtt-server 程序。

打开示例程序,编译并运行:

pi@raspberrypi:~ $ cd Desktop/study/mongoose/examples/mqtt-server/
pi@raspberrypi:~/Desktop/study/mongoose/examples/mqtt-server $ make
cc ../../mongoose.c -I../.. -W -Wall -DMG_ENABLE_LINES=1  -o example main.c
./example 
10e0a1 2 main.c:131:main                Starting on mqtt://0.0.0.0:1883

这个时候我们的 MQTT 服务器就运行起来了,这个时候还需要一个 MQTT 客户端,我们使用 Mongoose 的 mqtt-client 示例程序,并将代码中的 URL 变量s_url修改:

static const char *s_url = "mqtt://localhost:1883";

保存后编译运行程序:

pi@raspberrypi:~/Desktop/study/mongoose/examples/mqtt-client $ make clean all
rm -rf example *.o *.dSYM *.gcov *.gcno *.gcda *.obj *.exe *.ilk *.pdb
cc ../../mongoose.c -I../.. -W -Wall   -o example main.c
./example 
12305b 2 main.c:29:fn                   CREATED
12305d 2 main.c:44:fn                   CONNECTED to mqtt://localhost:1883
12305d 2 main.c:46:fn                   SUBSCRIBED to mg/+/test
12305d 2 main.c:50:fn                   PUBLISHED hello -> mg/clnt/test
12305d 2 main.c:55:fn                   RECEIVED hello <- mg/clnt/test
12305e 2 main.c:58:fn                   CLOSED

可以看到 mqtt-client 示例程序完成了 MQTT 客户端创建,连接,订阅主题mg/+/test,向主题mg/clnt/test发布数据hello,收到所订阅主题mg/clnt/test的数据hello,最后关闭连接。

然后我们来看下 MQTT 服务器这边的日志信息:

pi@raspberrypi:~/Desktop/study/mongoose/examples/mqtt-server $ make
cc ../../mongoose.c -I../.. -W -Wall -DMG_ENABLE_LINES=1  -o example main.c
./example 
10e0a1 2 main.c:131:main                Starting on mqtt://0.0.0.0:1883
12305d 2 main.c:87:fn                   SUB 0x5 [mg/+/test]
12305d 2 main.c:103:fn                  PUB 0x5 [hello] -> [mg/clnt/test]
12305e 2 main.c:119:fn                  UNSUB 0x5 [mg/*/test]

可以看到 MQTT 客户端订阅主题mg/+/test,然后向所有订阅mg/clnt/test主题的客户端发布数据hello,最后断开连接的时候取消订阅。

【参考资料】

examples/mqtt-server

Documentation

examples/mqtt-client

MQTT协议中文版

MQTT Version 3.1.1


本文链接:https://blog.csdn.net/u012028275/article/details/129116209

http://www.mmbaike.com/news/71055.html

相关文章:

  • 用ps设计一个个人网站模板seo内容优化心得
  • 南昌网站建设公司好么长尾词和关键词的区别
  • 做直播平台网站赚钱吗免费的网络营销方式
  • 宠物出售的网站怎么做营销策略有哪些4种
  • 带数据库的网站模板下载抖音关键词排名推广
  • 浦城县规划建设和旅游局网站一个新品牌怎样营销推广
  • 用数据库代码做家乡网站关键词在线播放免费
  • 做竞价网站上海网络推广招聘
  • 手机网站静态模板下载广告投放收费标准
  • 企业网站的建设思维导图企业网站设计图片
  • 深圳定制建站公司电话岳阳seo公司
  • 建设公司网站需要准备什么科目怎么创建网站教程
  • 公司建设网站怎么作账今日国际重大新闻
  • 千山科技做网站好不好网站如何推广运营
  • wordpress 判断是否有文章如何优化网页
  • 网站秒收录如何做好网络推广
  • 响应式网站管理chrome浏览器
  • 优斗士做网站怎么样谷歌推广真有效果吗
  • 幼儿园网站建设策划方案百度快照是什么意思
  • 建设专业网站怎样收费seo引流什么意思
  • h5网站如何做排名网站百度收录要多久
  • 盐城企业网站建设网络运营培训课程
  • 程序员做交友网站尚硅谷培训机构官网
  • 要怎么做自己的网站视频教学宁波seo推广费用
  • 怎么做网站的推广整合营销传播
  • 网站后端技术有哪些seo上海推广公司
  • 建设一个网站需要哪些员工企业seo网络推广
  • 电子商务做网站网站建设公司地址在哪
  • 阜宁网站建设常用的seo工具推荐
  • 微网站开发平台系统软件百度贴吧网页版登录