aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorV.Krishn <vkrishn4@gmail.com>2018-09-05 11:08:40 +0530
committerV.Krishn <vkrishn4@gmail.com>2018-09-05 11:08:40 +0530
commit0fa50f954a2b51c0dcbf1cca1600cd65e4151b6b (patch)
tree4334d70c67c2a57d2332de14259ec1b7618edd11
parent4d0bc60f3aab7922dba8e4ff6c9414803c5794d3 (diff)
downloadmqtt-dirpub-0fa50f954a2b51c0dcbf1cca1600cd65e4151b6b.tar.bz2
update to mosquitto-v1.5.0
-rw-r--r--client_shared.c321
-rw-r--r--client_shared.h11
-rw-r--r--sub_client.c118
3 files changed, 388 insertions, 62 deletions
diff --git a/client_shared.c b/client_shared.c
index 48f25e2..b3ab691 100644
--- a/client_shared.c
+++ b/client_shared.c
@@ -16,6 +16,7 @@ Contributors:
V Krishn - implement dirpub.
*/
+#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <fcntl.h>
@@ -24,10 +25,12 @@ Contributors:
#include <string.h>
#ifndef WIN32
#include <unistd.h>
+#include <strings.h>
#else
#include <process.h>
#include <winsock2.h>
#define snprintf sprintf_s
+#define strncasecmp _strnicmp
#endif
#include <mosquitto.h>
@@ -36,10 +39,91 @@ Contributors:
static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url);
static int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[]);
+
+static int check_format(struct mosq_config *cfg, const char *str)
+{
+ int i;
+ int len;
+
+ len = strlen(str);
+ for(i=0; i<len; i++){
+ if(str[i] == '%'){
+ if(i == len-1){
+ // error
+ fprintf(stderr, "Error: Incomplete format specifier.\n");
+ return 1;
+ }else{
+ if(str[i+1] == '%'){
+ // Print %, ignore
+ }else if(str[i+1] == 'I'){
+ // ISO 8601 date+time
+ }else if(str[i+1] == 'l'){
+ // payload length
+ }else if(str[i+1] == 'm'){
+ // mid
+ }else if(str[i+1] == 'p'){
+ // payload
+ }else if(str[i+1] == 'q'){
+ // qos
+ }else if(str[i+1] == 'r'){
+ // retain
+ }else if(str[i+1] == 't'){
+ // topic
+ }else if(str[i+1] == 'j'){
+ // JSON output, escaped payload
+ }else if(str[i+1] == 'J'){
+ // JSON output, assuming JSON payload
+ }else if(str[i+1] == 'U'){
+ // Unix time+nanoseconds
+ }else if(str[i+1] == 'x' || str[i+1] == 'X'){
+ // payload in hex
+ }else{
+ fprintf(stderr, "Error: Invalid format specifier '%c'.\n", str[i+1]);
+ return 1;
+ }
+ i++;
+ }
+ }else if(str[i] == '@'){
+ if(i == len-1){
+ // error
+ fprintf(stderr, "Error: Incomplete format specifier.\n");
+ return 1;
+ }
+ i++;
+ }else if(str[i] == '\\'){
+ if(i == len-1){
+ // error
+ fprintf(stderr, "Error: Incomplete escape specifier.\n");
+ return 1;
+ }else{
+ switch(str[i+1]){
+ case '\\': // '\'
+ case '0': // 0 (NULL)
+ case 'a': // alert
+ case 'e': // escape
+ case 'n': // new line
+ case 'r': // carriage return
+ case 't': // horizontal tab
+ case 'v': // vertical tab
+ break;
+
+ default:
+ fprintf(stderr, "Error: Invalid escape specifier '%c'.\n", str[i+1]);
+ return 1;
+ }
+ i++;
+ }
+ }
+ }
+
+ return 0;
+}
+
+
void init_config(struct mosq_config *cfg)
{
memset(cfg, 0, sizeof(*cfg));
- cfg->port = 1883;
+ cfg->port = -1;
cfg->max_inflight = 20;
cfg->keepalive = 60;
cfg->clean_session = true;
@@ -65,6 +149,7 @@ void client_config_cleanup(struct mosq_config *cfg)
free(cfg->password);
free(cfg->will_topic);
free(cfg->will_payload);
+ free(cfg->format);
#ifdef WITH_TLS
free(cfg->cafile);
free(cfg->capath);
@@ -89,6 +174,12 @@ void client_config_cleanup(struct mosq_config *cfg)
}
free(cfg->filter_outs);
}
+ if(cfg->unsub_topics){
+ for(i=0; i<cfg->unsub_topic_count; i++){
+ free(cfg->unsub_topics[i]);
+ }
+ free(cfg->unsub_topics);
+ }
#ifdef WITH_SOCKS
free(cfg->socks5_host);
free(cfg->socks5_username);
@@ -240,11 +331,12 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char *
}
#endif
+ if(cfg->clean_session == false && (cfg->id_prefix || !cfg->id)){
+ if(!cfg->quiet) fprintf(stderr, "Error: You must provide a client id if you are using the -c option.\n");
+ return 1;
+ }
+
if(pub_or_sub == CLIENT_SUB){
- if(cfg->clean_session == false && (cfg->id_prefix || !cfg->id)){
- if(!cfg->quiet) fprintf(stderr, "Error: You must provide a client id if you are using the -c option.\n");
- return 1;
- }
if(cfg->topic_count == 0){
if(!cfg->quiet) fprintf(stderr, "Error: You must specify a topic to subscribe to.\n");
return 1;
@@ -257,6 +349,34 @@ int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char *
return MOSQ_ERR_SUCCESS;
}
+int cfg_add_topic(struct mosq_config *cfg, int pub_or_sub, char *topic, const char *arg)
+{
+ if(mosquitto_validate_utf8(topic, strlen(topic))){
+ fprintf(stderr, "Error: Malformed UTF-8 in %s argument.\n\n", arg);
+ return 1;
+ }
+ if(pub_or_sub == CLIENT_PUB){
+ if(mosquitto_pub_topic_check(topic) == MOSQ_ERR_INVAL){
+ fprintf(stderr, "Error: Invalid publish topic '%s', does it contain '+' or '#'?\n", topic);
+ return 1;
+ }
+ cfg->topic = strdup(topic);
+ } else {
+ if(mosquitto_sub_topic_check(topic) == MOSQ_ERR_INVAL){
+ fprintf(stderr, "Error: Invalid subscription topic '%s', are all '+' and '#' wildcards correct?\n", topic);
+ return 1;
+ }
+ cfg->topic_count++;
+ cfg->topics = realloc(cfg->topics, cfg->topic_count*sizeof(char *));
+ if(!cfg->topics){
+ fprintf(stderr, "Error: Out of memory.\n");
+ return 1;
+ }
+ cfg->topics[cfg->topic_count-1] = strdup(topic);
+ }
+ return 0;
+}
+
/* Process a tokenised single line from a file or set of real argc/argv */
int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[])
{
@@ -354,6 +474,22 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
}
i++;
}
+ }else if(!strcmp(argv[i], "-W")){
+ if(pub_or_sub == CLIENT_PUB){
+ goto unknown_option;
+ }else{
+ if(i==argc-1){
+ fprintf(stderr, "Error: -W argument given but no timeout specified.\n\n");
+ return 1;
+ }else{
+ cfg->timeout = atoi(argv[i+1]);
+ if(cfg->timeout < 1){
+ fprintf(stderr, "Error: Invalid timeout \"%d\".\n\n", cfg->msg_count);
+ return 1;
+ }
+ }
+ i++;
+ }
}else if(!strcmp(argv[i], "-d") || !strcmp(argv[i], "--debug")){
cfg->debug = true;
}else if(!strcmp(argv[i], "-f") || !strcmp(argv[i], "--file")){
@@ -369,6 +505,28 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
}else{
cfg->pub_mode = MSGMODE_FILE;
cfg->file_input = strdup(argv[i+1]);
+ if(!cfg->file_input){
+ fprintf(stderr, "Error: Out of memory.\n");
+ return 1;
+ }
+ }
+ i++;
+ }else if(!strcmp(argv[i], "-F")){
+ if(pub_or_sub == CLIENT_PUB){
+ goto unknown_option;
+ }
+ if(i==argc-1){
+ fprintf(stderr, "Error: -F argument given but no format specified.\n\n");
+ return 1;
+ }else{
+ cfg->format = strdup(argv[i+1]);
+ if(!cfg->format){
+ fprintf(stderr, "Error: Out of memory.\n");
+ return 1;
+ }
+ if(check_format(cfg, cfg->format)){
+ return 1;
+ }
}
i++;
}else if(!strcmp(argv[i], "--help")){
@@ -431,6 +589,51 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
}
i++;
#endif
+ }else if(!strcmp(argv[i], "-L") || !strcmp(argv[i], "--url")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: -L argument given but no URL specified.\n\n");
+ return 1;
+ } else {
+ char *url = argv[i+1];
+ char *topic;
+ char *tmp;
+
+ if(!strncasecmp(url, "mqtt://", 7)) {
+ url += 7;
+ cfg->port = 1883;
+ } else if(!strncasecmp(url, "mqtts://", 8)) {
+ url += 8;
+ cfg->port = 8883;
+ } else {
+ fprintf(stderr, "Error: unsupported URL scheme.\n\n");
+ return 1;
+ }
+ topic = strchr(url, '/');
+ *topic++ = 0;
+
+ if(cfg_add_topic(cfg, pub_or_sub, topic, "-L topic"))
+ return 1;
+
+ tmp = strchr(url, '@');
+ if(tmp) {
+ char *colon = strchr(url, ':');
+ *tmp++ = 0;
+ if(colon) {
+ *colon = 0;
+ cfg->password = colon + 1;
+ }
+ cfg->username = url;
+ url = tmp;
+ }
+ cfg->host = url;
+
+ tmp = strchr(url, ':');
+ if(tmp) {
+ *tmp++ = 0;
+ cfg->port = atoi(tmp);
+ }
+ }
+ i++;
}else if(!strcmp(argv[i], "-l") || !strcmp(argv[i], "--stdin-line")){
if(pub_or_sub == CLIENT_SUB){
goto unknown_option;
@@ -539,6 +742,11 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
goto unknown_option;
}
cfg->retain = 1;
+ }else if(!strcmp(argv[i], "--retained-only")){
+ if(pub_or_sub == CLIENT_PUB){
+ goto unknown_option;
+ }
+ cfg->retained_only = true;
}else if(!strcmp(argv[i], "-s") || !strcmp(argv[i], "--stdin-file")){
if(pub_or_sub == CLIENT_SUB){
goto unknown_option;
@@ -558,21 +766,8 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
fprintf(stderr, "Error: -t argument given but no topic specified.\n\n");
return 1;
}else{
- if(pub_or_sub == CLIENT_PUB){
- if(mosquitto_pub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){
- fprintf(stderr, "Error: Invalid publish topic '%s', does it contain '+' or '#'?\n", argv[i+1]);
- return 1;
- }
- cfg->topic = strdup(argv[i+1]);
- }else{
- if(mosquitto_sub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){
- fprintf(stderr, "Error: Invalid subscription topic '%s', are all '+' and '#' wildcards correct?\n", argv[i+1]);
- return 1;
- }
- cfg->topic_count++;
- cfg->topics = realloc(cfg->topics, cfg->topic_count*sizeof(char *));
- cfg->topics[cfg->topic_count-1] = strdup(argv[i+1]);
- }
+ if(cfg_add_topic(cfg, pub_or_sub, argv[i + 1], "-t"))
+ return 1;
i++;
}
}else if(!strcmp(argv[i], "-T") || !strcmp(argv[i], "--filter-out")){
@@ -583,15 +778,48 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
fprintf(stderr, "Error: -T argument given but no topic filter specified.\n\n");
return 1;
}else{
+ if(mosquitto_validate_utf8(argv[i+1], strlen(argv[i+1]))){
+ fprintf(stderr, "Error: Malformed UTF-8 in -T argument.\n\n");
+ return 1;
+ }
if(mosquitto_sub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){
fprintf(stderr, "Error: Invalid filter topic '%s', are all '+' and '#' wildcards correct?\n", argv[i+1]);
return 1;
}
cfg->filter_out_count++;
cfg->filter_outs = realloc(cfg->filter_outs, cfg->filter_out_count*sizeof(char *));
+ if(!cfg->filter_outs){
+ fprintf(stderr, "Error: Out of memory.\n");
+ return 1;
+ }
cfg->filter_outs[cfg->filter_out_count-1] = strdup(argv[i+1]);
}
i++;
+ }else if(!strcmp(argv[i], "-U") || !strcmp(argv[i], "--unsubscribe")){
+ if(pub_or_sub == CLIENT_PUB){
+ goto unknown_option;
+ }
+ if(i==argc-1){
+ fprintf(stderr, "Error: -U argument given but no unsubscribe topic specified.\n\n");
+ return 1;
+ }else{
+ if(mosquitto_validate_utf8(argv[i+1], strlen(argv[i+1]))){
+ fprintf(stderr, "Error: Malformed UTF-8 in -U argument.\n\n");
+ return 1;
+ }
+ if(mosquitto_sub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){
+ fprintf(stderr, "Error: Invalid unsubscribe topic '%s', are all '+' and '#' wildcards correct?\n", argv[i+1]);
+ return 1;
+ }
+ cfg->unsub_topic_count++;
+ cfg->unsub_topics = realloc(cfg->unsub_topics, cfg->unsub_topic_count*sizeof(char *));
+ if(!cfg->unsub_topics){
+ fprintf(stderr, "Error: Out of memory.\n");
+ return 1;
+ }
+ cfg->unsub_topics[cfg->unsub_topic_count-1] = strdup(argv[i+1]);
+ }
+ i++;
#ifdef WITH_TLS
}else if(!strcmp(argv[i], "--tls-version")){
if(i==argc-1){
@@ -646,6 +874,10 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
fprintf(stderr, "Error: --will-topic argument given but no will topic specified.\n\n");
return 1;
}else{
+ if(mosquitto_validate_utf8(argv[i+1], strlen(argv[i+1]))){
+ fprintf(stderr, "Error: Malformed UTF-8 in --will-topic argument.\n\n");
+ return 1;
+ }
if(mosquitto_pub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){
fprintf(stderr, "Error: Invalid will topic '%s', does it contain '+' or '#'?\n", argv[i+1]);
return 1;
@@ -654,9 +886,6 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
}
i++;
}else if(!strcmp(argv[i], "-c") || !strcmp(argv[i], "--disable-clean-session")){
- if(pub_or_sub == CLIENT_PUB){
- goto unknown_option;
- }
cfg->clean_session = false;
}else if(!strcmp(argv[i], "-N")){
if(pub_or_sub == CLIENT_PUB){
@@ -779,15 +1008,33 @@ int client_connect(struct mosquitto *mosq, struct mosq_config *cfg)
{
char err[1024];
int rc;
+ int port;
+
+#ifdef WITH_TLS
+ if(cfg->port < 0){
+ if(cfg->cafile || cfg->capath
+#ifdef WITH_TLS_PSK
+ || cfg->psk
+#endif
+ ){
+ port = 8883;
+ }else{
+ port = 1883;
+ }
+ }else
+#endif
+ {
+ port = cfg->port;
+ }
#ifdef WITH_SRV
if(cfg->use_srv){
rc = mosquitto_connect_srv(mosq, cfg->host, cfg->keepalive, cfg->bind_address);
}else{
- rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);
+ rc = mosquitto_connect_bind(mosq, cfg->host, port, cfg->keepalive, cfg->bind_address);
}
#else
- rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);
+ rc = mosquitto_connect_bind(mosq, cfg->host, port, cfg->keepalive, cfg->bind_address);
#endif
if(rc>0){
if(!cfg->quiet){
@@ -893,6 +1140,10 @@ static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url)
}
len = i-start;
host = malloc(len + 1);
+ if(!host){
+ fprintf(stderr, "Error: Out of memory.\n");
+ goto cleanup;
+ }
memcpy(host, &(str[start]), len);
host[len] = '\0';
start = i+1;
@@ -902,6 +1153,10 @@ static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url)
* socks5h://username:password@host[:port] */
len = i-start;
username_or_host = malloc(len + 1);
+ if(!username_or_host){
+ fprintf(stderr, "Error: Out of memory.\n");
+ goto cleanup;
+ }
memcpy(username_or_host, &(str[start]), len);
username_or_host[len] = '\0';
start = i+1;
@@ -918,6 +1173,10 @@ static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url)
len = i-start;
password = malloc(len + 1);
+ if(!password){
+ fprintf(stderr, "Error: Out of memory.\n");
+ goto cleanup;
+ }
memcpy(password, &(str[start]), len);
password[len] = '\0';
start = i+1;
@@ -930,6 +1189,10 @@ static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url)
}
len = i-start;
username = malloc(len + 1);
+ if(!username){
+ fprintf(stderr, "Error: Out of memory.\n");
+ goto cleanup;
+ }
memcpy(username, &(str[start]), len);
username[len] = '\0';
start = i+1;
@@ -944,6 +1207,10 @@ static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url)
/* Have already seen a @ , so this must be of form
* socks5h://username[:password]@host:port */
port = malloc(len + 1);
+ if(!port){
+ fprintf(stderr, "Error: Out of memory.\n");
+ goto cleanup;
+ }
memcpy(port, &(str[start]), len);
port[len] = '\0';
}else if(username_or_host){
@@ -952,6 +1219,10 @@ static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url)
host = username_or_host;
username_or_host = NULL;
port = malloc(len + 1);
+ if(!port){
+ fprintf(stderr, "Error: Out of memory.\n");
+ goto cleanup;
+ }
memcpy(port, &(str[start]), len);
port[len] = '\0';
}else{
diff --git a/client_shared.h b/client_shared.h
index ebf2607..1d22cec 100644
--- a/client_shared.h
+++ b/client_shared.h
@@ -16,8 +16,8 @@ Contributors:
V Krishn - implement dirpub.
*/
-#ifndef _CLIENT_CONFIG_H
-#define _CLIENT_CONFIG_H
+#ifndef CLIENT_CONFIG_H
+#define CLIENT_CONFIG_H
#include <stdio.h>
@@ -73,15 +73,20 @@ struct mosq_config {
char *psk_identity;
# endif
#endif
- bool clean_session; /* sub */
+ bool clean_session;
char **topics; /* sub */
int topic_count; /* sub */
bool no_retain; /* sub */
+ bool retained_only; /* sub */
char **filter_outs; /* sub */
int filter_out_count; /* sub */
+ char **unsub_topics; /* sub */
+ int unsub_topic_count; /* sub */
bool verbose; /* sub */
bool eol; /* sub */
int msg_count; /* sub */
+ char *format; /* sub */
+ int timeout; /* sub */
#ifdef WITH_SOCKS
char *socks5_host;
int socks5_port;
diff --git a/sub_client.c b/sub_client.c
index 1661658..2d9ce5b 100644
--- a/sub_client.c
+++ b/sub_client.c
@@ -16,14 +16,18 @@ Contributors:
V Krishn - implement dirpub.
*/
+#define _POSIX_C_SOURCE 200809L
+
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <time.h>
#include <libgen.h>
#ifndef WIN32
#include <unistd.h>
+#include <signal.h>
#else
#include <sysstat.h>
#include <process.h>
@@ -32,13 +36,26 @@ Contributors:
#endif
#include <sys/stat.h>
#include <sys/types.h>
-#include <time.h>
#include <mosquitto.h>
#include "client_shared.h"
bool process_messages = true;
int msg_count = 0;
+struct mosquitto *mosq = NULL;
+
+#ifndef WIN32
+void my_signal_handler(int signum)
+{
+ if(signum == SIGALRM){
+ process_messages = false;
+ mosquitto_disconnect(mosq);
+ }
+}
+#endif
+
+void print_message(struct mosq_config *cfg, const struct mosquitto_message *message);
+
/*
@(#)Purpose: Create all directories in path
@(#)Author: J Leffler
@@ -304,6 +321,12 @@ void my_message_file_callback(struct mosquitto *mosq, void *obj, const struct mo
assert(obj);
cfg = (struct mosq_config *)obj;
+ if(cfg->retained_only && !message->retain && process_messages){
+ process_messages = false;
+ mosquitto_disconnect(mosq);
+ return;
+ }
+
if(message->retain && cfg->no_retain) return;
if(cfg->filter_outs){
for(i=0; i<cfg->filter_out_count; i++){
@@ -393,6 +416,12 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit
assert(obj);
cfg = (struct mosq_config *)obj;
+ if(cfg->retained_only && !message->retain && process_messages){
+ process_messages = false;
+ mosquitto_disconnect(mosq);
+ return;
+ }
+
if(message->retain && cfg->no_retain) return;
if(cfg->filter_outs){
for(i=0; i<cfg->filter_out_count; i++){
@@ -401,28 +430,8 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit
}
}
- if(cfg->verbose){
- if(message->payloadlen){
- printf("%s ", message->topic);
- fwrite(message->payload, 1, message->payloadlen, stdout);
- if(cfg->eol){
- printf("\n");
- }
- }else{
- if(cfg->eol){
- printf("%s (null)\n", message->topic);
- }
- }
- fflush(stdout);
- }else{
- if(message->payloadlen){
- fwrite(message->payload, 1, message->payloadlen, stdout);
- if(cfg->eol){
- printf("\n");
- }
- fflush(stdout);
- }
- }
+ print_message(cfg, message);
+
if(cfg->msg_count>0){
msg_count++;
if(cfg->msg_count == msg_count){
@@ -432,7 +441,7 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit
}
}
-void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
+void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags)
{
int i;
struct mosq_config *cfg;
@@ -444,10 +453,14 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
for(i=0; i<cfg->topic_count; i++){
mosquitto_subscribe(mosq, NULL, cfg->topics[i], cfg->qos);
}
+ for(i=0; i<cfg->unsub_topic_count; i++){
+ mosquitto_unsubscribe(mosq, NULL, cfg->unsub_topics[i]);
+ }
}else{
if(result && !cfg->quiet){
fprintf(stderr, "%s\n", mosquitto_connack_string(result));
}
+ mosquitto_disconnect(mosq);
}
}
@@ -476,10 +489,15 @@ void print_usage(void)
int major, minor, revision;
mosquitto_lib_version(&major, &minor, &revision);
- printf("mosquitto_sub is a simple mqtt client that will subscribe to a single topic and print all messages it receives.\n");
+ printf("mosquitto_sub is a simple mqtt client that will subscribe to a set of topics and print all messages it receives.\n");
printf("mosquitto_sub version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision);
- printf("Usage: mosquitto_sub [-c] [-h host] [-k keepalive] [-p port] [-q qos] [-R] -t topic ...\n");
- printf(" [-C msg_count] [-T filter_out]\n");
+ printf("Usage: mosquitto_sub {[-h host] [-p port] [-u username [-P password]] -t topic | -L URL [-t topic]}\n");
+ printf(" [-c] [-k keepalive] [-q qos]\n");
+ printf(" [-C msg_count] [-R] [--retained-only] [-T filter_out] [-U topic ...]\n");
+ printf(" [-F format]\n");
+#ifndef WIN32
+ printf(" [-W timeout_secs]\n");
+#endif
#ifdef WITH_SRV
printf(" [-A bind_address] [-S]\n");
#else
@@ -487,7 +505,6 @@ void print_usage(void)
#endif
printf(" [-i id] [-I id_prefix]\n");
printf(" [-d] [-N] [--quiet] [-v]\n");
- printf(" [-u username [-P password]]\n");
printf(" [--fmask outfile [--overwrite]]\n");
printf(" [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]\n");
#ifdef WITH_TLS
@@ -506,14 +523,17 @@ void print_usage(void)
printf(" -c : disable 'clean session' (store subscription and pending messages when client disconnects).\n");
printf(" -C : disconnect and exit after receiving the 'msg_count' messages.\n");
printf(" -d : enable debug messages.\n");
+ printf(" -F : output format.\n");
printf(" -h : mqtt host to connect to. Defaults to localhost.\n");
printf(" -i : id to use for this client. Defaults to mosquitto_sub_ appended with the process id.\n");
printf(" -I : define the client id as id_prefix appended with the process id. Useful for when the\n");
printf(" broker is using the clientid_prefixes option.\n");
printf(" -k : keep alive in seconds for this client. Defaults to 60.\n");
+ printf(" -L : specify user, password, hostname, port and topic as a URL in the form:\n");
+ printf(" mqtt(s)://[username[:password]@]host[:port]/topic\n");
printf(" -N : do not add an end of line character when printing the payload.\n");
- printf(" -p : network port to connect to. Defaults to 1883.\n");
- printf(" -P : provide a password (requires MQTT 3.1 broker)\n");
+ printf(" -p : network port to connect to. Defaults to 1883 for plain MQTT and 8883 for MQTT over TLS.\n");
+ printf(" -P : provide a password\n");
printf(" -q : quality of service level to use for the subscription. Defaults to 0.\n");
printf(" -R : do not print stale messages (those with retain set).\n");
#ifdef WITH_SRV
@@ -521,12 +541,18 @@ void print_usage(void)
#endif
printf(" -t : mqtt topic to subscribe to. May be repeated multiple times.\n");
printf(" -T : topic string to filter out of results. May be repeated.\n");
- printf(" -u : provide a username (requires MQTT 3.1 broker)\n");
+ printf(" -u : provide a username\n");
+ printf(" -U : unsubscribe from a topic. May be repeated.\n");
printf(" -v : print published messages verbosely.\n");
printf(" -V : specify the version of the MQTT protocol to use when connecting.\n");
- printf(" Can be mqttv31 or mqttv311. Defaults to mqttv31.\n");
+ printf(" Can be mqttv31 or mqttv311. Defaults to mqttv311.\n");
+#ifndef WIN32
+ printf(" -W : Specifies a timeout in seconds how long to process incoming MQTT messages.\n");
+#endif
printf(" --help : display this message.\n");
printf(" --quiet : don't print error messages.\n");
+ printf(" --retained-only : only handle messages with the retained flag set, and exit when the\n");
+ printf(" first non-retained message is received.\n");
printf(" --fmask : path to message outfile\n");
printf(" allowed masks are:\n");
printf(" @[epoch|date|year|month|day|datetime|hour|min|sec|id|topic[1-9]] \n");
@@ -569,9 +595,13 @@ void print_usage(void)
int main(int argc, char *argv[])
{
struct mosq_config cfg;
- struct mosquitto *mosq = NULL;
int rc;
+#ifndef WIN32
+ struct sigaction sigact;
+#endif
+ memset(&cfg, 0, sizeof(struct mosq_config));
+
rc = client_config_load(&cfg, CLIENT_SUB, argc, argv);
if(rc){
client_config_cleanup(&cfg);
@@ -584,6 +614,11 @@ int main(int argc, char *argv[])
return 1;
}
+ if(cfg.no_retain && cfg.retained_only){
+ fprintf(stderr, "\nError: Combining '-R' and '--retained-only' makes no sense.\n");
+ return 1;
+ }
+
mosquitto_lib_init();
if(client_id_generate(&cfg, "mosqsub")){
@@ -611,7 +646,8 @@ int main(int argc, char *argv[])
mosquitto_log_callback_set(mosq, my_log_callback);
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
}
- mosquitto_connect_callback_set(mosq, my_connect_callback);
+ mosquitto_connect_with_flags_callback_set(mosq, my_connect_callback);
+
if(cfg.isfmask) {
mosquitto_message_callback_set(mosq, my_message_file_callback);
} else {
@@ -621,6 +657,20 @@ int main(int argc, char *argv[])
rc = client_connect(mosq, &cfg);
if(rc) return rc;
+#ifndef WIN32
+ sigact.sa_handler = my_signal_handler;
+ sigemptyset(&sigact.sa_mask);
+ sigact.sa_flags = 0;
+
+ if(sigaction(SIGALRM, &sigact, NULL) == -1){
+ perror("sigaction");
+ return 1;
+ }
+
+ if(cfg.timeout){
+ alarm(cfg.timeout);
+ }
+#endif
rc = mosquitto_loop_forever(mosq, -1, 1);