diff options
authorV.Krishn <vkrishn4@gmail.com>2015-03-12 20:39:32 +0530
committerV.Krishn <vkrishn4@gmail.com>2015-03-12 20:39:32 +0530
commit2e75f767d64a72b00db5f1080e520f2b0737007e (patch)
parented3d771aa17bc9f1f4a861ca4e42b8478db373a2 (diff)
Update from mosquitto v1.4.0
Upstream files refactored. Not fully tested, but kinda works for now. On heavy traffic it may segfault.
3 files changed, 1219 insertions, 553 deletions
diff --git a/client_shared.c b/client_shared.c
new file mode 100644
index 0000000..945b2fb
--- /dev/null
+++ b/client_shared.c
@@ -0,0 +1,976 @@
+Copyright (c) 2014 Roger Light <roger@atchoo.org>
+Copyright (c) 2015 V.Krishn <vkrishn@insteps.net>
+All rights reserved. This program and the accompanying materials
+are made available under the terms of the Eclipse Public License v1.0
+and Eclipse Distribution License v1.0 which accompany this distribution.
+The Eclipse Public License is available at
+ http://www.eclipse.org/legal/epl-v10.html
+and the Eclipse Distribution License is available at
+ http://www.eclipse.org/org/documents/edl-v10.php.
+ Roger Light - initial implementation and documentation.
+ V Krishn - implement dirpub.
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#ifndef WIN32
+#include <unistd.h>
+#include <process.h>
+#include <winsock2.h>
+#define snprintf sprintf_s
+#include <mosquitto.h>
+#include "client_shared.h"
+static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url);
+static int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[]);
+void init_config(struct mosq_config *cfg)
+ memset(cfg, 0, sizeof(*cfg));
+ cfg->port = 1883;
+ cfg->max_inflight = 20;
+ cfg->keepalive = 60;
+ cfg->clean_session = true;
+ cfg->eol = true;
+ cfg->protocol_version = MQTT_PROTOCOL_V31;
+ cfg->isfmask = false;
+ cfg->overwrite = false;
+void client_config_cleanup(struct mosq_config *cfg)
+ int i;
+ if(cfg->id) free(cfg->id);
+ if(cfg->id_prefix) free(cfg->id_prefix);
+ if(cfg->host) free(cfg->host);
+ if(cfg->file_input) free(cfg->file_input);
+ if(cfg->message) free(cfg->message);
+ if(cfg->topic) free(cfg->topic);
+ if(cfg->bind_address) free(cfg->bind_address);
+ if(cfg->username) free(cfg->username);
+ if(cfg->password) free(cfg->password);
+ if(cfg->will_topic) free(cfg->will_topic);
+ if(cfg->will_payload) free(cfg->will_payload);
+#ifdef WITH_TLS
+ if(cfg->cafile) free(cfg->cafile);
+ if(cfg->capath) free(cfg->capath);
+ if(cfg->certfile) free(cfg->certfile);
+ if(cfg->keyfile) free(cfg->keyfile);
+ if(cfg->ciphers) free(cfg->ciphers);
+ if(cfg->tls_version) free(cfg->tls_version);
+# ifdef WITH_TLS_PSK
+ if(cfg->psk) free(cfg->psk);
+ if(cfg->psk_identity) free(cfg->psk_identity);
+# endif
+ if(cfg->topics){
+ for(i=0; i<cfg->topic_count; i++){
+ free(cfg->topics[i]);
+ }
+ free(cfg->topics);
+ }
+ if(cfg->filter_outs){
+ for(i=0; i<cfg->filter_out_count; i++){
+ free(cfg->filter_outs[i]);
+ }
+ free(cfg->filter_outs);
+ }
+#ifdef WITH_SOCKS
+ if(cfg->socks5_host) free(cfg->socks5_host);
+ if(cfg->socks5_username) free(cfg->socks5_username);
+ if(cfg->socks5_password) free(cfg->socks5_password);
+int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[])
+ int rc;
+ FILE *fptr;
+ char line[1024];
+ int count;
+ char *loc = NULL;
+ int len;
+ char *args[3];
+#ifndef WIN32
+ char *env;
+ char env[1024];
+ args[0] = NULL;
+ init_config(cfg);
+ /* Default config file */
+#ifndef WIN32
+ env = getenv("XDG_CONFIG_HOME");
+ if(env){
+ len = strlen(env) + strlen("/mosquitto_pub") + 1;
+ loc = malloc(len);
+ if(pub_or_sub == CLIENT_PUB){
+ snprintf(loc, len, "%s/mosquitto_pub", env);
+ }else{
+ snprintf(loc, len, "%s/mosquitto_sub", env);
+ }
+ loc[len-1] = '\0';
+ }else{
+ env = getenv("HOME");
+ if(env){
+ len = strlen(env) + strlen("/.config/mosquitto_pub") + 1;
+ loc = malloc(len);
+ if(pub_or_sub == CLIENT_PUB){
+ snprintf(loc, len, "%s/.config/mosquitto_pub", env);
+ }else{
+ snprintf(loc, len, "%s/.config/mosquitto_sub", env);
+ }
+ loc[len-1] = '\0';
+ }else{
+ fprintf(stderr, "Warning: Unable to locate configuration directory, default config not loaded.\n");
+ }
+ }
+ rc = GetEnvironmentVariable("USERPROFILE", env, 1024);
+ if(rc > 0 && rc < 1024){
+ len = strlen(env) + strlen("\\mosquitto_pub.conf") + 1;
+ loc = malloc(len);
+ if(pub_or_sub == CLIENT_PUB){
+ snprintf(loc, len, "%s\\mosquitto_pub.conf", env);
+ }else{
+ snprintf(loc, len, "%s\\mosquitto_sub.conf", env);
+ }
+ loc[len-1] = '\0';
+ }else{
+ fprintf(stderr, "Warning: Unable to locate configuration directory, default config not loaded.\n");
+ }
+ if(loc){
+ fptr = fopen(loc, "rt");
+ if(fptr){
+ while(fgets(line, 1024, fptr)){
+ if(line[0] == '#') continue; /* Comments */
+ while(line[strlen(line)-1] == 10 || line[strlen(line)-1] == 13){
+ line[strlen(line)-1] = 0;
+ }
+ /* All offset by one "args" here, because real argc/argv has
+ * program name as the first entry. */
+ args[1] = strtok(line, " ");
+ if(args[1]){
+ args[2] = strtok(NULL, " ");
+ if(args[2]){
+ count = 3;
+ }else{
+ count = 2;
+ }
+ rc = client_config_line_proc(cfg, pub_or_sub, count, args);
+ if(rc){
+ fclose(fptr);
+ free(loc);
+ return rc;
+ }
+ }
+ }
+ fclose(fptr);
+ }
+ free(loc);
+ }
+ /* Deal with real argc/argv */
+ rc = client_config_line_proc(cfg, pub_or_sub, argc, argv);
+ if(rc) return rc;
+ if(cfg->will_payload && !cfg->will_topic){
+ fprintf(stderr, "Error: Will payload given, but no will topic given.\n");
+ return 1;
+ }
+ if(cfg->will_retain && !cfg->will_topic){
+ fprintf(stderr, "Error: Will retain given, but no will topic given.\n");
+ return 1;
+ }
+ if(cfg->password && !cfg->username){
+ if(!cfg->quiet) fprintf(stderr, "Warning: Not using password since username not set.\n");
+ }
+#ifdef WITH_TLS
+ if((cfg->certfile && !cfg->keyfile) || (cfg->keyfile && !cfg->certfile)){
+ fprintf(stderr, "Error: Both certfile and keyfile must be provided if one of them is.\n");
+ return 1;
+ }
+#ifdef WITH_TLS_PSK
+ if((cfg->cafile || cfg->capath) && cfg->psk){
+ if(!cfg->quiet) fprintf(stderr, "Error: Only one of --psk or --cafile/--capath may be used at once.\n");
+ return 1;
+ }
+ if(cfg->psk && !cfg->psk_identity){
+ if(!cfg->quiet) fprintf(stderr, "Error: --psk-identity required if --psk used.\n");
+ return 1;
+ }
+ if(pub_or_sub == CLIENT_SUB){
+ if(cfg->clean_session == false && (cfg->id_prefix || !cfg->id)){
+ if(!cfg->quiet) fprintf(stderr, "Error: You must provide a client id if you are using the -c option.\n");
+ return 1;
+ }
+ if(cfg->topic_count == 0){
+ if(!cfg->quiet) fprintf(stderr, "Error: You must specify a topic to subscribe to.\n");
+ return 1;
+ }
+ }
+ if(!cfg->host){
+ cfg->host = "localhost";
+ }
+/* Process a tokenised single line from a file or set of real argc/argv */
+int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[])
+ int i;
+ for(i=1; i<argc; i++){
+ if(!strcmp(argv[i], "-p") || !strcmp(argv[i], "--port")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: -p argument given but no port specified.\n\n");
+ return 1;
+ }else{
+ cfg->port = atoi(argv[i+1]);
+ if(cfg->port<1 || cfg->port>65535){
+ fprintf(stderr, "Error: Invalid port given: %d\n", cfg->port);
+ return 1;
+ }
+ }
+ i++;
+ }else if(!strcmp(argv[i], "-A")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: -A argument given but no address specified.\n\n");
+ return 1;
+ }else{
+ cfg->bind_address = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "--fmask")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --fmask argument given but no outfile specified.\n\n");
+ return 1;
+ }else{
+ cfg->fmask = argv[i+1];
+ cfg->isfmask = true;
+ }
+ i++;
+ }else if(!strcmp(argv[i], "--overwrite")){
+ cfg->overwrite = true;
+#ifdef WITH_TLS
+ }else if(!strcmp(argv[i], "--cafile")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --cafile argument given but no file specified.\n\n");
+ return 1;
+ }else{
+ cfg->cafile = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "--capath")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --capath argument given but no directory specified.\n\n");
+ return 1;
+ }else{
+ cfg->capath = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "--cert")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --cert argument given but no file specified.\n\n");
+ return 1;
+ }else{
+ cfg->certfile = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "--ciphers")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --ciphers argument given but no ciphers specified.\n\n");
+ return 1;
+ }else{
+ cfg->ciphers = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "-C")){
+ if(pub_or_sub == CLIENT_PUB){
+ goto unknown_option;
+ }else{
+ if(i==argc-1){
+ fprintf(stderr, "Error: -C argument given but no count specified.\n\n");
+ return 1;
+ }else{
+ cfg->msg_count = atoi(argv[i+1]);
+ if(cfg->msg_count < 1){
+ fprintf(stderr, "Error: Invalid message count \"%d\".\n\n", cfg->msg_count);
+ return 1;
+ }
+ }
+ i++;
+ }
+ }else if(!strcmp(argv[i], "-d") || !strcmp(argv[i], "--debug")){
+ cfg->debug = true;
+ }else if(!strcmp(argv[i], "-f") || !strcmp(argv[i], "--file")){
+ if(pub_or_sub == CLIENT_SUB){
+ goto unknown_option;
+ }
+ if(cfg->pub_mode != MSGMODE_NONE){
+ fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n");
+ return 1;
+ }else if(i==argc-1){
+ fprintf(stderr, "Error: -f argument given but no file specified.\n\n");
+ return 1;
+ }else{
+ cfg->pub_mode = MSGMODE_FILE;
+ cfg->file_input = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "--help")){
+ return 2;
+ }else if(!strcmp(argv[i], "-h") || !strcmp(argv[i], "--host")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: -h argument given but no host specified.\n\n");
+ return 1;
+ }else{
+ cfg->host = strdup(argv[i+1]);
+ }
+ i++;
+#ifdef WITH_TLS
+ }else if(!strcmp(argv[i], "--insecure")){
+ cfg->insecure = true;
+ }else if(!strcmp(argv[i], "-i") || !strcmp(argv[i], "--id")){
+ if(cfg->id_prefix){
+ fprintf(stderr, "Error: -i and -I argument cannot be used together.\n\n");
+ return 1;
+ }
+ if(i==argc-1){
+ fprintf(stderr, "Error: -i argument given but no id specified.\n\n");
+ return 1;
+ }else{
+ cfg->id = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "-I") || !strcmp(argv[i], "--id-prefix")){
+ if(cfg->id){
+ fprintf(stderr, "Error: -i and -I argument cannot be used together.\n\n");
+ return 1;
+ }
+ if(i==argc-1){
+ fprintf(stderr, "Error: -I argument given but no id prefix specified.\n\n");
+ return 1;
+ }else{
+ cfg->id_prefix = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "-k") || !strcmp(argv[i], "--keepalive")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: -k argument given but no keepalive specified.\n\n");
+ return 1;
+ }else{
+ cfg->keepalive = atoi(argv[i+1]);
+ if(cfg->keepalive>65535){
+ fprintf(stderr, "Error: Invalid keepalive given: %d\n", cfg->keepalive);
+ return 1;
+ }
+ }
+ i++;
+#ifdef WITH_TLS
+ }else if(!strcmp(argv[i], "--key")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --key argument given but no file specified.\n\n");
+ return 1;
+ }else{
+ cfg->keyfile = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "-l") || !strcmp(argv[i], "--stdin-line")){
+ if(pub_or_sub == CLIENT_SUB){
+ goto unknown_option;
+ }
+ if(cfg->pub_mode != MSGMODE_NONE){
+ fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n");
+ return 1;
+ }else{
+ cfg->pub_mode = MSGMODE_STDIN_LINE;
+ }
+ }else if(!strcmp(argv[i], "-m") || !strcmp(argv[i], "--message")){
+ if(pub_or_sub == CLIENT_SUB){
+ goto unknown_option;
+ }
+ if(cfg->pub_mode != MSGMODE_NONE){
+ fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n");
+ return 1;
+ }else if(i==argc-1){
+ fprintf(stderr, "Error: -m argument given but no message specified.\n\n");
+ return 1;
+ }else{
+ cfg->message = strdup(argv[i+1]);
+ cfg->msglen = strlen(cfg->message);
+ cfg->pub_mode = MSGMODE_CMD;
+ }
+ i++;
+ }else if(!strcmp(argv[i], "-M")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: -M argument given but max_inflight not specified.\n\n");
+ return 1;
+ }else{
+ cfg->max_inflight = atoi(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "-n") || !strcmp(argv[i], "--null-message")){
+ if(pub_or_sub == CLIENT_SUB){
+ goto unknown_option;
+ }
+ if(cfg->pub_mode != MSGMODE_NONE){
+ fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n");
+ return 1;
+ }else{
+ cfg->pub_mode = MSGMODE_NULL;
+ }
+ }else if(!strcmp(argv[i], "-V") || !strcmp(argv[i], "--protocol-version")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --protocol-version argument given but no version specified.\n\n");
+ return 1;
+ }else{
+ if(!strcmp(argv[i+1], "mqttv31")){
+ cfg->protocol_version = MQTT_PROTOCOL_V31;
+ }else if(!strcmp(argv[i+1], "mqttv311")){
+ cfg->protocol_version = MQTT_PROTOCOL_V311;
+ }else{
+ fprintf(stderr, "Error: Invalid protocol version argument given.\n\n");
+ return 1;
+ }
+ i++;
+ }
+#ifdef WITH_SOCKS
+ }else if(!strcmp(argv[i], "--proxy")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --proxy argument given but no proxy url specified.\n\n");
+ return 1;
+ }else{
+ if(mosquitto__parse_socks_url(cfg, argv[i+1])){
+ return 1;
+ }
+ i++;
+ }
+#ifdef WITH_TLS_PSK
+ }else if(!strcmp(argv[i], "--psk")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --psk argument given but no key specified.\n\n");
+ return 1;
+ }else{
+ cfg->psk = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "--psk-identity")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --psk-identity argument given but no identity specified.\n\n");
+ return 1;
+ }else{
+ cfg->psk_identity = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "-q") || !strcmp(argv[i], "--qos")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: -q argument given but no QoS specified.\n\n");
+ return 1;
+ }else{
+ cfg->qos = atoi(argv[i+1]);
+ if(cfg->qos<0 || cfg->qos>2){
+ fprintf(stderr, "Error: Invalid QoS given: %d\n", cfg->qos);
+ return 1;
+ }
+ }
+ i++;
+ }else if(!strcmp(argv[i], "--quiet")){
+ cfg->quiet = true;
+ }else if(!strcmp(argv[i], "-r") || !strcmp(argv[i], "--retain")){
+ if(pub_or_sub == CLIENT_SUB){
+ goto unknown_option;
+ }
+ cfg->retain = 1;
+ }else if(!strcmp(argv[i], "-s") || !strcmp(argv[i], "--stdin-file")){
+ if(pub_or_sub == CLIENT_SUB){
+ goto unknown_option;
+ }
+ if(cfg->pub_mode != MSGMODE_NONE){
+ fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n");
+ return 1;
+ }else{
+ cfg->pub_mode = MSGMODE_STDIN_FILE;
+ }
+#ifdef WITH_SRV
+ }else if(!strcmp(argv[i], "-S")){
+ cfg->use_srv = true;
+ }else if(!strcmp(argv[i], "-t") || !strcmp(argv[i], "--topic")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: -t argument given but no topic specified.\n\n");
+ return 1;
+ }else{
+ if(pub_or_sub == CLIENT_PUB){
+ if(mosquitto_pub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){
+ fprintf(stderr, "Error: Invalid publish topic '%s', does it contain '+' or '#'?\n", argv[i+1]);
+ return 1;
+ }
+ cfg->topic = strdup(argv[i+1]);
+ }else{
+ if(mosquitto_sub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){
+ fprintf(stderr, "Error: Invalid subscription topic '%s', are all '+' and '#' wildcards correct?\n", argv[i+1]);
+ return 1;
+ }
+ cfg->topic_count++;
+ cfg->topics = realloc(cfg->topics, cfg->topic_count*sizeof(char *));
+ cfg->topics[cfg->topic_count-1] = strdup(argv[i+1]);
+ }
+ i++;
+ }
+ }else if(!strcmp(argv[i], "-T") || !strcmp(argv[i], "--filter-out")){
+ if(pub_or_sub == CLIENT_PUB){
+ goto unknown_option;
+ }
+ if(i==argc-1){
+ fprintf(stderr, "Error: -T argument given but no topic filter specified.\n\n");
+ return 1;
+ }else{
+ if(mosquitto_sub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){
+ fprintf(stderr, "Error: Invalid filter topic '%s', are all '+' and '#' wildcards correct?\n", argv[i+1]);
+ return 1;
+ }
+ cfg->filter_out_count++;
+ cfg->filter_outs = realloc(cfg->filter_outs, cfg->filter_out_count*sizeof(char *));
+ cfg->filter_outs[cfg->filter_out_count-1] = strdup(argv[i+1]);
+ }
+ i++;
+#ifdef WITH_TLS
+ }else if(!strcmp(argv[i], "--tls-version")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --tls-version argument given but no version specified.\n\n");
+ return 1;
+ }else{
+ cfg->tls_version = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "-u") || !strcmp(argv[i], "--username")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: -u argument given but no username specified.\n\n");
+ return 1;
+ }else{
+ cfg->username = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "-P") || !strcmp(argv[i], "--pw")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: -P argument given but no password specified.\n\n");
+ return 1;
+ }else{
+ cfg->password = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "--will-payload")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --will-payload argument given but no will payload specified.\n\n");
+ return 1;
+ }else{
+ cfg->will_payload = strdup(argv[i+1]);
+ cfg->will_payloadlen = strlen(cfg->will_payload);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "--will-qos")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --will-qos argument given but no will QoS specified.\n\n");
+ return 1;
+ }else{
+ cfg->will_qos = atoi(argv[i+1]);
+ if(cfg->will_qos < 0 || cfg->will_qos > 2){
+ fprintf(stderr, "Error: Invalid will QoS %d.\n\n", cfg->will_qos);
+ return 1;
+ }
+ }
+ i++;
+ }else if(!strcmp(argv[i], "--will-retain")){
+ cfg->will_retain = true;
+ }else if(!strcmp(argv[i], "--will-topic")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --will-topic argument given but no will topic specified.\n\n");
+ return 1;
+ }else{
+ if(mosquitto_pub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){
+ fprintf(stderr, "Error: Invalid will topic '%s', does it contain '+' or '#'?\n", argv[i+1]);
+ return 1;
+ }
+ cfg->will_topic = strdup(argv[i+1]);
+ }
+ i++;
+ }else if(!strcmp(argv[i], "-c") || !strcmp(argv[i], "--disable-clean-session")){
+ if(pub_or_sub == CLIENT_PUB){
+ goto unknown_option;
+ }
+ cfg->clean_session = false;
+ }else if(!strcmp(argv[i], "-N")){
+ if(pub_or_sub == CLIENT_PUB){
+ goto unknown_option;
+ }
+ cfg->eol = false;
+ }else if(!strcmp(argv[i], "-R")){
+ if(pub_or_sub == CLIENT_PUB){
+ goto unknown_option;
+ }
+ cfg->no_retain = true;
+ }else if(!strcmp(argv[i], "-v") || !strcmp(argv[i], "--verbose")){
+ if(pub_or_sub == CLIENT_PUB){
+ goto unknown_option;
+ }
+ cfg->verbose = 1;
+ }else{
+ goto unknown_option;
+ }
+ }
+ fprintf(stderr, "Error: Unknown option '%s'.\n",argv[i]);
+ return 1;
+int client_opts_set(struct mosquitto *mosq, struct mosq_config *cfg)
+ int rc;
+ if(cfg->will_topic && mosquitto_will_set(mosq, cfg->will_topic,
+ cfg->will_payloadlen, cfg->will_payload, cfg->will_qos,
+ cfg->will_retain)){
+ if(!cfg->quiet) fprintf(stderr, "Error: Problem setting will.\n");
+ mosquitto_lib_cleanup();
+ return 1;
+ }
+ if(cfg->username && mosquitto_username_pw_set(mosq, cfg->username, cfg->password)){
+ if(!cfg->quiet) fprintf(stderr, "Error: Problem setting username and password.\n");
+ mosquitto_lib_cleanup();
+ return 1;
+ }
+#ifdef WITH_TLS
+ if((cfg->cafile || cfg->capath)
+ && mosquitto_tls_set(mosq, cfg->cafile, cfg->capath, cfg->certfile, cfg->keyfile, NULL)){
+ if(!cfg->quiet) fprintf(stderr, "Error: Problem setting TLS options.\n");
+ mosquitto_lib_cleanup();
+ return 1;
+ }
+ if(cfg->insecure && mosquitto_tls_insecure_set(mosq, true)){
+ if(!cfg->quiet) fprintf(stderr, "Error: Problem setting TLS insecure option.\n");
+ mosquitto_lib_cleanup();
+ return 1;
+ }
+# ifdef WITH_TLS_PSK
+ if(cfg->psk && mosquitto_tls_psk_set(mosq, cfg->psk, cfg->psk_identity, NULL)){
+ if(!cfg->quiet) fprintf(stderr, "Error: Problem setting TLS-PSK options.\n");
+ mosquitto_lib_cleanup();
+ return 1;
+ }
+# endif
+ if(cfg->tls_version && mosquitto_tls_opts_set(mosq, 1, cfg->tls_version, cfg->ciphers)){
+ if(!cfg->quiet) fprintf(stderr, "Error: Problem setting TLS options.\n");
+ mosquitto_lib_cleanup();
+ return 1;
+ }
+ mosquitto_max_inflight_messages_set(mosq, cfg->max_inflight);
+#ifdef WITH_SOCKS
+ if(cfg->socks5_host){
+ rc = mosquitto_socks5_set(mosq, cfg->socks5_host, cfg->socks5_port, cfg->socks5_username, cfg->socks5_password);
+ if(rc){
+ mosquitto_lib_cleanup();
+ return rc;
+ }
+ }
+ mosquitto_opts_set(mosq, MOSQ_OPT_PROTOCOL_VERSION, &(cfg->protocol_version));
+int client_id_generate(struct mosq_config *cfg, const char *id_base)
+ int len;
+ char hostname[256];
+ if(cfg->id_prefix){
+ cfg->id = malloc(strlen(cfg->id_prefix)+10);
+ if(!cfg->id){
+ if(!cfg->quiet) fprintf(stderr, "Error: Out of memory.\n");
+ mosquitto_lib_cleanup();
+ return 1;
+ }
+ snprintf(cfg->id, strlen(cfg->id_prefix)+10, "%s%d", cfg->id_prefix, getpid());
+ }else if(!cfg->id){
+ hostname[0] = '\0';
+ gethostname(hostname, 256);
+ hostname[255] = '\0';
+ len = strlen(id_base) + strlen("/-") + 6 + strlen(hostname);
+ cfg->id = malloc(len);
+ if(!cfg->id){
+ if(!cfg->quiet) fprintf(stderr, "Error: Out of memory.\n");
+ mosquitto_lib_cleanup();
+ return 1;
+ }
+ snprintf(cfg->id, len, "%s/%d-%s", id_base, getpid(), hostname);
+ if(strlen(cfg->id) > MOSQ_MQTT_ID_MAX_LENGTH){
+ /* Enforce maximum client id length of 23 characters */
+ cfg->id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
+ }
+ }
+int client_connect(struct mosquitto *mosq, struct mosq_config *cfg)
+ char err[1024];
+ int rc;
+#ifdef WITH_SRV
+ if(cfg->use_srv){
+ rc = mosquitto_connect_srv(mosq, cfg->host, cfg->keepalive, cfg->bind_address);
+ }else{
+ rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);
+ }
+ rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);
+ if(rc){
+ if(!cfg->quiet){
+ if(rc == MOSQ_ERR_ERRNO){
+#ifndef WIN32
+ strerror_r(errno, err, 1024);
+ FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errno, 0, (LPTSTR)&err, 1024, NULL);
+ fprintf(stderr, "Error: %s\n", err);
+ }else{
+ fprintf(stderr, "Unable to connect (%d).\n", rc);
+ }
+ }
+ mosquitto_lib_cleanup();
+ return rc;
+ }
+#ifdef WITH_SOCKS
+/* Convert %25 -> %, %3a, %3A -> :, %40 -> @ */
+static int mosquitto__urldecode(char *str)
+ int i, j;
+ int len;
+ if(!str) return 0;
+ if(!strchr(str, '%')) return 0;
+ len = strlen(str);
+ for(i=0; i<len; i++){
+ if(str[i] == '%'){
+ if(i+2 >= len){
+ return 1;
+ }
+ if(str[i+1] == '2' && str[i+2] == '5'){
+ str[i] = '%';
+ len -= 2;
+ for(j=i+1; j<len; j++){
+ str[j] = str[j+2];
+ }
+ str[j] = '\0';
+ }else if(str[i+1] == '3' && (str[i+2] == 'A' || str[i+2] == 'a')){
+ str[i] = ':';
+ len -= 2;
+ for(j=i+1; j<len; j++){
+ str[j] = str[j+2];
+ }
+ str[j] = '\0';
+ }else if(str[i+1] == '4' && str[i+2] == '0'){
+ str[i] = ':';
+ len -= 2;
+ for(j=i+1; j<len; j++){
+ str[j] = str[j+2];
+ }
+ str[j] = '\0';
+ }else{
+ return 1;
+ }
+ }
+ }
+ return 0;
+static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url)
+ char *str;
+ int i;
+ char *username = NULL, *password = NULL, *host = NULL, *port = NULL;
+ char *username_or_host = NULL;
+ int start;
+ int len;
+ bool have_auth = false;
+ int port_int;
+ if(!strncmp(url, "socks5h://", strlen("socks5h://"))){
+ str = url + strlen("socks5h://");
+ }else{
+ fprintf(stderr, "Error: Unsupported proxy protocol: %s\n", url);
+ return 1;
+ }
+ // socks5h://username:password@host:1883
+ // socks5h://username:password@host
+ // socks5h://username@host:1883
+ // socks5h://username@host
+ // socks5h://host:1883
+ // socks5h://host
+ start = 0;
+ for(i=0; i<strlen(str); i++){
+ if(str[i] == ':'){
+ if(i == start){
+ goto cleanup;
+ }
+ if(have_auth){
+ /* Have already seen a @ , so this must be of form
+ * socks5h://username[:password]@host:port */
+ if(host){
+ /* Already seen a host, must be malformed. */
+ goto cleanup;
+ }
+ len = i-start;
+ host = malloc(len + 1);
+ memcpy(host, &(str[start]), len);
+ host[len] = '\0';
+ start = i+1;
+ }else if(!username_or_host){
+ /* Haven't seen a @ before, so must be of form
+ * socks5h://host:port or
+ * socks5h://username:password@host[:port] */
+ len = i-start;
+ username_or_host = malloc(len + 1);
+ memcpy(username_or_host, &(str[start]), len);
+ username_or_host[len] = '\0';
+ start = i+1;
+ }
+ }else if(str[i] == '@'){
+ if(i == start){
+ goto cleanup;
+ }
+ have_auth = true;
+ if(username_or_host){
+ /* Must be of form socks5h://username:password@... */
+ username = username_or_host;
+ username_or_host = NULL;
+ len = i-start;
+ password = malloc(len + 1);
+ memcpy(password, &(str[start]), len);
+ password[len] = '\0';
+ start = i+1;
+ }else{
+ /* Haven't seen a : yet, so must be of form
+ * socks5h://username@... */
+ if(username){
+ /* Already got a username, must be malformed. */
+ goto cleanup;
+ }
+ len = i-start;
+ username = malloc(len + 1);
+ memcpy(username, &(str[start]), len);
+ username[len] = '\0';
+ start = i+1;
+ }
+ }
+ }
+ /* Deal with remainder */
+ if(i > start){
+ len = i-start;
+ if(host){
+ /* Have already seen a @ , so this must be of form
+ * socks5h://username[:password]@host:port */
+ port = malloc(len + 1);
+ memcpy(port, &(str[start]), len);
+ port[len] = '\0';
+ }else if(username_or_host){
+ /* Haven't seen a @ before, so must be of form
+ * socks5h://host:port */
+ host = username_or_host;
+ username_or_host = NULL;
+ port = malloc(len + 1);
+ memcpy(port, &(str[start]), len);
+ port[len] = '\0';
+ }else{
+ host = malloc(len + 1);
+ memcpy(host, &(str[start]), len);
+ host[len] = '\0';
+ }
+ }
+ if(!host){
+ fprintf(stderr, "Error: Invalid proxy.\n");
+ goto cleanup;
+ }
+ if(mosquitto__urldecode(username)){
+ goto cleanup;
+ }
+ if(mosquitto__urldecode(password)){
+ goto cleanup;
+ }
+ if(port){
+ port_int = atoi(port);
+ if(port_int < 1 || port_int > 65535){
+ fprintf(stderr, "Error: Invalid proxy port %d\n", port_int);
+ goto cleanup;
+ }
+ free(port);
+ }else{
+ port_int = 1080;
+ }
+ cfg->socks5_username = username;
+ cfg->socks5_password = password;
+ cfg->socks5_host = host;
+ cfg->socks5_port = port_int;
+ return 0;
+ if(username_or_host) free(username_or_host);
+ if(username) free(username);
+ if(password) free(password);
+ if(host) free(host);
+ if(port) free(port);
+ return 1;
diff --git a/client_shared.h b/client_shared.h
new file mode 100644
index 0000000..3f8b14b
--- /dev/null
+++ b/client_shared.h
@@ -0,0 +1,107 @@
+Copyright (c) 2014 Roger Light <roger@atchoo.org>
+All rights reserved. This program and the accompanying materials
+are made available under the terms of the Eclipse Public License v1.0
+and Eclipse Distribution License v1.0 which accompany this distribution.
+The Eclipse Public License is available at
+ http://www.eclipse.org/legal/epl-v10.html
+and the Eclipse Distribution License is available at
+ http://www.eclipse.org/org/documents/edl-v10.php.
+ Roger Light - initial implementation and documentation.
+#include <stdio.h>
+/* pub_client.c modes */
+#define MSGMODE_NONE 0
+#define MSGMODE_CMD 1
+#define MSGMODE_FILE 4
+#define MSGMODE_NULL 5
+#define CLIENT_PUB 1
+#define CLIENT_SUB 2
+struct mosq_config {
+ char *id;
+ char *id_prefix;
+ int protocol_version;
+ int keepalive;
+ char *host;
+ int port;
+ int qos;
+ bool retain;
+ int pub_mode; /* pub */
+ char *file_input; /* pub */
+ char *message; /* pub */
+ long msglen; /* pub */
+ char *topic; /* pub */
+ char *bind_address;
+#ifdef WITH_SRV
+ bool use_srv;
+ bool debug;
+ bool quiet;
+ unsigned int max_inflight;
+ char *username;
+ char *password;
+ char *will_topic;
+ char *will_payload;
+ long will_payloadlen;
+ int will_qos;
+ bool will_retain;
+#ifdef WITH_TLS
+ char *cafile;
+ char *capath;
+ char *certfile;
+ char *keyfile;
+ char *ciphers;
+ bool insecure;
+ char *tls_version;
+# ifdef WITH_TLS_PSK
+ char *psk;
+ char *psk_identity;
+# endif
+ bool clean_session; /* sub */
+ char **topics; /* sub */
+ int topic_count; /* sub */
+ bool no_retain; /* sub */
+ char **filter_outs; /* sub */
+ int filter_out_count; /* sub */
+ bool verbose; /* sub */
+ bool eol; /* sub */
+ int msg_count; /* sub */
+#ifdef WITH_SOCKS
+ char *socks5_host;
+ int socks5_port;
+ char *socks5_username;
+ char *socks5_password;
+ /* dirpub */
+ bool isfmask;
+ bool overwrite;
+ char *fmask;
+ char ffmask[1000]; /* limit 1000 bytes. */
+ char ftoken[1000]; /* limit 1000 bytes. */
+ char *idtext;
+ char *fmask_topic;
+int client_config_load(struct mosq_config *config, int pub_or_sub, int argc, char *argv[]);
+void client_config_cleanup(struct mosq_config *cfg);
+int client_opts_set(struct mosquitto *mosq, struct mosq_config *cfg);
+int client_id_generate(struct mosq_config *cfg, const char *id_base);
+int client_connect(struct mosquitto *mosq, struct mosq_config *cfg);
diff --git a/sub_client.c b/sub_client.c
index 787ed14..2a44f3a 100644
--- a/sub_client.c
+++ b/sub_client.c
@@ -1,31 +1,18 @@
-Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
-Copyright (c) 2013-2014 V.Krishn <vkrishn@insteps.net>
-All rights reserved.
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-1. Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
-2. Redistributions in binary form must reproduce the above copyright
- notice, this list of conditions and the following disclaimer in the
- documentation and/or other materials provided with the distribution.
-3. Neither the name of mosquitto nor the names of its
- contributors may be used to endorse or promote products derived from
- this software without specific prior written permission.
+Copyright (c) 2009-2014 Roger Light <roger@atchoo.org>
+Copyright (c) 2013-2015 V.Krishn <vkrishn@insteps.net>
+All rights reserved. This program and the accompanying materials
+are made available under the terms of the Eclipse Public License v1.0
+and Eclipse Distribution License v1.0 which accompany this distribution.
+The Eclipse Public License is available at
+ http://www.eclipse.org/legal/epl-v10.html
+and the Eclipse Distribution License is available at
+ http://www.eclipse.org/org/documents/edl-v10.php.
+ Roger Light - initial implementation and documentation.
#include <assert.h>
@@ -47,34 +34,10 @@ POSSIBILITY OF SUCH DAMAGE.
#include <time.h>
#include <mosquitto.h>
+#include "client_shared.h"
-/* This struct is used to pass data to callbacks.
- * An instance "ud" is created in main() and populated, then passed to
- * mosquitto_new(). */
-struct userdata {
- char **topics;
- int topic_count;
- int topic_qos;
- char **filter_outs;
- int filter_out_count;
- char *username;
- char *password;
- int verbose;
- bool quiet;
- bool no_retain;
- bool eol;
- bool isfmask;
- char *fmask;
- char ffmask[1000]; /* limit 1000 bytes. */
- char ftoken[1000]; /* limit 1000 bytes. */
- //char *fmask_resolve;
- char *idtext;
- //char *cwd;
- //char *path;
- char *fmask_topic;
- bool overwrite;
+bool process_messages = true;
+int msg_count = 0;
@(#)Purpose: Create all directories in path
@(#)Author: J Leffler
@@ -125,9 +88,9 @@ int mkpath(const char *path, mode_t mode)
/* ------------------------------------------------------------- */
- * Expand --fmask string options for output filename.
- * DateTime string expansion for --fmask
+/* Expand --fmask string options for output filename.
+ DateTime string expansion for --fmask
/* ------------------------------------------------------------- */
#define FMASK_EPOCH 0
#define FMASK_DATE 1
@@ -155,7 +118,7 @@ const char *datetime(int fmt)
switch(fmt) {
- n = snprintf(dt, size, "%02d", current);
+ n = snprintf(dt, size, "%02d", (int)current);
n = snprintf(dt, size, "%02d-%02d-%02d",
@@ -203,20 +166,19 @@ const char *datetime(int fmt)
/* ------------------------------------------------------------- */
- * Expand/resolve fmask token string.
+/* Expand/resolve fmask token string. */
/* ------------------------------------------------------------- */
void *_setfmask(char *token, void *obj)
- struct userdata *ud;
+ struct mosq_config *cfg;
- ud = (struct userdata *)obj;
+ cfg = (struct mosq_config *)obj;
char *str2, *subtoken;
char *saveptr2;
- char *to = ud->ftoken; /* limit 1000 bytes. */
+ char *to = cfg->ftoken; /* limit 1000 bytes. */
const char *dt;
for (str2 = token; ; str2 = NULL) {
@@ -246,27 +208,27 @@ void *_setfmask(char *token, void *obj)
} else if(!strcmp(subtoken, "sec")) {
dt = datetime(9);
} else if(!strcmp(subtoken, "topic")) {
- dt = ud->fmask_topic;
+ dt = cfg->fmask_topic;
} else if(!strcmp(subtoken, "topic1")) {
- dt = ud->topics[0];
+ dt = cfg->topics[0];
} else if(!strcmp(subtoken, "topic2")) {
- dt = ud->topics[1];
+ dt = cfg->topics[1];
} else if(!strcmp(subtoken, "topic3")) {
- dt = ud->topics[2];
+ dt = cfg->topics[2];
} else if(!strcmp(subtoken, "topic4")) {
- dt = ud->topics[3];
+ dt = cfg->topics[3];
} else if(!strcmp(subtoken, "topic5")) {
- dt = ud->topics[4];
+ dt = cfg->topics[4];
} else if(!strcmp(subtoken, "topic6")) {
- dt = ud->topics[5];
+ dt = cfg->topics[5];
} else if(!strcmp(subtoken, "topic7")) {
- dt = ud->topics[6];
+ dt = cfg->topics[6];
} else if(!strcmp(subtoken, "topic8")) {
- dt = ud->topics[7];
+ dt = cfg->topics[7];
} else if(!strcmp(subtoken, "topic9")) {
- dt = ud->topics[8];
+ dt = cfg->topics[8];
} else if(!strcmp(subtoken, "id")) {
- dt = ud->idtext;
+ dt = cfg->idtext;
} else {
dt = strdup (subtoken);
@@ -276,26 +238,25 @@ void *_setfmask(char *token, void *obj)
to[strlen(to)-1] = '\0';
+ return 0;
- * Expand --fmask string options for output filename.
+/* Expand --fmask string options for output filename. */
/* ------------------------------------------------------------- */
void *_fmask(char *fmask, void *obj)
- struct userdata *ud;
+ struct mosq_config *cfg;
- ud = (struct userdata *)obj;
+ cfg = (struct mosq_config *)obj;
- char *str1, *str2, *token, *subtoken;
- char *saveptr1, *saveptr2;
- int j;
+ char *str1, *token;
+ char *saveptr1;
- char *path, *prog;
+ char *path;
path = strdup (fmask);
- char *to = ud->ffmask; /* limit 1000 bytes. */
+ char *to = cfg->ffmask; /* limit 1000 bytes. */
to = stpcpy (to, "/");
for (str1 = path; ; str1 = NULL) {
@@ -304,18 +265,20 @@ void *_fmask(char *fmask, void *obj)
/* format type */
- _setfmask(token, ud);
- to = stpcpy (to, ud->ftoken);
+ _setfmask(token, cfg);
+ to = stpcpy (to, cfg->ftoken);
to = stpcpy (to, "/");
to[strlen(to)-1] = '\0';
+ return 0;
- * File open with given mode.
- * returns file descriptor (fd)
+ File open with given mode.
+ returns file descriptor (fd)
/* ------------------------------------------------------------- */
FILE *_mosquitto_fopen(const char *path, const char *mode)
@@ -335,38 +298,38 @@ FILE *_mosquitto_fopen(const char *path, const char *mode)
void my_message_file_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
- struct userdata *ud;
+ struct mosq_config *cfg;
int i;
bool res;
- ud = (struct userdata *)obj;
+ cfg = (struct mosq_config *)obj;
- if(message->retain && ud->no_retain) return;
- if(ud->filter_outs){
- for(i=0; i<ud->filter_out_count; i++){
- mosquitto_topic_matches_sub(ud->filter_outs[i], message->topic, &res);
+ if(message->retain && cfg->no_retain) return;
+ if(cfg->filter_outs){
+ for(i=0; i<cfg->filter_out_count; i++){
+ mosquitto_topic_matches_sub(cfg->filter_outs[i], message->topic, &res);
if(res) return;
- ud->fmask_topic = message->topic;
+ cfg->fmask_topic = message->topic;
FILE *fptr = NULL;
- _fmask(ud->fmask, ud);
+ _fmask(cfg->fmask, cfg);
char *path, *prog;
- path = strdup (ud->ffmask);
- prog = strdup (ud->ffmask);
+ path = strdup (cfg->ffmask);
+ prog = strdup (cfg->ffmask);
path = dirname (path);
prog = basename (prog);
- int status = mkpath(path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+ mkpath(path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
- if(ud->overwrite) {
- fptr = _mosquitto_fopen(ud->ffmask, "w");
+ if(cfg->overwrite) {
+ fptr = _mosquitto_fopen(cfg->ffmask, "w");
} else {
- fptr = _mosquitto_fopen(ud->ffmask, "a");
+ fptr = _mosquitto_fopen(cfg->ffmask, "a");
@@ -374,22 +337,22 @@ void my_message_file_callback(struct mosquitto *mosq, void *obj, const struct mo
// need to do normal stdout
//mosquitto_message_callback_set(mosq, "my_message_callback");
} else{
- if(ud->verbose){
+ if(cfg->verbose){
fprintf(fptr, "%s ", message->topic);
fwrite(message->payload, 1, message->payloadlen, fptr);
- if(ud->eol){
+ if(cfg->eol){
fprintf(fptr, "\n");
- if(ud->eol){
+ if(cfg->eol){
fprintf(fptr, "%s (null)\n", message->topic);
fwrite(message->payload, 1, message->payloadlen, fptr);
- if(ud->eol){
+ if(cfg->eol){
fprintf(fptr, "\n");
@@ -400,30 +363,32 @@ void my_message_file_callback(struct mosquitto *mosq, void *obj, const struct mo
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
- struct userdata *ud;
+ struct mosq_config *cfg;
int i;
bool res;
+ if(process_messages == false) return;
- ud = (struct userdata *)obj;
+ cfg = (struct mosq_config *)obj;
- if(message->retain && ud->no_retain) return;
- if(ud->filter_outs){
- for(i=0; i<ud->filter_out_count; i++){
- mosquitto_topic_matches_sub(ud->filter_outs[i], message->topic, &res);
+ if(message->retain && cfg->no_retain) return;
+ if(cfg->filter_outs){
+ for(i=0; i<cfg->filter_out_count; i++){
+ mosquitto_topic_matches_sub(cfg->filter_outs[i], message->topic, &res);
if(res) return;
- if(ud->verbose){
+ if(cfg->verbose){
printf("%s ", message->topic);
fwrite(message->payload, 1, message->payloadlen, stdout);
- if(ud->eol){
+ if(cfg->eol){
- if(ud->eol){
+ if(cfg->eol){
printf("%s (null)\n", message->topic);
@@ -431,28 +396,35 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit
fwrite(message->payload, 1, message->payloadlen, stdout);
- if(ud->eol){
+ if(cfg->eol){
+ if(cfg->msg_count>0){
+ msg_count++;
+ if(cfg->msg_count == msg_count){
+ process_messages = false;
+ mosquitto_disconnect(mosq);
+ }
+ }
void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
int i;
- struct userdata *ud;
+ struct mosq_config *cfg;
- ud = (struct userdata *)obj;
+ cfg = (struct mosq_config *)obj;
- for(i=0; i<ud->topic_count; i++){
- mosquitto_subscribe(mosq, NULL, ud->topics[i], ud->topic_qos);
+ for(i=0; i<cfg->topic_count; i++){
+ mosquitto_subscribe(mosq, NULL, cfg->topics[i], cfg->qos);
- if(result && !ud->quiet){
+ if(result && !cfg->quiet){
fprintf(stderr, "%s\n", mosquitto_connack_string(result));
@@ -461,16 +433,16 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
int i;
- struct userdata *ud;
+ struct mosq_config *cfg;
- ud = (struct userdata *)obj;
+ cfg = (struct mosq_config *)obj;
- if(!ud->quiet) printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
+ if(!cfg->quiet) printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
for(i=1; i<qos_count; i++){
- if(!ud->quiet) printf(", %d", granted_qos[i]);
+ if(!cfg->quiet) printf(", %d", granted_qos[i]);
- if(!ud->quiet) printf("\n");
+ if(!cfg->quiet) printf("\n");
void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str)
@@ -486,7 +458,7 @@ void print_usage(void)
printf("mosquitto_sub is a simple mqtt client that will subscribe to a single topic and print all messages it receives.\n");
printf("mosquitto_sub version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision);
printf("Usage: mosquitto_sub [-c] [-h host] [-k keepalive] [-p port] [-q qos] [-R] -t topic ...\n");
- printf(" [-T filter_out]\n");
+ printf(" [-C msg_count] [-T filter_out]\n");
#ifdef WITH_SRV
printf(" [-A bind_address] [-S]\n");
@@ -504,10 +476,14 @@ void print_usage(void)
printf(" [--psk hex-key --psk-identity identity [--ciphers ciphers]]\n");
+#ifdef WITH_SOCKS
+ printf(" [--proxy socks-url]\n");
printf(" mosquitto_sub --help\n\n");
printf(" -A : bind the outgoing socket to this host/ip address. Use to control which interface\n");
printf(" the client communicates over.\n");
printf(" -c : disable 'clean session' (store subscription and pending messages when client disconnects).\n");
+ printf(" -C : disconnect and exit after receiving the 'msg_count' messages.\n");
printf(" -d : enable debug messages.\n");
printf(" -h : mqtt host to connect to. Defaults to localhost.\n");
printf(" -i : id to use for this client. Defaults to mosquitto_sub_ appended with the process id.\n");
@@ -516,15 +492,18 @@ void print_usage(void)
printf(" -k : keep alive in seconds for this client. Defaults to 60.\n");
printf(" -N : do not add an end of line character when printing the payload.\n");
printf(" -p : network port to connect to. Defaults to 1883.\n");
+ printf(" -P : provide a password (requires MQTT 3.1 broker)\n");
printf(" -q : quality of service level to use for the subscription. Defaults to 0.\n");
printf(" -R : do not print stale messages (those with retain set).\n");
#ifdef WITH_SRV
printf(" -S : use SRV lookups to determine which host to connect to.\n");
printf(" -t : mqtt topic to subscribe to. May be repeated multiple times.\n");
+ printf(" -T : topic string to filter out of results. May be repeated.\n");
printf(" -u : provide a username (requires MQTT 3.1 broker)\n");
printf(" -v : print published messages verbosely.\n");
- printf(" -P : provide a password (requires MQTT 3.1 broker)\n");
+ printf(" -V : specify the version of the MQTT protocol to use when connecting.\n");
+ printf(" Can be mqttv31 or mqttv311. Defaults to mqttv31.\n");
printf(" --help : display this message.\n");
printf(" --quiet : don't print error messages.\n");
printf(" --fmask : path to message outfile\n");
@@ -557,474 +536,78 @@ void print_usage(void)
printf(" --psk-identity : client identity string for TLS-PSK mode.\n");
+#ifdef WITH_SOCKS
+ printf(" --proxy : SOCKS5 proxy URL of the form:\n");
+ printf(" socks5h://[username[:password]@]hostname[:port]\n");
+ printf(" Only \"none\" and \"username\" authentication is supported.\n");
printf("\nSee http://mosquitto.org/ for more information.\n\n");
int main(int argc, char *argv[])
- char *id = NULL;
- char *id_prefix = NULL;
- int i;
- char *host = "localhost";
- int port = 1883;
- char *bind_address = NULL;
- int keepalive = 60;
- bool clean_session = true;
- bool debug = false;
+ struct mosq_config cfg;
struct mosquitto *mosq = NULL;
int rc;
- char hostname[256];
- char err[1024];
- struct userdata ud;
- int len;
- char *will_payload = NULL;
- long will_payloadlen = 0;
- int will_qos = 0;
- bool will_retain = false;
- char *will_topic = NULL;
- bool insecure = false;
- char *cafile = NULL;
- char *capath = NULL;
- char *certfile = NULL;
- char *keyfile = NULL;
- char *tls_version = NULL;
- char *psk = NULL;
- char *psk_identity = NULL;
- char *ciphers = NULL;
- bool use_srv = false;
- memset(&ud, 0, sizeof(struct userdata));
- ud.eol = true;
- for(i=1; i<argc; i++){
- if(!strcmp(argv[i], "-p") || !strcmp(argv[i], "--port")){
- if(i==argc-1){
- fprintf(stderr, "Error: -p argument given but no port specified.\n\n");
- print_usage();
- return 1;
- }else{
- port = atoi(argv[i+1]);
- if(port<1 || port>65535){
- fprintf(stderr, "Error: Invalid port given: %d\n", port);
- print_usage();
- return 1;
- }
- }
- i++;
- }else if(!strcmp(argv[i], "-A")){
- if(i==argc-1){
- fprintf(stderr, "Error: -A argument given but no address specified.\n\n");
- print_usage();
- return 1;
- }else{
- bind_address = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "-c") || !strcmp(argv[i], "--disable-clean-session")){
- clean_session = false;
- }else if(!strcmp(argv[i], "--fmask")){
- if(i==argc-1){
- fprintf(stderr, "Error: --fmask argument given but no outfile specified.\n\n");
- print_usage();
- return 1;
- }else{
- ud.fmask = argv[i+1];
- ud.isfmask = true;
- }
- i++;
- }else if(!strcmp(argv[i], "--overwrite")){
- ud.overwrite = true;
- }else if(!strcmp(argv[i], "--cafile")){
- if(i==argc-1){
- fprintf(stderr, "Error: --cafile argument given but no file specified.\n\n");
- print_usage();
- return 1;
- }else{
- cafile = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "--capath")){
- if(i==argc-1){
- fprintf(stderr, "Error: --capath argument given but no directory specified.\n\n");
- print_usage();
- return 1;
- }else{
- capath = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "--cert")){
- if(i==argc-1){
- fprintf(stderr, "Error: --cert argument given but no file specified.\n\n");
- print_usage();
- return 1;
- }else{
- certfile = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "--ciphers")){
- if(i==argc-1){
- fprintf(stderr, "Error: --ciphers argument given but no ciphers specified.\n\n");
- print_usage();
- return 1;
- }else{
- ciphers = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "-d") || !strcmp(argv[i], "--debug")){
- debug = true;
- }else if(!strcmp(argv[i], "--help")){
+ rc = client_config_load(&cfg, CLIENT_SUB, argc, argv);
+ if(rc){
+ client_config_cleanup(&cfg);
+ if(rc == 2){
+ /* --help */
- return 0;
- }else if(!strcmp(argv[i], "-h") || !strcmp(argv[i], "--host")){
- if(i==argc-1){
- fprintf(stderr, "Error: -h argument given but no host specified.\n\n");
- print_usage();
- return 1;
- }else{
- host = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "--insecure")){
- insecure = true;
- }else if(!strcmp(argv[i], "-i") || !strcmp(argv[i], "--id")){
- if(id_prefix){
- fprintf(stderr, "Error: -i and -I argument cannot be used together.\n\n");
- print_usage();
- return 1;
- }
- if(i==argc-1){
- fprintf(stderr, "Error: -i argument given but no id specified.\n\n");
- print_usage();
- return 1;
- }else{
- id = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "-I") || !strcmp(argv[i], "--id-prefix")){
- if(id){
- fprintf(stderr, "Error: -i and -I argument cannot be used together.\n\n");
- print_usage();
- return 1;
- }
- if(i==argc-1){
- fprintf(stderr, "Error: -I argument given but no id prefix specified.\n\n");
- print_usage();
- return 1;
- }else{
- id_prefix = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "-k") || !strcmp(argv[i], "--keepalive")){
- if(i==argc-1){
- fprintf(stderr, "Error: -k argument given but no keepalive specified.\n\n");
- print_usage();
- return 1;
- }else{
- keepalive = atoi(argv[i+1]);
- if(keepalive>65535){
- fprintf(stderr, "Error: Invalid keepalive given: %d\n", keepalive);
- print_usage();
- return 1;
- }
- }
- i++;
- }else if(!strcmp(argv[i], "--key")){
- if(i==argc-1){
- fprintf(stderr, "Error: --key argument given but no file specified.\n\n");
- print_usage();
- return 1;
- }else{
- keyfile = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "-N")){
- ud.eol = false;
- }else if(!strcmp(argv[i], "--psk")){
- if(i==argc-1){
- fprintf(stderr, "Error: --psk argument given but no key specified.\n\n");
- print_usage();
- return 1;
- }else{
- psk = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "--psk-identity")){
- if(i==argc-1){
- fprintf(stderr, "Error: --psk-identity argument given but no identity specified.\n\n");
- print_usage();
- return 1;
- }else{
- psk_identity = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "-q") || !strcmp(argv[i], "--qos")){
- if(i==argc-1){
- fprintf(stderr, "Error: -q argument given but no QoS specified.\n\n");
- print_usage();
- return 1;
- }else{
- ud.topic_qos = atoi(argv[i+1]);
- if(ud.topic_qos<0 || ud.topic_qos>2){
- fprintf(stderr, "Error: Invalid QoS given: %d\n", ud.topic_qos);
- print_usage();
- return 1;
- }
- }
- i++;
- }else if(!strcmp(argv[i], "--quiet")){
- ud.quiet = true;
- }else if(!strcmp(argv[i], "-R")){
- ud.no_retain = true;
-#ifdef WITH_SRV
- }else if(!strcmp(argv[i], "-S")){
- use_srv = true;
- }else if(!strcmp(argv[i], "-t") || !strcmp(argv[i], "--topic")){
- if(i==argc-1){
- fprintf(stderr, "Error: -t argument given but no topic specified.\n\n");
- print_usage();
- return 1;
- }else{
- ud.topic_count++;
- ud.topics = realloc(ud.topics, ud.topic_count*sizeof(char *));
- ud.topics[ud.topic_count-1] = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "-T") || !strcmp(argv[i], "--filter-out")){
- if(i==argc-1){
- fprintf(stderr, "Error: -T argument given but no topic filter specified.\n\n");
- print_usage();
- return 1;
- }else{
- ud.filter_out_count++;
- ud.filter_outs = realloc(ud.filter_outs, ud.filter_out_count*sizeof(char *));
- ud.filter_outs[ud.filter_out_count-1] = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "--tls-version")){
- if(i==argc-1){
- fprintf(stderr, "Error: --tls-version argument given but no version specified.\n\n");
- print_usage();
- return 1;
- }else{
- tls_version = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "-u") || !strcmp(argv[i], "--username")){
- if(i==argc-1){
- fprintf(stderr, "Error: -u argument given but no username specified.\n\n");
- print_usage();
- return 1;
- }else{
- ud.username = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "-v") || !strcmp(argv[i], "--verbose")){
- ud.verbose = 1;
- }else if(!strcmp(argv[i], "-P") || !strcmp(argv[i], "--pw")){
- if(i==argc-1){
- fprintf(stderr, "Error: -P argument given but no password specified.\n\n");
- print_usage();
- return 1;
- }else{
- ud.password = argv[i+1];
- }
- i++;
- }else if(!strcmp(argv[i], "--will-payload")){
- if(i==argc-1){
- fprintf(stderr, "Error: --will-payload argument given but no will payload specified.\n\n");
- print_usage();
- return 1;
- }else{
- will_payload = argv[i+1];
- will_payloadlen = strlen(will_payload);
- }
- i++;
- }else if(!strcmp(argv[i], "--will-qos")){
- if(i==argc-1){
- fprintf(stderr, "Error: --will-qos argument given but no will QoS specified.\n\n");
- print_usage();
- return 1;
- }else{
- will_qos = atoi(argv[i+1]);
- if(will_qos < 0 || will_qos > 2){
- fprintf(stderr, "Error: Invalid will QoS %d.\n\n", will_qos);
- return 1;
- }
- }
- i++;
- }else if(!strcmp(argv[i], "--will-retain")){
- will_retain = true;
- }else if(!strcmp(argv[i], "--will-topic")){
- if(i==argc-1){
- fprintf(stderr, "Error: --will-topic argument given but no will topic specified.\n\n");
- print_usage();
- return 1;
- }else{
- will_topic = argv[i+1];
- }
- i++;
- fprintf(stderr, "Error: Unknown option '%s'.\n",argv[i]);
- print_usage();
- return 1;
+ fprintf(stderr, "\nUse 'mosquitto_sub --help' to see usage.\n");
- }
- if(clean_session == false && (id_prefix || !id)){
- if(!ud.quiet) fprintf(stderr, "Error: You must provide a client id if you are using the -c option.\n");
- return 1;
- }
- if(ud.topic_count == 0){
- fprintf(stderr, "Error: You must specify a topic to subscribe to.\n");
- print_usage();
- return 1;
- }
- if(will_payload && !will_topic){
- fprintf(stderr, "Error: Will payload given, but no will topic given.\n");
- print_usage();
- return 1;
- }
- if(will_retain && !will_topic){
- fprintf(stderr, "Error: Will retain given, but no will topic given.\n");
- print_usage();
- return 1;
- }
- if(ud.password && !ud.username){
- if(!ud.quiet) fprintf(stderr, "Warning: Not using password since username not set.\n");
- }
- if((certfile && !keyfile) || (keyfile && !certfile)){
- fprintf(stderr, "Error: Both certfile and keyfile must be provided if one of them is.\n");
- print_usage();
- return 1;
- }
- if((cafile || capath) && psk){
- if(!ud.quiet) fprintf(stderr, "Error: Only one of --psk or --cafile/--capath may be used at once.\n");
- return 1;
- }
- if(psk && !psk_identity){
- if(!ud.quiet) fprintf(stderr, "Error: --psk-identity required if --psk used.\n");
return 1;
- if(id_prefix){
- id = malloc(strlen(id_prefix)+10);
- if(!id){
- if(!ud.quiet) fprintf(stderr, "Error: Out of memory.\n");
- mosquitto_lib_cleanup();
- return 1;
- }
- snprintf(id, strlen(id_prefix)+10, "%s%d", id_prefix, getpid());
- }else if(!id){
- hostname[0] = '\0';
- gethostname(hostname, 256);
- hostname[255] = '\0';
- len = strlen("mosqsub/-") + 6 + strlen(hostname);
- id = malloc(len);
- if(!id){
- if(!ud.quiet) fprintf(stderr, "Error: Out of memory.\n");
- mosquitto_lib_cleanup();
- return 1;
- }
- snprintf(id, len, "mosqsub/%d-%s", getpid(), hostname);
- if(strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH){
- /* Enforce maximum client id length of 23 characters */
- }
+ if(client_id_generate(&cfg, "mosqsub")){
+ return 1;
- mosq = mosquitto_new(id, clean_session, &ud);
- ud.idtext = id;
+ mosq = mosquitto_new(cfg.id, cfg.clean_session, &cfg);
+ cfg.idtext = cfg.id;
case ENOMEM:
- if(!ud.quiet) fprintf(stderr, "Error: Out of memory.\n");
+ if(!cfg.quiet) fprintf(stderr, "Error: Out of memory.\n");
case EINVAL:
- if(!ud.quiet) fprintf(stderr, "Error: Invalid id and/or clean_session.\n");
+ if(!cfg.quiet) fprintf(stderr, "Error: Invalid id and/or clean_session.\n");
return 1;
- if(debug){
- mosquitto_log_callback_set(mosq, my_log_callback);
- }
- if(will_topic && mosquitto_will_set(mosq, will_topic, will_payloadlen, will_payload, will_qos, will_retain)){
- if(!ud.quiet) fprintf(stderr, "Error: Problem setting will.\n");
- mosquitto_lib_cleanup();
- return 1;
- }
- if(ud.username && mosquitto_username_pw_set(mosq, ud.username, ud.password)){
- if(!ud.quiet) fprintf(stderr, "Error: Problem setting username and password.\n");
- mosquitto_lib_cleanup();
+ if(client_opts_set(mosq, &cfg)){
return 1;
- if((cafile || capath) && mosquitto_tls_set(mosq, cafile, capath, certfile, keyfile, NULL)){
- if(!ud.quiet) fprintf(stderr, "Error: Problem setting TLS options.\n");
- mosquitto_lib_cleanup();
- return 1;
- }
- if(insecure && mosquitto_tls_insecure_set(mosq, true)){
- if(!ud.quiet) fprintf(stderr, "Error: Problem setting TLS insecure option.\n");
- mosquitto_lib_cleanup();
- return 1;
- }
- if(psk && mosquitto_tls_psk_set(mosq, psk, psk_identity, NULL)){
- if(!ud.quiet) fprintf(stderr, "Error: Problem setting TLS-PSK options.\n");
- mosquitto_lib_cleanup();
- return 1;
- }
- if(tls_version && mosquitto_tls_opts_set(mosq, 1, tls_version, ciphers)){
- if(!ud.quiet) fprintf(stderr, "Error: Problem setting TLS options.\n");
- mosquitto_lib_cleanup();
- return 1;
+ if(cfg.debug){
+ mosquitto_log_callback_set(mosq, my_log_callback);
+ mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
mosquitto_connect_callback_set(mosq, my_connect_callback);
- if(ud.isfmask) {
+ if(cfg.isfmask) {
mosquitto_message_callback_set(mosq, my_message_file_callback);
} else {
mosquitto_message_callback_set(mosq, my_message_callback);
- if(debug){
- mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
- }
- if(use_srv){
- rc = mosquitto_connect_srv(mosq, host, keepalive, bind_address);
- }else{
- rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address);
- }
- if(rc){
- if(!ud.quiet){
- if(rc == MOSQ_ERR_ERRNO){
-#ifndef WIN32
- strerror_r(errno, err, 1024);
- FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errno, 0, (LPTSTR)&err, 1024, NULL);
- fprintf(stderr, "Error: %s\n", err);
- }else{
- fprintf(stderr, "Unable to connect (%d).\n", rc);
- }
- }
- mosquitto_lib_cleanup();
- return rc;
- }
+ rc = client_connect(mosq, &cfg);
+ if(rc) return rc;
rc = mosquitto_loop_forever(mosq, -1, 1);
+ if(cfg.msg_count>0 && rc == MOSQ_ERR_NO_CONN){
+ rc = 0;
+ }
fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));