2018年5月6日 星期日

MQTT-Message Queuing Telemetry Transport

一、何謂MQTT(Message Queuing Telemetry Transport)
ref: 第三章 MQTT通訊協議
MQTT通訊為一對多的M2M傳輸,使用發佈(Publish)/訂閱(Subscribe)的訊息傳送機制,此機制中包含4個主要的元素,發佈者(Publisher)、訂閱者(Subscriber)、主題(Topic)、訊息中轉站(Broker)。
Publisher為訊息的來源,傳送夾帶有Topic資訊的訊息至Broker,訂閱者向Broker註冊想要接受到之訊息的Topic,例如有一Publisher發佈一則Topic為”Test”的訊息,只要是有對Broker註冊Topic為”Test”的Subscriber都能接收到此訊息。
除了發佈/訂閱的機制外,MQTT通訊協議有幾項特點:
  1. 使用TCP/IP作為基本的網路連線
  2.  提供三種訊息傳送服務的QoS
    a. 
    QoS0At most once(最多一次),訊息可能被重複發送或遺失,適合使用於感測器的原始資料傳送,因為下一則訊息將馬上被送出
    b. 
    QoS1At least once(至少一次),保證訊息會被送達,但可能會發生重複發送的情形
    c. 
    QoS2” Exactly once(確保一次),保證訊息只會被送達一次,適用於對高度謹慎之系統
  3. ...more
二、Basic (ref: Sending and Receiving Messages with MQTT)
  1. A message has a topic and a payload, like the subject and the content of an e-mail.
  2. The Publisher sends a message to the network.
  3. The Subscriber listens for messages with a particular topic.
  4. The Broker is responsible for coordinating the communication between publishers and subscribers. It can also store messages while subscribers are offline (a feature not used in this tutorial).
  5. ...more
三、Programming with C and Python
(ref: iosphere/mosquitto 依自已需求改了一點test5.c裡的test7)
安裝: libssl-dev
快速編譯: git clone 上面連結,替換裡面的test5.c後再make就行了
#include "MQTTAsync.h"
#include <string.h>
#include <stdlib.h>
#include "Thread.h"

#if defined(_WINDOWS)
#include <windows.h>
#include <openssl/applink.c>
#define MAXHOSTNAMELEN 256
#define snprintf _snprintf
#else
#include <sys/time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#endif

#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))

/*********************************************************************

 Test7: Send and receive big messages

 *********************************************************************/

void* test7_payload = NULL;
int test7_payloadlen = 99;

typedef struct
{
 MQTTAsync client;
 char clientid[24];
 char topic[100];
 int maxmsgs;
 int rcvdmsgs[3];
 int sentmsgs[3];
 int testFinished;
 int subscribed;
} AsyncTestClient;

#define LOGA_DEBUG 0
#define LOGA_INFO 1
#include <stdarg.h>
#include <time.h>
#include <sys/timeb.h>
void MyLog(int LOGA_level, char* format, ...)
{
 static char msg_buf[256];
 va_list args;
 struct timeb ts;

 struct tm *timeinfo;

 if (LOGA_level == LOGA_DEBUG)
  return;

 ftime(&ts);
 timeinfo = localtime(&ts.time);
 strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);

 sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);

 va_start(args, format);
 vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf),
   format, args);
 va_end(args);

 printf("%s\n", msg_buf);
 fflush(stdout);
}

#define AsyncTestClient_initializer {NULL, "\0", "\0", 0, {0, 0, 0}, {0, 0, 0}, 0, 0}

#if defined(WIN32) || defined(_WINDOWS)
#define mqsleep(A) Sleep(1000*A)
#define START_TIME_TYPE DWORD
static DWORD start_time = 0;
START_TIME_TYPE start_clock(void)
{
 return GetTickCount();
}
#elif defined(AIX)
#define mqsleep sleep
#define START_TIME_TYPE struct timespec
START_TIME_TYPE start_clock(void)
{
 static struct timespec start;
 clock_gettime(CLOCK_REALTIME, &start);
 return start;
}
#else
#define mqsleep sleep
#define START_TIME_TYPE struct timeval
/* TODO - unused - remove? static struct timeval start_time; */
START_TIME_TYPE start_clock(void)
{
 struct timeval start_time;
 gettimeofday(&start_time, NULL);
 return start_time;
}
#endif

