diff options
author | V.Krishn <vkrishn4@gmail.com> | 2018-09-05 11:08:40 +0530 |
---|---|---|
committer | V.Krishn <vkrishn4@gmail.com> | 2018-09-05 11:08:40 +0530 |
commit | 0fa50f954a2b51c0dcbf1cca1600cd65e4151b6b (patch) | |
tree | 4334d70c67c2a57d2332de14259ec1b7618edd11 | |
parent | 4d0bc60f3aab7922dba8e4ff6c9414803c5794d3 (diff) | |
download | mqtt-dirpub-0fa50f954a2b51c0dcbf1cca1600cd65e4151b6b.tar.bz2 |
update to mosquitto-v1.5.0
-rw-r--r-- | client_shared.c | 321 | ||||
-rw-r--r-- | client_shared.h | 11 | ||||
-rw-r--r-- | sub_client.c | 118 |
3 files changed, 388 insertions, 62 deletions
diff --git a/client_shared.c b/client_shared.c index 48f25e2..b3ab691 100644 --- a/client_shared.c +++ b/client_shared.c @@ -16,6 +16,7 @@ Contributors: V Krishn - implement dirpub. */ +#define _POSIX_C_SOURCE 200809L #include <errno.h> #include <fcntl.h> @@ -24,10 +25,12 @@ Contributors: #include <string.h> #ifndef WIN32 #include <unistd.h> +#include <strings.h> #else #include <process.h> #include <winsock2.h> #define snprintf sprintf_s +#define strncasecmp _strnicmp #endif #include <mosquitto.h> @@ -36,10 +39,91 @@ Contributors: static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url); static int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[]); + +static int check_format(struct mosq_config *cfg, const char *str) +{ + int i; + int len; + + len = strlen(str); + for(i=0; i<len; i++){ + if(str[i] == '%'){ + if(i == len-1){ + // error + fprintf(stderr, "Error: Incomplete format specifier.\n"); + return 1; + }else{ + if(str[i+1] == '%'){ + // Print %, ignore + }else if(str[i+1] == 'I'){ + // ISO 8601 date+time + }else if(str[i+1] == 'l'){ + // payload length + }else if(str[i+1] == 'm'){ + // mid + }else if(str[i+1] == 'p'){ + // payload + }else if(str[i+1] == 'q'){ + // qos + }else if(str[i+1] == 'r'){ + // retain + }else if(str[i+1] == 't'){ + // topic + }else if(str[i+1] == 'j'){ + // JSON output, escaped payload + }else if(str[i+1] == 'J'){ + // JSON output, assuming JSON payload + }else if(str[i+1] == 'U'){ + // Unix time+nanoseconds + }else if(str[i+1] == 'x' || str[i+1] == 'X'){ + // payload in hex + }else{ + fprintf(stderr, "Error: Invalid format specifier '%c'.\n", str[i+1]); + return 1; + } + i++; + } + }else if(str[i] == '@'){ + if(i == len-1){ + // error + fprintf(stderr, "Error: Incomplete format specifier.\n"); + return 1; + } + i++; + }else if(str[i] == '\\'){ + if(i == len-1){ + // error + fprintf(stderr, "Error: Incomplete escape specifier.\n"); + return 1; + }else{ + switch(str[i+1]){ + case '\\': // '\' + case '0': // 0 (NULL) + case 'a': // alert + case 'e': // escape + case 'n': // new line + case 'r': // carriage return + case 't': // horizontal tab + case 'v': // vertical tab + break; + + default: + fprintf(stderr, "Error: Invalid escape specifier '%c'.\n", str[i+1]); + return 1; + } + i++; + } + } + } + + return 0; +} + + void init_config(struct mosq_config *cfg) { memset(cfg, 0, sizeof(*cfg)); - cfg->port = 1883; + cfg->port = -1; cfg->max_inflight = 20; cfg->keepalive = 60; cfg->clean_session = true; @@ -65,6 +149,7 @@ void client_config_cleanup(struct mosq_config *cfg) free(cfg->password); free(cfg->will_topic); free(cfg->will_payload); + free(cfg->format); #ifdef WITH_TLS free(cfg->cafile); free(cfg->capath); @@ -89,6 +174,12 @@ void client_config_cleanup(struct mosq_config *cfg) } free(cfg->filter_outs); } + if(cfg->unsub_topics){ + for(i=0; i<cfg->unsub_topic_count; i++){ + free(cfg->unsub_topics[i]); + } + free(cfg->unsub_topics); + } #ifdef WITH_SOCKS free(cfg->socks5_host); free(cfg->socks5_username); @@ -240,11 +331,12 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char * } #endif + if(cfg->clean_session == false && (cfg->id_prefix || !cfg->id)){ + if(!cfg->quiet) fprintf(stderr, "Error: You must provide a client id if you are using the -c option.\n"); + return 1; + } + if(pub_or_sub == CLIENT_SUB){ - if(cfg->clean_session == false && (cfg->id_prefix || !cfg->id)){ - if(!cfg->quiet) fprintf(stderr, "Error: You must provide a client id if you are using the -c option.\n"); - return 1; - } if(cfg->topic_count == 0){ if(!cfg->quiet) fprintf(stderr, "Error: You must specify a topic to subscribe to.\n"); return 1; @@ -257,6 +349,34 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char * return MOSQ_ERR_SUCCESS; } +int cfg_add_topic(struct mosq_config *cfg, int pub_or_sub, char *topic, const char *arg) +{ + if(mosquitto_validate_utf8(topic, strlen(topic))){ + fprintf(stderr, "Error: Malformed UTF-8 in %s argument.\n\n", arg); + return 1; + } + if(pub_or_sub == CLIENT_PUB){ + if(mosquitto_pub_topic_check(topic) == MOSQ_ERR_INVAL){ + fprintf(stderr, "Error: Invalid publish topic '%s', does it contain '+' or '#'?\n", topic); + return 1; + } + cfg->topic = strdup(topic); + } else { + if(mosquitto_sub_topic_check(topic) == MOSQ_ERR_INVAL){ + fprintf(stderr, "Error: Invalid subscription topic '%s', are all '+' and '#' wildcards correct?\n", topic); + return 1; + } + cfg->topic_count++; + cfg->topics = realloc(cfg->topics, cfg->topic_count*sizeof(char *)); + if(!cfg->topics){ + fprintf(stderr, "Error: Out of memory.\n"); + return 1; + } + cfg->topics[cfg->topic_count-1] = strdup(topic); + } + return 0; +} + /* Process a tokenised single line from a file or set of real argc/argv */ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[]) { @@ -354,6 +474,22 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c } i++; } + }else if(!strcmp(argv[i], "-W")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + }else{ + if(i==argc-1){ + fprintf(stderr, "Error: -W argument given but no timeout specified.\n\n"); + return 1; + }else{ + cfg->timeout = atoi(argv[i+1]); + if(cfg->timeout < 1){ + fprintf(stderr, "Error: Invalid timeout \"%d\".\n\n", cfg->msg_count); + return 1; + } + } + i++; + } }else if(!strcmp(argv[i], "-d") || !strcmp(argv[i], "--debug")){ cfg->debug = true; }else if(!strcmp(argv[i], "-f") || !strcmp(argv[i], "--file")){ @@ -369,6 +505,28 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c }else{ cfg->pub_mode = MSGMODE_FILE; cfg->file_input = strdup(argv[i+1]); + if(!cfg->file_input){ + fprintf(stderr, "Error: Out of memory.\n"); + return 1; + } + } + i++; + }else if(!strcmp(argv[i], "-F")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + } + if(i==argc-1){ + fprintf(stderr, "Error: -F argument given but no format specified.\n\n"); + return 1; + }else{ + cfg->format = strdup(argv[i+1]); + if(!cfg->format){ + fprintf(stderr, "Error: Out of memory.\n"); + return 1; + } + if(check_format(cfg, cfg->format)){ + return 1; + } } i++; }else if(!strcmp(argv[i], "--help")){ @@ -431,6 +589,51 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c } i++; #endif + }else if(!strcmp(argv[i], "-L") || !strcmp(argv[i], "--url")){ + if(i==argc-1){ + fprintf(stderr, "Error: -L argument given but no URL specified.\n\n"); + return 1; + } else { + char *url = argv[i+1]; + char *topic; + char *tmp; + + if(!strncasecmp(url, "mqtt://", 7)) { + url += 7; + cfg->port = 1883; + } else if(!strncasecmp(url, "mqtts://", 8)) { + url += 8; + cfg->port = 8883; + } else { + fprintf(stderr, "Error: unsupported URL scheme.\n\n"); + return 1; + } + topic = strchr(url, '/'); + *topic++ = 0; + + if(cfg_add_topic(cfg, pub_or_sub, topic, "-L topic")) + return 1; + + tmp = strchr(url, '@'); + if(tmp) { + char *colon = strchr(url, ':'); + *tmp++ = 0; + if(colon) { + *colon = 0; + cfg->password = colon + 1; + } + cfg->username = url; + url = tmp; + } + cfg->host = url; + + tmp = strchr(url, ':'); + if(tmp) { + *tmp++ = 0; + cfg->port = atoi(tmp); + } + } + i++; }else if(!strcmp(argv[i], "-l") || !strcmp(argv[i], "--stdin-line")){ if(pub_or_sub == CLIENT_SUB){ goto unknown_option; @@ -539,6 +742,11 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c goto unknown_option; } cfg->retain = 1; + }else if(!strcmp(argv[i], "--retained-only")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + } + cfg->retained_only = true; }else if(!strcmp(argv[i], "-s") || !strcmp(argv[i], "--stdin-file")){ if(pub_or_sub == CLIENT_SUB){ goto unknown_option; @@ -558,21 +766,8 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c fprintf(stderr, "Error: -t argument given but no topic specified.\n\n"); return 1; }else{ - if(pub_or_sub == CLIENT_PUB){ - if(mosquitto_pub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){ - fprintf(stderr, "Error: Invalid publish topic '%s', does it contain '+' or '#'?\n", argv[i+1]); - return 1; - } - cfg->topic = strdup(argv[i+1]); - }else{ - if(mosquitto_sub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){ - fprintf(stderr, "Error: Invalid subscription topic '%s', are all '+' and '#' wildcards correct?\n", argv[i+1]); - return 1; - } - cfg->topic_count++; - cfg->topics = realloc(cfg->topics, cfg->topic_count*sizeof(char *)); - cfg->topics[cfg->topic_count-1] = strdup(argv[i+1]); - } + if(cfg_add_topic(cfg, pub_or_sub, argv[i + 1], "-t")) + return 1; i++; } }else if(!strcmp(argv[i], "-T") || !strcmp(argv[i], "--filter-out")){ @@ -583,15 +778,48 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c fprintf(stderr, "Error: -T argument given but no topic filter specified.\n\n"); return 1; }else{ + if(mosquitto_validate_utf8(argv[i+1], strlen(argv[i+1]))){ + fprintf(stderr, "Error: Malformed UTF-8 in -T argument.\n\n"); + return 1; + } if(mosquitto_sub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){ fprintf(stderr, "Error: Invalid filter topic '%s', are all '+' and '#' wildcards correct?\n", argv[i+1]); return 1; } cfg->filter_out_count++; cfg->filter_outs = realloc(cfg->filter_outs, cfg->filter_out_count*sizeof(char *)); + if(!cfg->filter_outs){ + fprintf(stderr, "Error: Out of memory.\n"); + return 1; + } cfg->filter_outs[cfg->filter_out_count-1] = strdup(argv[i+1]); } i++; + }else if(!strcmp(argv[i], "-U") || !strcmp(argv[i], "--unsubscribe")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + } + if(i==argc-1){ + fprintf(stderr, "Error: -U argument given but no unsubscribe topic specified.\n\n"); + return 1; + }else{ + if(mosquitto_validate_utf8(argv[i+1], strlen(argv[i+1]))){ + fprintf(stderr, "Error: Malformed UTF-8 in -U argument.\n\n"); + return 1; + } + if(mosquitto_sub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){ + fprintf(stderr, "Error: Invalid unsubscribe topic '%s', are all '+' and '#' wildcards correct?\n", argv[i+1]); + return 1; + } + cfg->unsub_topic_count++; + cfg->unsub_topics = realloc(cfg->unsub_topics, cfg->unsub_topic_count*sizeof(char *)); + if(!cfg->unsub_topics){ + fprintf(stderr, "Error: Out of memory.\n"); + return 1; + } + cfg->unsub_topics[cfg->unsub_topic_count-1] = strdup(argv[i+1]); + } + i++; #ifdef WITH_TLS }else if(!strcmp(argv[i], "--tls-version")){ if(i==argc-1){ @@ -646,6 +874,10 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c fprintf(stderr, "Error: --will-topic argument given but no will topic specified.\n\n"); return 1; }else{ + if(mosquitto_validate_utf8(argv[i+1], strlen(argv[i+1]))){ + fprintf(stderr, "Error: Malformed UTF-8 in --will-topic argument.\n\n"); + return 1; + } if(mosquitto_pub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){ fprintf(stderr, "Error: Invalid will topic '%s', does it contain '+' or '#'?\n", argv[i+1]); return 1; @@ -654,9 +886,6 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c } i++; }else if(!strcmp(argv[i], "-c") || !strcmp(argv[i], "--disable-clean-session")){ - if(pub_or_sub == CLIENT_PUB){ - goto unknown_option; - } cfg->clean_session = false; }else if(!strcmp(argv[i], "-N")){ if(pub_or_sub == CLIENT_PUB){ @@ -779,15 +1008,33 @@ int client_connect(struct mosquitto *mosq, struct mosq_config *cfg) { char err[1024]; int rc; + int port; + +#ifdef WITH_TLS + if(cfg->port < 0){ + if(cfg->cafile || cfg->capath +#ifdef WITH_TLS_PSK + || cfg->psk +#endif + ){ + port = 8883; + }else{ + port = 1883; + } + }else +#endif + { + port = cfg->port; + } #ifdef WITH_SRV if(cfg->use_srv){ rc = mosquitto_connect_srv(mosq, cfg->host, cfg->keepalive, cfg->bind_address); }else{ - rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address); + rc = mosquitto_connect_bind(mosq, cfg->host, port, cfg->keepalive, cfg->bind_address); } #else - rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address); + rc = mosquitto_connect_bind(mosq, cfg->host, port, cfg->keepalive, cfg->bind_address); #endif if(rc>0){ if(!cfg->quiet){ @@ -893,6 +1140,10 @@ static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url) } len = i-start; host = malloc(len + 1); + if(!host){ + fprintf(stderr, "Error: Out of memory.\n"); + goto cleanup; + } memcpy(host, &(str[start]), len); host[len] = '\0'; start = i+1; @@ -902,6 +1153,10 @@ static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url) * socks5h://username:password@host[:port] */ len = i-start; username_or_host = malloc(len + 1); + if(!username_or_host){ + fprintf(stderr, "Error: Out of memory.\n"); + goto cleanup; + } memcpy(username_or_host, &(str[start]), len); username_or_host[len] = '\0'; start = i+1; @@ -918,6 +1173,10 @@ static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url) len = i-start; password = malloc(len + 1); + if(!password){ + fprintf(stderr, "Error: Out of memory.\n"); + goto cleanup; + } memcpy(password, &(str[start]), len); password[len] = '\0'; start = i+1; @@ -930,6 +1189,10 @@ static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url) } len = i-start; username = malloc(len + 1); + if(!username){ + fprintf(stderr, "Error: Out of memory.\n"); + goto cleanup; + } memcpy(username, &(str[start]), len); username[len] = '\0'; start = i+1; @@ -944,6 +1207,10 @@ static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url) /* Have already seen a @ , so this must be of form * socks5h://username[:password]@host:port */ port = malloc(len + 1); + if(!port){ + fprintf(stderr, "Error: Out of memory.\n"); + goto cleanup; + } memcpy(port, &(str[start]), len); port[len] = '\0'; }else if(username_or_host){ @@ -952,6 +1219,10 @@ static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url) host = username_or_host; username_or_host = NULL; port = malloc(len + 1); + if(!port){ + fprintf(stderr, "Error: Out of memory.\n"); + goto cleanup; + } memcpy(port, &(str[start]), len); port[len] = '\0'; }else{ diff --git a/client_shared.h b/client_shared.h index ebf2607..1d22cec 100644 --- a/client_shared.h +++ b/client_shared.h @@ -16,8 +16,8 @@ Contributors: V Krishn - implement dirpub. */ -#ifndef _CLIENT_CONFIG_H -#define _CLIENT_CONFIG_H +#ifndef CLIENT_CONFIG_H +#define CLIENT_CONFIG_H #include <stdio.h> @@ -73,15 +73,20 @@ struct mosq_config { char *psk_identity; # endif #endif - bool clean_session; /* sub */ + bool clean_session; char **topics; /* sub */ int topic_count; /* sub */ bool no_retain; /* sub */ + bool retained_only; /* sub */ char **filter_outs; /* sub */ int filter_out_count; /* sub */ + char **unsub_topics; /* sub */ + int unsub_topic_count; /* sub */ bool verbose; /* sub */ bool eol; /* sub */ int msg_count; /* sub */ + char *format; /* sub */ + int timeout; /* sub */ #ifdef WITH_SOCKS char *socks5_host; int socks5_port; diff --git a/sub_client.c b/sub_client.c index 1661658..2d9ce5b 100644 --- a/sub_client.c +++ b/sub_client.c @@ -16,14 +16,18 @@ Contributors: V Krishn - implement dirpub. */ +#define _POSIX_C_SOURCE 200809L + #include <assert.h> #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <string.h> +#include <time.h> #include <libgen.h> #ifndef WIN32 #include <unistd.h> +#include <signal.h> #else #include <sysstat.h> #include <process.h> @@ -32,13 +36,26 @@ Contributors: #endif #include <sys/stat.h> #include <sys/types.h> -#include <time.h> #include <mosquitto.h> #include "client_shared.h" bool process_messages = true; int msg_count = 0; +struct mosquitto *mosq = NULL; + +#ifndef WIN32 +void my_signal_handler(int signum) +{ + if(signum == SIGALRM){ + process_messages = false; + mosquitto_disconnect(mosq); + } +} +#endif + +void print_message(struct mosq_config *cfg, const struct mosquitto_message *message); + /* @(#)Purpose: Create all directories in path @(#)Author: J Leffler @@ -304,6 +321,12 @@ void my_message_file_callback(struct mosquitto *mosq, void *obj, const struct mo assert(obj); cfg = (struct mosq_config *)obj; + if(cfg->retained_only && !message->retain && process_messages){ + process_messages = false; + mosquitto_disconnect(mosq); + return; + } + if(message->retain && cfg->no_retain) return; if(cfg->filter_outs){ for(i=0; i<cfg->filter_out_count; i++){ @@ -393,6 +416,12 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit assert(obj); cfg = (struct mosq_config *)obj; + if(cfg->retained_only && !message->retain && process_messages){ + process_messages = false; + mosquitto_disconnect(mosq); + return; + } + if(message->retain && cfg->no_retain) return; if(cfg->filter_outs){ for(i=0; i<cfg->filter_out_count; i++){ @@ -401,28 +430,8 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit } } - if(cfg->verbose){ - if(message->payloadlen){ - printf("%s ", message->topic); - fwrite(message->payload, 1, message->payloadlen, stdout); - if(cfg->eol){ - printf("\n"); - } - }else{ - if(cfg->eol){ - printf("%s (null)\n", message->topic); - } - } - fflush(stdout); - }else{ - if(message->payloadlen){ - fwrite(message->payload, 1, message->payloadlen, stdout); - if(cfg->eol){ - printf("\n"); - } - fflush(stdout); - } - } + print_message(cfg, message); + if(cfg->msg_count>0){ msg_count++; if(cfg->msg_count == msg_count){ @@ -432,7 +441,7 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit } } -void my_connect_callback(struct mosquitto *mosq, void *obj, int result) +void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags) { int i; struct mosq_config *cfg; @@ -444,10 +453,14 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result) for(i=0; i<cfg->topic_count; i++){ mosquitto_subscribe(mosq, NULL, cfg->topics[i], cfg->qos); } + for(i=0; i<cfg->unsub_topic_count; i++){ + mosquitto_unsubscribe(mosq, NULL, cfg->unsub_topics[i]); + } }else{ if(result && !cfg->quiet){ fprintf(stderr, "%s\n", mosquitto_connack_string(result)); } + mosquitto_disconnect(mosq); } } @@ -476,10 +489,15 @@ void print_usage(void) int major, minor, revision; mosquitto_lib_version(&major, &minor, &revision); - printf("mosquitto_sub is a simple mqtt client that will subscribe to a single topic and print all messages it receives.\n"); + printf("mosquitto_sub is a simple mqtt client that will subscribe to a set of topics and print all messages it receives.\n"); printf("mosquitto_sub version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision); - printf("Usage: mosquitto_sub [-c] [-h host] [-k keepalive] [-p port] [-q qos] [-R] -t topic ...\n"); - printf(" [-C msg_count] [-T filter_out]\n"); + printf("Usage: mosquitto_sub {[-h host] [-p port] [-u username [-P password]] -t topic | -L URL [-t topic]}\n"); + printf(" [-c] [-k keepalive] [-q qos]\n"); + printf(" [-C msg_count] [-R] [--retained-only] [-T filter_out] [-U topic ...]\n"); + printf(" [-F format]\n"); +#ifndef WIN32 + printf(" [-W timeout_secs]\n"); +#endif #ifdef WITH_SRV printf(" [-A bind_address] [-S]\n"); #else @@ -487,7 +505,6 @@ void print_usage(void) #endif printf(" [-i id] [-I id_prefix]\n"); printf(" [-d] [-N] [--quiet] [-v]\n"); - printf(" [-u username [-P password]]\n"); printf(" [--fmask outfile [--overwrite]]\n"); printf(" [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]\n"); #ifdef WITH_TLS @@ -506,14 +523,17 @@ void print_usage(void) printf(" -c : disable 'clean session' (store subscription and pending messages when client disconnects).\n"); printf(" -C : disconnect and exit after receiving the 'msg_count' messages.\n"); printf(" -d : enable debug messages.\n"); + printf(" -F : output format.\n"); printf(" -h : mqtt host to connect to. Defaults to localhost.\n"); printf(" -i : id to use for this client. Defaults to mosquitto_sub_ appended with the process id.\n"); printf(" -I : define the client id as id_prefix appended with the process id. Useful for when the\n"); printf(" broker is using the clientid_prefixes option.\n"); printf(" -k : keep alive in seconds for this client. Defaults to 60.\n"); + printf(" -L : specify user, password, hostname, port and topic as a URL in the form:\n"); + printf(" mqtt(s)://[username[:password]@]host[:port]/topic\n"); printf(" -N : do not add an end of line character when printing the payload.\n"); - printf(" -p : network port to connect to. Defaults to 1883.\n"); - printf(" -P : provide a password (requires MQTT 3.1 broker)\n"); + printf(" -p : network port to connect to. Defaults to 1883 for plain MQTT and 8883 for MQTT over TLS.\n"); + printf(" -P : provide a password\n"); printf(" -q : quality of service level to use for the subscription. Defaults to 0.\n"); printf(" -R : do not print stale messages (those with retain set).\n"); #ifdef WITH_SRV @@ -521,12 +541,18 @@ void print_usage(void) #endif printf(" -t : mqtt topic to subscribe to. May be repeated multiple times.\n"); printf(" -T : topic string to filter out of results. May be repeated.\n"); - printf(" -u : provide a username (requires MQTT 3.1 broker)\n"); + printf(" -u : provide a username\n"); + printf(" -U : unsubscribe from a topic. May be repeated.\n"); printf(" -v : print published messages verbosely.\n"); printf(" -V : specify the version of the MQTT protocol to use when connecting.\n"); - printf(" Can be mqttv31 or mqttv311. Defaults to mqttv31.\n"); + printf(" Can be mqttv31 or mqttv311. Defaults to mqttv311.\n"); +#ifndef WIN32 + printf(" -W : Specifies a timeout in seconds how long to process incoming MQTT messages.\n"); +#endif printf(" --help : display this message.\n"); printf(" --quiet : don't print error messages.\n"); + printf(" --retained-only : only handle messages with the retained flag set, and exit when the\n"); + printf(" first non-retained message is received.\n"); printf(" --fmask : path to message outfile\n"); printf(" allowed masks are:\n"); printf(" @[epoch|date|year|month|day|datetime|hour|min|sec|id|topic[1-9]] \n"); @@ -569,9 +595,13 @@ void print_usage(void) int main(int argc, char *argv[]) { struct mosq_config cfg; - struct mosquitto *mosq = NULL; int rc; +#ifndef WIN32 + struct sigaction sigact; +#endif + memset(&cfg, 0, sizeof(struct mosq_config)); + rc = client_config_load(&cfg, CLIENT_SUB, argc, argv); if(rc){ client_config_cleanup(&cfg); @@ -584,6 +614,11 @@ int main(int argc, char *argv[]) return 1; } + if(cfg.no_retain && cfg.retained_only){ + fprintf(stderr, "\nError: Combining '-R' and '--retained-only' makes no sense.\n"); + return 1; + } + mosquitto_lib_init(); if(client_id_generate(&cfg, "mosqsub")){ @@ -611,7 +646,8 @@ int main(int argc, char *argv[]) mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); } - mosquitto_connect_callback_set(mosq, my_connect_callback); + mosquitto_connect_with_flags_callback_set(mosq, my_connect_callback); + if(cfg.isfmask) { mosquitto_message_callback_set(mosq, my_message_file_callback); } else { @@ -621,6 +657,20 @@ int main(int argc, char *argv[]) rc = client_connect(mosq, &cfg); if(rc) return rc; +#ifndef WIN32 + sigact.sa_handler = my_signal_handler; + sigemptyset(&sigact.sa_mask); + sigact.sa_flags = 0; + + if(sigaction(SIGALRM, &sigact, NULL) == -1){ + perror("sigaction"); + return 1; + } + + if(cfg.timeout){ + alarm(cfg.timeout); + } +#endif rc = mosquitto_loop_forever(mosq, -1, 1); |