前面几篇博客我们已经把这些基础都准备好了,接下来只需要简单的编程就可以实现了。
首先我们创建一个结构体
/*
 * Broker information and connection parameters used by the subscriber.
 * Bundling them in one struct lets the whole set be handed to the
 * mosquitto callbacks as a single user-data pointer.
 */
typedef struct broker_sub_s
{
    char host[64];      /* broker address */
    int  port;          /* broker port */
    char username[32];  /* login user name */
    char passwd[64];    /* login password */
    char clientid[64];  /* MQTT client ID */
    char subTopic[64];  /* topic to subscribe to */
    int  subQos;        /* quality-of-service level for the subscription */
    int  keepalive;     /* keepalive (heartbeat) time for the connection */
} broker_sub_t;
这里面我们存储了关于broker的一些信息和连接时需要的数据:地址、端口、用户名、密码、clientid、主题、服务等级(QoS)、心跳连接的时间。将它们打包成一个结构体,能够让我们在设置回调函数时方便很多:只需要把这个结构体的指针作为用户数据传给回调函数即可。
订阅
订阅到的信息将保存到下面的结构体
/*
 * Layout of a received message as shown in this article (the real
 * definition comes from <mosquitto.h>; this copy is for illustration).
 * `bool` requires <stdbool.h>.
 *
 * Block comments are used on purpose: with `//` comments the listing
 * breaks as soon as it is collapsed onto one line, because the first
 * comment swallows every following member.
 */