#if defined(WIN32)
long elapsed(START_TIME_TYPE start_time)
{
 return GetTickCount() - start_time;
}
#elif defined(AIX)
#define assert(a)
long elapsed(struct timespec start)
{
 struct timespec now, res;

 clock_gettime(CLOCK_REALTIME, &now);
 ntimersub(now, start, res);
 return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
}
#else
long elapsed(START_TIME_TYPE start_time)
{
 struct timeval now, res;

 gettimeofday(&now, NULL);
 timersub(&now, &start_time, &res);
 return (res.tv_sec) * 1000 + (res.tv_usec) / 1000;
}
#endif

#define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
#define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)

#define MAXMSGS 30;

int tests = 0;
int failures = 0;
FILE* xml;
START_TIME_TYPE global_start_time;
char output[3000];
char* cur_output = output;


void write_test_result(void)
{
 long duration = elapsed(global_start_time);

 fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
 if (cur_output != output)
 {
  fprintf(xml, "%s", output);
  cur_output = output;
 }
 fprintf(xml, "</testcase>\n");
}

void myassert(char* filename, int lineno, char* description, int value,
  char* format, ...)
{
 ++tests;
 if (!value)
 {
  va_list args;

  ++failures;
  printf("Assertion failed, file %s, line %d, description: %s", filename,
    lineno, description);

  va_start(args, format);
  vprintf(format, args);
  va_end(args);

  cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
                        description, filename, lineno);
 }
 else
  MyLog(LOGA_DEBUG,
    "Assertion succeeded, file %s, line %d, description: %s",
    filename, lineno, description);
}

void test7OnConnectFailure(void* context, MQTTAsync_failureData* response)
{
 AsyncTestClient* client = (AsyncTestClient*) context;
 MyLog(LOGA_DEBUG, "In test7OnConnectFailure callback, %s", client->clientid);

 assert("There should be no failures in this test. ", 0, "test7OnConnectFailure callback was called\n", 0);
 client->testFinished = 1;
}

void test7OnPublishFailure(void* context, MQTTAsync_failureData* response)
{
 AsyncTestClient* client = (AsyncTestClient*) context;
 MyLog(LOGA_DEBUG, "In test7OnPublishFailure callback, %s", client->clientid);

 assert("There should be no failures in this test. ", 0, "test7OnPublishFailure callback was called\n", 0);
 client->testFinished = 1;
}

int test7MessageArrived(void* context, char* topicName, int topicLen,
  MQTTAsync_message* message)
{
    printf("in test7MessageArrived\n");
 AsyncTestClient* tc = (AsyncTestClient*) context;
 static int message_count = 0;
 int rc, i;

 MyLog(LOGA_DEBUG, "In messageArrived callback %p", tc);

 MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;

 pubmsg.payload = "{\"Action\": \"RegisterReq\", \"Time\": \"2018-04-13T12:50:00Z\", \"RequestID\": \"xxxxxxx\", \"Version\": \"1.1\"}";
 pubmsg.payloadlen = 100;
 pubmsg.qos = 0;
 pubmsg.retained = 0;
 opts.onSuccess = NULL;
 opts.onFailure = test7OnPublishFailure;
 opts.context = tc;
    printf("%s, tc->topic=%s, tc->clientid=%s\n", __func__, tc->topic, tc->clientid);
    usleep(50000); //50ms
 rc = MQTTAsync_sendMessage(tc->client, "ai/speaker/NerERA/04370927/req", &pubmsg, &opts);
    printf("recv the body is %s\n", (char *) message->payload);
    if (strlen((char *) message->payload)>1)
        tc->testFinished = 1;
 MQTTAsync_freeMessage(&message);
 MQTTAsync_free(topicName);
 return 1;
}

void test7OnSubscribe(void* context, MQTTAsync_successData* response)
{
    printf("in test7OnSubscribe\n");
 AsyncTestClient* tc = (AsyncTestClient*) context;
 MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
 int rc, i;

 MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p", tc);

 pubmsg.payload = "0";
 pubmsg.payloadlen = 100;

 pubmsg.qos = 0;
 pubmsg.retained = 0;

    printf("in test7OnSubscribe topic=%s\n", tc->topic);
    usleep(50000);
 rc = MQTTAsync_send(tc->client, tc->topic, pubmsg.payloadlen, pubmsg.payload,
   pubmsg.qos, pubmsg.retained, NULL);
}

