AI智能
改变未来

使用mosquitto函数实现与EMQX的发布和订阅

前面几篇博客已经把这些基础都准备好了,接下来只需要简单的编程就可以实现发布和订阅。
首先我们创建一个结构体

/* Connection settings for one MQTT subscriber session. */
typedef struct broker_sub_s {
    char host[64];      /* broker address */
    int  port;          /* broker port */
    char username[32];  /* login user name */
    char passwd[64];    /* login password */
    char clientid[64];  /* MQTT client id */
    char subTopic[64];  /* topic to subscribe to */
    int  subQos;        /* QoS level for the subscription */
    int  keepalive;     /* keepalive (heartbeat) interval */
} broker_sub_t;

这个结构体里保存了broker的信息和连接时需要的数据:地址、端口、用户名、密码、clientid、主题、服务质量(QoS)等级、心跳(keepalive)时间。将它们打包成一个结构体后,可以把它作为用户数据传给回调函数,这样在设置回调函数时就方便了很多。

订阅

订阅到的信息将保存到下面的结构体

/* Message structure handed to the message callback by libmosquitto. */
struct mosquitto_message {
    int   mid;         /* message sequence id */
    char *topic;       /* topic the message was published on */
    void *payload;     /* message content (the MQTT payload) */
    int   payloadlen;  /* payload length in bytes */
    int   qos;         /* quality of service */
    bool  retain;      /* whether this is a retained message */
};

进行订阅的话,我们首先要进行mosquitto库的初始化,然后创建mosquitto实例,再调用函数设置用户名和密码,之后设置对应的回调函数。我这里设置的是connect和message两个回调,具体内容可看代码。因为我们传递的信息是json数据格式,所以我还写了个用cJSON解析的函数,具体可以看看我之前的博客。在connect回调中我们用mosquitto_subscribe订阅主题(发布端类似,在connect回调中用mosquitto_publish发布消息),然后在message回调中打印出收到的信息,最后调用无限循环的函数mosquitto_loop_forever。具体代码如下:

订阅头文件
/*********************************************************************************      Copyright:  (C) 2020 longyongtu<longyongtu13@qq.com>*                  All rights reserved.**       Filename:  temp_rpt_aly.h*    Description:  This head file**        Version:  1.0.0(17/07/20)*         Author:  longyongtu <longyongtu13@qq.com>*      ChangeLog:  1, Release initial version on \"17/07/20 16:53:49\"*********************************************************************************/#ifndef _MOSQUITTO_SUB_H#define _MOSQUITTO_SUB_H#include <stdio.h>#include <errno.h>#include <unistd.h>#include <libgen.h>#include <getopt.h>#include <string.h>#include <mosquitto.h>#include <stdlib.h>#include \"cJSON.h\"#define  PR_VERSION    \"1.0.0\"#define  HOST          \"192.168.240.134\"#define  PORT          1883#define  USERNAME      \"test_username\"#define  PASSWD        \"test_password\"#define  CLIENTID      \"12345\"#define  SUBTOPIC      \"test\"#define  SUBQOS        1#define  KEEPALIVE     30typedef struct broker_sub_s{char    host[64];int     port;char    username[32];char    passwd[64];char    clientid[64];char    subTopic[64];int     subQos;int     keepalive;}broker_sub_t;static void  print_usage( char *progname);void connect_callback(struct mosquitto *mosq, void *obj, int rc);int sub_init(struct mosquitto ** mosq, broker_sub_t broker, void *buf);void printJson(cJSON * root);void message_callback(struct mosquitto *mosq, void* obj, const struct mosquitto_message *message);#endif
订阅C代码
/**********************************************************************************      Copyright:  (C) 2020 longyongtu<longyongtu13@qq.com>*                  All rights reserved.**       Filename:  temp_rpt_aly.c*    Description:  This file**        Version:  1.0.0(16/07/20)*         Author:  longyongtu <longyongtu13@qq.com>*      ChangeLog:  1, Release initial version on \"16/07/20 14:32:00\"*********************************************************************************/#include \"mosquitto_sub.h\"void printJson(cJSON * root)//以递归的方式打印json的最内层键值对{for(int i=0; i<cJSON_GetArraySize(root); i++)   //遍历最外层json键值对{cJSON * item = cJSON_GetArrayItem(root, i);if(cJSON_Object == item->type)      //如果对应键的值仍为cJSON_Object就递归调用printJsonprintJson(item);else                                //值不为json对象就直接打印出键和值{printf(\"%s:\", item->string);printf(\"%s\\r\\n\", cJSON_Print(item));}}}void connect_callback(struct mosquitto *mosq, void *obj, int rc){broker_sub_t    *broker = (broker_sub_t*)obj;if(!rc){if( mosquitto_subscribe(mosq, NULL, broker->subTopic, 0) != MOSQ_ERR_SUCCESS ){printf(\"Mosq_subcrible() error: %s\\n\", strerror(errno));return ;}printf(\"subicrible topic:%s\\n\", broker->subTopic);return;}return;}#if 0struct mosquitto_message{int mid;//消息序号IDchar *topic; //主题void *payload; //主题内容 ,MQTT 中有效载荷int payloadlen; //消息的长度,单位是字节int qos; //服务质量bool retain; //是否保留消息};#endifvoid message_callback(struct mosquitto *mosq, void* obj, const struct mosquitto_message *message){cJSON    *buf = NULL;printf(\"subcrible topic is %s\\n\", message->topic);printf(\"message:  \\n\");buf=cJSON_Parse(message->payload);printJson(buf);cJSON_Delete(buf);return;}int main(int argc, char *argv[]){broker_sub_t        broker;int                 rv = -1;struct mosquitto    *mosq = NULL;strncpy(broker.host, HOST, sizeof(broker.host));broker.port = PORT;strncpy(broker.username, USERNAME, sizeof(broker.username));strncpy(broker.passwd, PASSWD, sizeof(broker.passwd));strncpy(broker.clientid, CLIENTID, 
sizeof(broker.clientid));strncpy(broker.subTopic, SUBTOPIC, sizeof(broker.subTopic));broker.subQos = SUBQOS;broker.keepalive = KEEPALIVE;sub_init(&mosq, broker, (void *)&broker);if( !rv ){return -1;}/* Set the subscribe callback.  This is called when the broker responds to a subscription request */mosquitto_message_callback_set(mosq, message_callback);/* Set the connect callback.  This is called when the broker sends a CONNACK message in response to aconnection */mosquitto_connect_callback_set(mosq, connect_callback);rv=mosquitto_connect(mosq, broker.host, broker.port, broker.keepalive);if( rv != MOSQ_ERR_SUCCESS){printf(\"client connect broker failure\\n\");mosquitto_destroy(mosq);//Use to free memory associated with a mosquitto client instance.mosquitto_lib_cleanup();//Call to free resources associated with the library.return -1;}rv=mosquitto_loop_forever(mosq, 11000, 1);if( rv != MOSQ_ERR_SUCCESS ){printf(\"%s\\n\", strerror(errno));}return 0;}int sub_init(struct mosquitto ** mosq, broker_sub_t broker, void *buf){int  rv=-1;//struct mosquitto *mosq1 = NULL;/* Must be called before any other mosquitto functions. not thread safe */mosquitto_lib_init();/* Create a new mosquitto client instance */*mosq=mosquitto_new(broker.clientid, true, buf);if( !*mosq ){printf(\"creat a mosquitto instance errno\\n\");mosquitto_lib_cleanup();return -1;}#if 1rv=mosquitto_username_pw_set(*mosq, broker.username, broker.passwd);if( rv!=MOSQ_ERR_SUCCESS ){printf(\"set username ande password failure\\n\");mosquitto_destroy(*mosq);//Use to free memory associated with a mosquitto client instance.mosquitto_lib_cleanup();//Call to free resources associated with the library.return -1;}#endifreturn 0;}

发布

发布端的代码编写和订阅端差不多,区别是在connect回调中调用mosquitto_publish发布消息。

发布头文件
/*********************************************************************************      Copyright:  (C) 2020 longyongtu<longyongtu13@qq.com>*                  All rights reserved.**       Filename:  temp_rpt_aly.h*    Description:  This head file**        Version:  1.0.0(17/07/20)*         Author:  longyongtu <longyongtu13@qq.com>*      ChangeLog:  1, Release initial version on \"17/07/20 16:53:49\"*********************************************************************************/#ifndef _MOSQUITTO_PUB_H#define _MOSQUITTO_PUB_H#include <stdio.h>#include <errno.h>#include <unistd.h>#include <libgen.h>#include <getopt.h>#include <string.h>#include <mosquitto.h>#include <stdlib.h>#include \"sht30.h\"#include \"cJSON.h\"#define  PR_VERSION    \"1.0.0\"#define  HOST          \"192.168.240.134\"#define  PORT          1883#define  USERNAME      \"test_username\"#define  PASSWD        \"test_password\"#define  CLIENTID      \"12345\"#define  PUBTOPIC      \"test\"#define  PUBQOS        1#define  KEEPALIVE     60#define  METHOD        \"thing.event.property.post\"#define  ID            \"232424235324\"#define  VERSION       \"1.0.0\"typedef struct broker_s{char    host[64];int     port;char    username[32];char    passwd[64];char    clientid[64];char    pubTopic[64];int     pubQos;int     keepalive;char    method[32];char    id[16];char    version[16];}broker_t;char *get_temp_hmti_cjson(broker_t *broker);static void  print_usage( char *progname);void callback(struct mosquitto *mosq, void *obj, int rc);int pub_init(struct mosquitto ** mosq, broker_t broker, void *buf);#endif
c代码
/**********************************************************************************      Copyright:  (C) 2020 longyongtu<longyongtu13@qq.com>*                  All rights reserved.**       Filename:  temp_rpt_aly.c*    Description:  This file**        Version:  1.0.0(16/07/20)*         Author:  longyongtu <longyongtu13@qq.com>*      ChangeLog:  1, Release initial version on \"16/07/20 14:32:00\"*********************************************************************************/#include \"mosquitto_pub.h\"char *get_temp_hmti_cjson(broker_t *broker){float           temp = 31.3;float           hmti = 8;cJSON           *root = cJSON_CreateObject();cJSON           *params = cJSON_CreateObject();char            *data = NULL;int             rv = 0;#if 0rv=get_temp_hmti(I2C_ADDR, SHT_ADDR, &temp, &hmti);if(rv < 0){return NULL;}#endifcJSON_AddItemToObject(root, \"method\", cJSON_CreateString(broker->method));cJSON_AddItemToObject(root, \"id\", cJSON_CreateString(broker->id));//cJSON_AddItemToObject(params, \"CurrentHumidity\", cJSON_CreateNumber((int)hmti));cJSON_AddItemToObject(params, \"CurrentHumidity\", cJSON_CreateNumber(34));//cJSON_AddItemToObject(params, \"CurrentTemperature\", cJSON_CreateNumber(temp));cJSON_AddItemToObject(params, \"CurrentTemperature\", cJSON_CreateNumber(66));cJSON_AddItemToObject(root, \"params\", params);cJSON_AddItemToObject(root, \"version\", cJSON_CreateString(broker->version));data=cJSON_Print(root);printf(\"%s\\n\", data);return data;}static void  print_usage( char *progname){printf(\"Usage  %s [option]...\\n\", progname);printf(\"-p (--port):   the port of the server you want to connect\\n\");printf(\"-h (--host):   the hostname of the server you want to connect\\n\");printf(\"-u (--user):   the username of the client\\n\");printf(\"-P (--passwd): the passwd of the client \\n\");printf(\"-i (--clientid): the clientid of the user\\n\");printf(\"-t (--topic):  the topic of the client you want to pub\\n\");printf(\"-H (--help): Display this 
help information\\n\");printf(\"-v (--version): Display the program version\\n\");printf(\"%s  Version %s\\n\", progname, PR_VERSION);return ;}void callback(struct mosquitto *mosq, void *obj, int rc){char    *buf;int     mid;int     retain = 1;broker_t *obj1 = obj;printf(\"hhhhhhh\\n\");buf=get_temp_hmti_cjson(obj1);if(!rc){if( mosquitto_publish(mosq, &mid, obj1->pubTopic,strlen(buf)+1, buf, obj1->pubQos, retain) !=MOSQ_ERR_SUCCESS){printf(\"Mosq_Publish() error: %s\\n\", strerror(errno));return ;}printf(\"pubilush topic:%s\\n\", obj1->pubTopic) ;}mosquitto_disconnect(mosq);free(buf);}int main(int argc, char *argv[]){broker_t            broker;char                *progname = NULL;int                 rv = -1;struct mosquitto    *mosq = NULL;strncpy(broker.host, HOST, sizeof(broker.host));broker.port = PORT;strncpy(broker.username, USERNAME, sizeof(broker.username));strncpy(broker.passwd, PASSWD, sizeof(broker.passwd));strncpy(broker.clientid, CLIENTID, sizeof(broker.clientid));strncpy(broker.pubTopic, PUBTOPIC, sizeof(broker.pubTopic));broker.pubQos = PUBQOS;broker.keepalive = KEEPALIVE;strncpy(broker.method, METHOD, sizeof(broker.method));strncpy(broker.id, ID, sizeof(broker.id));strncpy(broker.version, VERSION, sizeof(broker.version));rv=pub_init(&mosq, broker, (void *)&broker);if( rv < 0 ){return -1;}mosquitto_connect_callback_set(mosq, callback);while(1){/* 连接MQTT代理*/if(mosquitto_connect(mosq, broker.host, broker.port, broker.keepalive) != MOSQ_ERR_SUCCESS ){printf(\"mosquitto connect server failure:%s\\n\",strerror(errno));continue;sleep(5);}/* 无线阻塞循环调用loop*/mosquitto_loop_forever(mosq, 6000, 1 );sleep(60);}cleanup:printf(\"program will exit\\n\");mosquitto_destroy(mosq);mosquitto_lib_cleanup();return 0;}int pub_init(struct mosquitto ** mosq, broker_t broker, void *buf){int  rv=-1;/*  Must be called before any other mosquitto functions. 
not thread safe */mosquitto_lib_init();/*  Create a new mosquitto client instance */*mosq=mosquitto_new(broker.clientid, true, buf);if( !*mosq ){printf(\"creat a mosquitto instance errno\\n\");mosquitto_lib_cleanup();return -1;}#if 1rv=mosquitto_username_pw_set(*mosq, broker.username, broker.passwd);if( rv!=MOSQ_ERR_SUCCESS ){printf(\"set username ande password failure\\n\");mosquitto_destroy(*mosq);//Use to free memory associated with a mosquitto client instance.mosquitto_lib_cleanup();//Call to free resources associated with the library.return -1;}#endifreturn 0;}

运行结果

发布端

订阅端

服务器

多出一条消息是因为我前面已经发送过一次数据,而且设置了保留消息(retain),所以会多一次。由此看来我们的代码是正确的。

赞(0) 打赏
未经允许不得转载:爱站程序员基地 » 使用mosquitto函数实现与EMQX的发布和订阅