struct mosquitto_message
{
    int   mid;         /* message sequence ID */
    char *topic;       /* topic the message was published on */
    void *payload;     /* message content: the MQTT payload */
    int   payloadlen;  /* payload length, in bytes */
    int   qos;         /* quality of service */
    bool  retain;      /* whether this is a retained message */
};
进行订阅的话,我们首先要进行mosquitto初始化,然后创建mosquitto实例,接着调用函数设置用户名和密码,之后就可以设置对应的回调函数。我这里设置的是connect和message的回调,具体内容可看代码。因为我们传递的信息是json数据格式,所以我还写了个解析cjson的函数,具体可以看看我之前的博客。在connect回调中我们可以用mosquitto_subscribe订阅主题(发布端类似,在connect回调中用mosquitto_publish发布消息),然后在message回调中打印出收到的信息,最后调用无限循环的函数mosquitto_loop_forever。具体代码如下:
订阅头文件
/********************************************************************************* Copyright: (C) 2020 longyongtu<longyongtu13@qq.com>* All rights reserved.** Filename: temp_rpt_aly.h* Description: This head file** Version: 1.0.0(17/07/20)* Author: longyongtu <longyongtu13@qq.com>* ChangeLog: 1, Release initial version on \"17/07/20 16:53:49\"*********************************************************************************/#ifndef _MOSQUITTO_SUB_H#define _MOSQUITTO_SUB_H#include <stdio.h>#include <errno.h>#include <unistd.h>#include <libgen.h>#include <getopt.h>#include <string.h>#include <mosquitto.h>#include <stdlib.h>#include \"cJSON.h\"#define PR_VERSION \"1.0.0\"#define HOST \"192.168.240.134\"#define PORT 1883#define USERNAME \"test_username\"#define PASSWD \"test_password\"#define CLIENTID \"12345\"#define SUBTOPIC \"test\"#define SUBQOS 1#define KEEPALIVE 30typedef struct broker_sub_s{char host[64];int port;char username[32];char passwd[64];char clientid[64];char subTopic[64];int subQos;int keepalive;}broker_sub_t;static void print_usage( char *progname);void connect_callback(struct mosquitto *mosq, void *obj, int rc);int sub_init(struct mosquitto ** mosq, broker_sub_t broker, void *buf);void printJson(cJSON * root);void message_callback(struct mosquitto *mosq, void* obj, const struct mosquitto_message *message);#endif
订阅C代码
/********************************************************************************** Copyright: (C) 2020 longyongtu<longyongtu13@qq.com>* All rights reserved.** Filename: temp_rpt_aly.c* Description: This file** Version: 1.0.0(16/07/20)* Author: longyongtu <longyongtu13@qq.com>* ChangeLog: 1, Release initial version on \"16/07/20 14:32:00\"*********************************************************************************/#include \"mosquitto_sub.h\"void printJson(cJSON * root)//以递归的方式打印json的最内层键值对{for(int i=0; i<cJSON_GetArraySize(root); i++) //遍历最外层json键值对{cJSON * item = cJSON_GetArrayItem(root, i);if(cJSON_Object == item->type) //如果对应键的值仍为cJSON_Object就递归调用printJsonprintJson(item);else //值不为json对象就直接打印出键和值{printf(\"%s:\", item->string);printf(\"%s\\r\\n\", cJSON_Print(item));}}}void connect_callback(struct mosquitto *mosq, void *obj, int rc){broker_sub_t *broker = (broker_sub_t*)obj;if(!rc){if( mosquitto_subscribe(mosq, NULL, broker->subTopic, 0) != MOSQ_ERR_SUCCESS ){printf(\"Mosq_subcrible() error: %s\\n\", strerror(errno));return ;}printf(\"subicrible topic:%s\\n\", broker->subTopic);return;}return;}#if 0struct mosquitto_message{int mid;//消息序号IDchar *topic; //主题void *payload; //主题内容 ,MQTT 中有效载荷int payloadlen; //消息的长度,单位是字节int qos; //服务质量bool retain; //是否保留消息};#endifvoid message_callback(struct mosquitto *mosq, void* obj, const struct mosquitto_message *message){cJSON *buf = NULL;printf(\"subcrible topic is %s\\n\", message->topic);printf(\"message: \\n\");buf=cJSON_Parse(message->payload);printJson(buf);cJSON_Delete(buf);return;}int main(int argc, char *argv[]){broker_sub_t broker;int rv = -1;struct mosquitto *mosq = NULL;strncpy(broker.host, HOST, sizeof(broker.host));broker.port = PORT;strncpy(broker.username, USERNAME, sizeof(broker.username));strncpy(broker.passwd, PASSWD, sizeof(broker.passwd));strncpy(broker.clientid, CLIENTID, sizeof(broker.clientid));strncpy(broker.subTopic, SUBTOPIC, sizeof(broker.subTopic));broker.subQos = SUBQOS;broker.keepalive = 
KEEPALIVE;sub_init(&mosq, broker, (void *)&broker);if( !rv ){return -1;}/* Set the subscribe callback. This is called when the broker responds to a subscription request */mosquitto_message_callback_set(mosq, message_callback);/* Set the connect callback. This is called when the broker sends a CONNACK message in response to aconnection */mosquitto_connect_callback_set(mosq, connect_callback);rv=mosquitto_connect(mosq, broker.host, broker.port, broker.keepalive);if( rv != MOSQ_ERR_SUCCESS){printf(\"client connect broker failure\\n\");mosquitto_destroy(mosq);//Use to free memory associated with a mosquitto client instance.mosquitto_lib_cleanup();//Call to free resources associated with the library.return -1;}rv=mosquitto_loop_forever(mosq, 11000, 1);if( rv != MOSQ_ERR_SUCCESS ){printf(\"%s\\n\", strerror(errno));}return 0;}int sub_init(struct mosquitto ** mosq, broker_sub_t broker, void *buf){int rv=-1;//struct mosquitto *mosq1 = NULL;/* Must be called before any other mosquitto functions. not thread safe */mosquitto_lib_init();/* Create a new mosquitto client instance */*mosq=mosquitto_new(broker.clientid, true, buf);if( !*mosq ){printf(\"creat a mosquitto instance errno\\n\");mosquitto_lib_cleanup();return -1;}#if 1rv=mosquitto_username_pw_set(*mosq, broker.username, broker.passwd);if( rv!=MOSQ_ERR_SUCCESS ){printf(\"set username ande password failure\\n\");mosquitto_destroy(*mosq);//Use to free memory associated with a mosquitto client instance.mosquitto_lib_cleanup();//Call to free resources associated with the library.return -1;}#endifreturn 0;}
发布
发布端的代码编写和订阅端差不多,主要区别是在connect回调中调用mosquitto_publish发布消息,发布完成后断开连接。
发布头文件
/********************************************************************************* Copyright: (C) 2020 longyongtu<longyongtu13@qq.com>* All rights reserved.** Filename: temp_rpt_aly.h* Description: This head file** Version: 1.0.0(17/07/20)* Author: longyongtu <longyongtu13@qq.com>* ChangeLog: 1, Release initial version on \"17/07/20 16:53:49\"*********************************************************************************/#ifndef _MOSQUITTO_PUB_H#define _MOSQUITTO_PUB_H#include <stdio.h>#include <errno.h>#include <unistd.h>#include <libgen.h>#include <getopt.h>#include <string.h>#include <mosquitto.h>#include <stdlib.h>#include \"sht30.h\"#include \"cJSON.h\"#define PR_VERSION \"1.0.0\"#define HOST \"192.168.240.134\"#define PORT 1883#define USERNAME \"test_username\"#define PASSWD \"test_password\"#define CLIENTID \"12345\"#define PUBTOPIC \"test\"#define PUBQOS 1#define KEEPALIVE 60#define METHOD \"thing.event.property.post\"#define ID \"232424235324\"#define VERSION \"1.0.0\"typedef struct broker_s{char host[64];int port;char username[32];char passwd[64];char clientid[64];char pubTopic[64];int pubQos;int keepalive;char method[32];char id[16];char version[16];}broker_t;char *get_temp_hmti_cjson(broker_t *broker);static void print_usage( char *progname);void callback(struct mosquitto *mosq, void *obj, int rc);int pub_init(struct mosquitto ** mosq, broker_t broker, void *buf);#endif
发布C代码
/********************************************************************************** Copyright: (C) 2020 longyongtu<longyongtu13@qq.com>* All rights reserved.** Filename: temp_rpt_aly.c* Description: This file** Version: 1.0.0(16/07/20)* Author: longyongtu <longyongtu13@qq.com>* ChangeLog: 1, Release initial version on \"16/07/20 14:32:00\"*********************************************************************************/#include \"mosquitto_pub.h\"char *get_temp_hmti_cjson(broker_t *broker){float temp = 31.3;float hmti = 8;cJSON *root = cJSON_CreateObject();cJSON *params = cJSON_CreateObject();char *data = NULL;int rv = 0;#if 0rv=get_temp_hmti(I2C_ADDR, SHT_ADDR, &temp, &hmti);if(rv < 0){return NULL;}#endifcJSON_AddItemToObject(root, \"method\", cJSON_CreateString(broker->method));cJSON_AddItemToObject(root, \"id\", cJSON_CreateString(broker->id));//cJSON_AddItemToObject(params, \"CurrentHumidity\", cJSON_CreateNumber((int)hmti));cJSON_AddItemToObject(params, \"CurrentHumidity\", cJSON_CreateNumber(34));//cJSON_AddItemToObject(params, \"CurrentTemperature\", cJSON_CreateNumber(temp));cJSON_AddItemToObject(params, \"CurrentTemperature\", cJSON_CreateNumber(66));cJSON_AddItemToObject(root, \"params\", params);cJSON_AddItemToObject(root, \"version\", cJSON_CreateString(broker->version));data=cJSON_Print(root);printf(\"%s\\n\", data);return data;}static void print_usage( char *progname){printf(\"Usage %s [option]...\\n\", progname);printf(\"-p (--port): the port of the server you want to connect\\n\");printf(\"-h (--host): the hostname of the server you want to connect\\n\");printf(\"-u (--user): the username of the client\\n\");printf(\"-P (--passwd): the passwd of the client \\n\");printf(\"-i (--clientid): the clientid of the user\\n\");printf(\"-t (--topic): the topic of the client you want to pub\\n\");printf(\"-H (--help): Display this help information\\n\");printf(\"-v (--version): Display the program version\\n\");printf(\"%s Version %s\\n\", progname, 
PR_VERSION);return ;}void callback(struct mosquitto *mosq, void *obj, int rc){char *buf;int mid;int retain = 1;broker_t *obj1 = obj;printf(\"hhhhhhh\\n\");buf=get_temp_hmti_cjson(obj1);if(!rc){if( mosquitto_publish(mosq, &mid, obj1->pubTopic,strlen(buf)+1, buf, obj1->pubQos, retain) !=MOSQ_ERR_SUCCESS){printf(\"Mosq_Publish() error: %s\\n\", strerror(errno));return ;}printf(\"pubilush topic:%s\\n\", obj1->pubTopic) ;}mosquitto_disconnect(mosq);free(buf);}int main(int argc, char *argv[]){broker_t broker;char *progname = NULL;int rv = -1;struct mosquitto *mosq = NULL;strncpy(broker.host, HOST, sizeof(broker.host));broker.port = PORT;strncpy(broker.username, USERNAME, sizeof(broker.username));strncpy(broker.passwd, PASSWD, sizeof(broker.passwd));strncpy(broker.clientid, CLIENTID, sizeof(broker.clientid));strncpy(broker.pubTopic, PUBTOPIC, sizeof(broker.pubTopic));broker.pubQos = PUBQOS;broker.keepalive = KEEPALIVE;strncpy(broker.method, METHOD, sizeof(broker.method));strncpy(broker.id, ID, sizeof(broker.id));strncpy(broker.version, VERSION, sizeof(broker.version));rv=pub_init(&mosq, broker, (void *)&broker);if( rv < 0 ){return -1;}mosquitto_connect_callback_set(mosq, callback);while(1){/* 连接MQTT代理*/if(mosquitto_connect(mosq, broker.host, broker.port, broker.keepalive) != MOSQ_ERR_SUCCESS ){printf(\"mosquitto connect server failure:%s\\n\",strerror(errno));continue;sleep(5);}/* 无线阻塞循环调用loop*/mosquitto_loop_forever(mosq, 6000, 1 );sleep(60);}cleanup:printf(\"program will exit\\n\");mosquitto_destroy(mosq);mosquitto_lib_cleanup();return 0;}int pub_init(struct mosquitto ** mosq, broker_t broker, void *buf){int rv=-1;/* Must be called before any other mosquitto functions. 
not thread safe */mosquitto_lib_init();/* Create a new mosquitto client instance */*mosq=mosquitto_new(broker.clientid, true, buf);if( !*mosq ){printf(\"creat a mosquitto instance errno\\n\");mosquitto_lib_cleanup();return -1;}#if 1rv=mosquitto_username_pw_set(*mosq, broker.username, broker.passwd);if( rv!=MOSQ_ERR_SUCCESS ){printf(\"set username ande password failure\\n\");mosquitto_destroy(*mosq);//Use to free memory associated with a mosquitto client instance.mosquitto_lib_cleanup();//Call to free resources associated with the library.return -1;}#endifreturn 0;}
运行结果
发布端
订阅端
服务器
多出一条是因为我前面发送过一次数据,而且设置了保留消息(retain),所以订阅时会多收到一次。由此看来我们的代码是正确的。