void test7OnConnect(void* context, MQTTAsync_successData* response)
{
 AsyncTestClient* tc = (AsyncTestClient*) context;
 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
 int rc;

 MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
 opts.onSuccess = test7OnSubscribe;
 opts.context = tc;
    printf("subscribe, tc->topic=%s\n", tc->topic);
    usleep(50000);
 rc = MQTTAsync_subscribe(tc->client, tc->topic, 0, &opts);
    printf("subscribe_rc=%d\n", rc);
 assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
 if (rc != MQTTASYNC_SUCCESS)
  tc->testFinished = 1;
}

int test7(void)
{
 char *testname = "xxxxxxxx";
    char *broker="ssl://happy.ai.test.net:8888";
 char *server_key_file="../../../test/ssl/ROOTeCA_64.crt";
 int subsqos = 0;
 AsyncTestClient tc =
 AsyncTestClient_initializer;
 MQTTAsync c;
 MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
 MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
 MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
 int rc = 0;
 int test_finished;

 test_finished = failures = 0;

 MyLog(LOGA_INFO, "Starting test 7 - big messages");
 fprintf(xml, "testcase classname=\"test5\" name=\"%s\"", testname);
 global_start_time = start_clock();
 rc = MQTTAsync_create(&c, broker, "async_test_7", MQTTCLIENT_PERSISTENCE_NONE,
   NULL);
 assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
 if (rc != MQTTASYNC_SUCCESS)
 {
  MQTTAsync_destroy(&c);
  goto exit;
 }
 rc = MQTTAsync_setCallbacks(c, &tc, NULL, test7MessageArrived, NULL);
 assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);

    tc.client = c;
 sprintf(tc.clientid, "%s", testname);
    sprintf(tc.topic, "ai/speaker/NerERA/04370927/rsp");//topic_s
 tc.maxmsgs = MAXMSGS;
 //tc.rcvdmsgs = 0;
 tc.subscribed = 0;
 tc.testFinished = 0;

 opts.keepAliveInterval = 20;
 opts.cleansession = 1;
 opts.username = "de0236c6ece1dd2dfce194c7c4b8d54";
 opts.password = "de0236c6ece1dd2dfce194c7c4b8d54";

 opts.will = &wopts;
 opts.will->message = "{\"Action\": \"RegisterReq\", \"Time\": \"2018-03-14T11:50:00Z\", \"RequestID\": \"xxxxxxxx\", \"Version\": \"1.1\"}";
 //opts.will->qos = 1;
 opts.will->qos = 0;
 opts.will->retained = 0;
 opts.will->topicName = "ai/speaker/NerERA/04370927/req"; //topic_p
 opts.will = NULL;

    opts.onSuccess = test7OnConnect;
 opts.onFailure = test7OnConnectFailure;
 opts.context = &tc;
    sslopts.struct_version = 1;//Must be 0, or 1 to enable TLS version selection
    sslopts.sslVersion = MQTT_SSL_VERSION_TLS_1_2;
    sslopts.enableServerCertAuth=0;
 opts.ssl = &sslopts;
 if (server_key_file != NULL)
  opts.ssl->trustStore = "../../../test/ssl/ROOTeCA_64.crt"; /*file of certificates trusted by client*/

 MyLog(LOGA_DEBUG, "Connecting");
 rc = MQTTAsync_connect(c, &opts);
 assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
 if (rc != MQTTASYNC_SUCCESS)
  goto exit;

 while (!tc.testFinished)
#if defined(WIN32)
  Sleep(100);
#else
  usleep(1000L);
#endif

 MQTTAsync_destroy(&c);

 exit: MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
   (failures == 0) ? "passed" : "failed", testname, tests, failures);
 write_test_result();
 return failures;
}

void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{
 printf("%s\n", message);
}

int main(int argc, char** argv)
{
 int rc = 0;

 xml = fopen("TEST-test5.xml", "w");
 fprintf(xml, "<testsuite name=\"test5\" tests=7>\n");

    test7();

 if (rc == 0)
  MyLog(LOGA_INFO, "verdict pass");
 else
  MyLog(LOGA_INFO, "verdict fail");

 fprintf(xml, "</testsuite>\n");
 fclose(xml);

 return rc;
}
ref:
Sending and Receiving Messages with MQTT
MQTTTransport_mbedTLS.cpp
iosphere/mosquitto
MQTT C 接入示例
potato papa

沒有留言:

張貼留言