diff options
author | V.Krishn <vkrishn4@gmail.com> | 2015-03-12 20:39:32 +0530 |
---|---|---|
committer | V.Krishn <vkrishn4@gmail.com> | 2015-03-12 20:39:32 +0530 |
commit | 2e75f767d64a72b00db5f1080e520f2b0737007e (patch) | |
tree | aadb46c041ed44fdef91fd0bd2405aac65b23e0c | |
parent | ed3d771aa17bc9f1f4a861ca4e42b8478db373a2 (diff) | |
download | mqtt-dirpub-2e75f767d64a72b00db5f1080e520f2b0737007e.tar.bz2 |
Update from mosquitto v1.4.0
Upstream files refactored.
Not fully tested, but kinda works for now.
On heavy traffic it may segfault.
-rw-r--r-- | client_shared.c | 976 | ||||
-rw-r--r-- | client_shared.h | 107 | ||||
-rw-r--r-- | sub_client.c | 689 |
3 files changed, 1219 insertions, 553 deletions
diff --git a/client_shared.c b/client_shared.c new file mode 100644 index 0000000..945b2fb --- /dev/null +++ b/client_shared.c @@ -0,0 +1,976 @@ +/* +Copyright (c) 2014 Roger Light <roger@atchoo.org> +Copyright (c) 2015 V.Krishn <vkrishn@insteps.net> + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. + V Krishn - implement dirpub. +*/ + + +#include <errno.h> +#include <fcntl.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#ifndef WIN32 +#include <unistd.h> +#else +#include <process.h> +#include <winsock2.h> +#define snprintf sprintf_s +#endif + +#include <mosquitto.h> +#include "client_shared.h" + +static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url); +static int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[]); + +void init_config(struct mosq_config *cfg) +{ + memset(cfg, 0, sizeof(*cfg)); + cfg->port = 1883; + cfg->max_inflight = 20; + cfg->keepalive = 60; + cfg->clean_session = true; + cfg->eol = true; + cfg->protocol_version = MQTT_PROTOCOL_V31; + + cfg->isfmask = false; + cfg->overwrite = false; + +} + +void client_config_cleanup(struct mosq_config *cfg) +{ + int i; + if(cfg->id) free(cfg->id); + if(cfg->id_prefix) free(cfg->id_prefix); + if(cfg->host) free(cfg->host); + if(cfg->file_input) free(cfg->file_input); + if(cfg->message) free(cfg->message); + if(cfg->topic) free(cfg->topic); + if(cfg->bind_address) free(cfg->bind_address); + if(cfg->username) free(cfg->username); + if(cfg->password) free(cfg->password); + if(cfg->will_topic) free(cfg->will_topic); + if(cfg->will_payload) free(cfg->will_payload); +#ifdef WITH_TLS + if(cfg->cafile) free(cfg->cafile); + if(cfg->capath) free(cfg->capath); + if(cfg->certfile) free(cfg->certfile); + if(cfg->keyfile) free(cfg->keyfile); + if(cfg->ciphers) free(cfg->ciphers); + if(cfg->tls_version) free(cfg->tls_version); +# ifdef WITH_TLS_PSK + if(cfg->psk) free(cfg->psk); + if(cfg->psk_identity) free(cfg->psk_identity); +# endif +#endif + if(cfg->topics){ + for(i=0; i<cfg->topic_count; i++){ + free(cfg->topics[i]); + } + free(cfg->topics); + } + if(cfg->filter_outs){ + for(i=0; i<cfg->filter_out_count; i++){ + free(cfg->filter_outs[i]); + } + free(cfg->filter_outs); + } +#ifdef WITH_SOCKS + if(cfg->socks5_host) free(cfg->socks5_host); + if(cfg->socks5_username) free(cfg->socks5_username); + if(cfg->socks5_password) free(cfg->socks5_password); +#endif +} + +int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[]) +{ + int rc; + FILE *fptr; + char line[1024]; + int count; + char *loc = NULL; + int len; + char *args[3]; + +#ifndef WIN32 + char *env; +#else + char env[1024]; +#endif + args[0] = NULL; + + init_config(cfg); + + /* Default config file */ +#ifndef WIN32 + env = getenv("XDG_CONFIG_HOME"); + if(env){ + len = strlen(env) + strlen("/mosquitto_pub") + 1; + loc = malloc(len); + if(pub_or_sub == CLIENT_PUB){ + snprintf(loc, len, "%s/mosquitto_pub", env); + }else{ + snprintf(loc, len, "%s/mosquitto_sub", env); + } + loc[len-1] = '\0'; + }else{ + env = getenv("HOME"); + if(env){ + len = strlen(env) + strlen("/.config/mosquitto_pub") + 1; + loc = malloc(len); + if(pub_or_sub == CLIENT_PUB){ + snprintf(loc, len, "%s/.config/mosquitto_pub", env); + }else{ + snprintf(loc, len, "%s/.config/mosquitto_sub", env); + } + loc[len-1] = '\0'; + }else{ + fprintf(stderr, "Warning: Unable to locate configuration directory, default config not loaded.\n"); + } + } + +#else + rc = GetEnvironmentVariable("USERPROFILE", env, 1024); + if(rc > 0 && rc < 1024){ + len = strlen(env) + strlen("\\mosquitto_pub.conf") + 1; + loc = malloc(len); + if(pub_or_sub == CLIENT_PUB){ + snprintf(loc, len, "%s\\mosquitto_pub.conf", env); + }else{ + snprintf(loc, len, "%s\\mosquitto_sub.conf", env); + } + loc[len-1] = '\0'; + }else{ + fprintf(stderr, "Warning: Unable to locate configuration directory, default config not loaded.\n"); + } +#endif + + if(loc){ + fptr = fopen(loc, "rt"); + if(fptr){ + while(fgets(line, 1024, fptr)){ + if(line[0] == '#') continue; /* Comments */ + + while(line[strlen(line)-1] == 10 || line[strlen(line)-1] == 13){ + line[strlen(line)-1] = 0; + } + /* All offset by one "args" here, because real argc/argv has + * program name as the first entry. */ + args[1] = strtok(line, " "); + if(args[1]){ + args[2] = strtok(NULL, " "); + if(args[2]){ + count = 3; + }else{ + count = 2; + } + rc = client_config_line_proc(cfg, pub_or_sub, count, args); + if(rc){ + fclose(fptr); + free(loc); + return rc; + } + } + } + fclose(fptr); + } + free(loc); + } + + /* Deal with real argc/argv */ + rc = client_config_line_proc(cfg, pub_or_sub, argc, argv); + if(rc) return rc; + + if(cfg->will_payload && !cfg->will_topic){ + fprintf(stderr, "Error: Will payload given, but no will topic given.\n"); + return 1; + } + if(cfg->will_retain && !cfg->will_topic){ + fprintf(stderr, "Error: Will retain given, but no will topic given.\n"); + return 1; + } + if(cfg->password && !cfg->username){ + if(!cfg->quiet) fprintf(stderr, "Warning: Not using password since username not set.\n"); + } +#ifdef WITH_TLS + if((cfg->certfile && !cfg->keyfile) || (cfg->keyfile && !cfg->certfile)){ + fprintf(stderr, "Error: Both certfile and keyfile must be provided if one of them is.\n"); + return 1; + } +#endif +#ifdef WITH_TLS_PSK + if((cfg->cafile || cfg->capath) && cfg->psk){ + if(!cfg->quiet) fprintf(stderr, "Error: Only one of --psk or --cafile/--capath may be used at once.\n"); + return 1; + } + if(cfg->psk && !cfg->psk_identity){ + if(!cfg->quiet) fprintf(stderr, "Error: --psk-identity required if --psk used.\n"); + return 1; + } +#endif + + if(pub_or_sub == CLIENT_SUB){ + if(cfg->clean_session == false && (cfg->id_prefix || !cfg->id)){ + if(!cfg->quiet) fprintf(stderr, "Error: You must provide a client id if you are using the -c option.\n"); + return 1; + } + if(cfg->topic_count == 0){ + if(!cfg->quiet) fprintf(stderr, "Error: You must specify a topic to subscribe to.\n"); + return 1; + } + } + + if(!cfg->host){ + cfg->host = "localhost"; + } + return MOSQ_ERR_SUCCESS; +} + +/* Process a tokenised single line from a file or set of real argc/argv */ +int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[]) +{ + int i; + + for(i=1; i<argc; i++){ + if(!strcmp(argv[i], "-p") || !strcmp(argv[i], "--port")){ + if(i==argc-1){ + fprintf(stderr, "Error: -p argument given but no port specified.\n\n"); + return 1; + }else{ + cfg->port = atoi(argv[i+1]); + if(cfg->port<1 || cfg->port>65535){ + fprintf(stderr, "Error: Invalid port given: %d\n", cfg->port); + return 1; + } + } + i++; + }else if(!strcmp(argv[i], "-A")){ + if(i==argc-1){ + fprintf(stderr, "Error: -A argument given but no address specified.\n\n"); + return 1; + }else{ + cfg->bind_address = strdup(argv[i+1]); + } + i++; + + }else if(!strcmp(argv[i], "--fmask")){ + if(i==argc-1){ + fprintf(stderr, "Error: --fmask argument given but no outfile specified.\n\n"); + return 1; + }else{ + cfg->fmask = argv[i+1]; + cfg->isfmask = true; + } + i++; + }else if(!strcmp(argv[i], "--overwrite")){ + cfg->overwrite = true; + +#ifdef WITH_TLS + }else if(!strcmp(argv[i], "--cafile")){ + if(i==argc-1){ + fprintf(stderr, "Error: --cafile argument given but no file specified.\n\n"); + return 1; + }else{ + cfg->cafile = strdup(argv[i+1]); + } + i++; + }else if(!strcmp(argv[i], "--capath")){ + if(i==argc-1){ + fprintf(stderr, "Error: --capath argument given but no directory specified.\n\n"); + return 1; + }else{ + cfg->capath = strdup(argv[i+1]); + } + i++; + }else if(!strcmp(argv[i], "--cert")){ + if(i==argc-1){ + fprintf(stderr, "Error: --cert argument given but no file specified.\n\n"); + return 1; + }else{ + cfg->certfile = strdup(argv[i+1]); + } + i++; + }else if(!strcmp(argv[i], "--ciphers")){ + if(i==argc-1){ + fprintf(stderr, "Error: --ciphers argument given but no ciphers specified.\n\n"); + return 1; + }else{ + cfg->ciphers = strdup(argv[i+1]); + } + i++; +#endif + }else if(!strcmp(argv[i], "-C")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + }else{ + if(i==argc-1){ + fprintf(stderr, "Error: -C argument given but no count specified.\n\n"); + return 1; + }else{ + cfg->msg_count = atoi(argv[i+1]); + if(cfg->msg_count < 1){ + fprintf(stderr, "Error: Invalid message count \"%d\".\n\n", cfg->msg_count); + return 1; + } + } + i++; + } + }else if(!strcmp(argv[i], "-d") || !strcmp(argv[i], "--debug")){ + cfg->debug = true; + }else if(!strcmp(argv[i], "-f") || !strcmp(argv[i], "--file")){ + if(pub_or_sub == CLIENT_SUB){ + goto unknown_option; + } + if(cfg->pub_mode != MSGMODE_NONE){ + fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n"); + return 1; + }else if(i==argc-1){ + fprintf(stderr, "Error: -f argument given but no file specified.\n\n"); + return 1; + }else{ + cfg->pub_mode = MSGMODE_FILE; + cfg->file_input = strdup(argv[i+1]); + } + i++; + }else if(!strcmp(argv[i], "--help")){ + return 2; + }else if(!strcmp(argv[i], "-h") || !strcmp(argv[i], "--host")){ + if(i==argc-1){ + fprintf(stderr, "Error: -h argument given but no host specified.\n\n"); + return 1; + }else{ + cfg->host = strdup(argv[i+1]); + } + i++; +#ifdef WITH_TLS + }else if(!strcmp(argv[i], "--insecure")){ + cfg->insecure = true; +#endif + }else if(!strcmp(argv[i], "-i") || !strcmp(argv[i], "--id")){ + if(cfg->id_prefix){ + fprintf(stderr, "Error: -i and -I argument cannot be used together.\n\n"); + return 1; + } + if(i==argc-1){ + fprintf(stderr, "Error: -i argument given but no id specified.\n\n"); + return 1; + }else{ + cfg->id = strdup(argv[i+1]); + } + i++; + }else if(!strcmp(argv[i], "-I") || !strcmp(argv[i], "--id-prefix")){ + if(cfg->id){ + fprintf(stderr, "Error: -i and -I argument cannot be used together.\n\n"); + return 1; + } + if(i==argc-1){ + fprintf(stderr, "Error: -I argument given but no id prefix specified.\n\n"); + return 1; + }else{ + cfg->id_prefix = strdup(argv[i+1]); + } + i++; + }else if(!strcmp(argv[i], "-k") || !strcmp(argv[i], "--keepalive")){ + if(i==argc-1){ + fprintf(stderr, "Error: -k argument given but no keepalive specified.\n\n"); + return 1; + }else{ + cfg->keepalive = atoi(argv[i+1]); + if(cfg->keepalive>65535){ + fprintf(stderr, "Error: Invalid keepalive given: %d\n", cfg->keepalive); + return 1; + } + } + i++; +#ifdef WITH_TLS + }else if(!strcmp(argv[i], "--key")){ + if(i==argc-1){ + fprintf(stderr, "Error: --key argument given but no file specified.\n\n"); + return 1; + }else{ + cfg->keyfile = strdup(argv[i+1]); + } + i++; +#endif + }else if(!strcmp(argv[i], "-l") || !strcmp(argv[i], "--stdin-line")){ + if(pub_or_sub == CLIENT_SUB){ + goto unknown_option; + } + if(cfg->pub_mode != MSGMODE_NONE){ + fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n"); + return 1; + }else{ + cfg->pub_mode = MSGMODE_STDIN_LINE; + } + }else if(!strcmp(argv[i], "-m") || !strcmp(argv[i], "--message")){ + if(pub_or_sub == CLIENT_SUB){ + goto unknown_option; + } + if(cfg->pub_mode != MSGMODE_NONE){ + fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n"); + return 1; + }else if(i==argc-1){ + fprintf(stderr, "Error: -m argument given but no message specified.\n\n"); + return 1; + }else{ + cfg->message = strdup(argv[i+1]); + cfg->msglen = strlen(cfg->message); + cfg->pub_mode = MSGMODE_CMD; + } + i++; + }else if(!strcmp(argv[i], "-M")){ + if(i==argc-1){ + fprintf(stderr, "Error: -M argument given but max_inflight not specified.\n\n"); + return 1; + }else{ + cfg->max_inflight = atoi(argv[i+1]); + } + i++; + }else if(!strcmp(argv[i], "-n") || !strcmp(argv[i], "--null-message")){ + if(pub_or_sub == CLIENT_SUB){ + goto unknown_option; + } + if(cfg->pub_mode != MSGMODE_NONE){ + fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n"); + return 1; + }else{ + cfg->pub_mode = MSGMODE_NULL; + } + }else if(!strcmp(argv[i], "-V") || !strcmp(argv[i], "--protocol-version")){ + if(i==argc-1){ + fprintf(stderr, "Error: --protocol-version argument given but no version specified.\n\n"); + return 1; + }else{ + if(!strcmp(argv[i+1], "mqttv31")){ + cfg->protocol_version = MQTT_PROTOCOL_V31; + }else if(!strcmp(argv[i+1], "mqttv311")){ + cfg->protocol_version = MQTT_PROTOCOL_V311; + }else{ + fprintf(stderr, "Error: Invalid protocol version argument given.\n\n"); + return 1; + } + i++; + } +#ifdef WITH_SOCKS + }else if(!strcmp(argv[i], "--proxy")){ + if(i==argc-1){ + fprintf(stderr, "Error: --proxy argument given but no proxy url specified.\n\n"); + return 1; + }else{ + if(mosquitto__parse_socks_url(cfg, argv[i+1])){ + return 1; + } + i++; + } +#endif +#ifdef WITH_TLS_PSK + }else if(!strcmp(argv[i], "--psk")){ + if(i==argc-1){ + fprintf(stderr, "Error: --psk argument given but no key specified.\n\n"); + return 1; + }else{ + cfg->psk = strdup(argv[i+1]); + } + i++; + }else if(!strcmp(argv[i], "--psk-identity")){ + if(i==argc-1){ + fprintf(stderr, "Error: --psk-identity argument given but no identity specified.\n\n"); + return 1; + }else{ + cfg->psk_identity = strdup(argv[i+1]); + } + i++; +#endif + }else if(!strcmp(argv[i], "-q") || !strcmp(argv[i], "--qos")){ + if(i==argc-1){ + fprintf(stderr, "Error: -q argument given but no QoS specified.\n\n"); + return 1; + }else{ + cfg->qos = atoi(argv[i+1]); + if(cfg->qos<0 || cfg->qos>2){ + fprintf(stderr, "Error: Invalid QoS given: %d\n", cfg->qos); + return 1; + } + } + i++; + }else if(!strcmp(argv[i], "--quiet")){ + cfg->quiet = true; + }else if(!strcmp(argv[i], "-r") || !strcmp(argv[i], "--retain")){ + if(pub_or_sub == CLIENT_SUB){ + goto unknown_option; + } + cfg->retain = 1; + }else if(!strcmp(argv[i], "-s") || !strcmp(argv[i], "--stdin-file")){ + if(pub_or_sub == CLIENT_SUB){ + goto unknown_option; + } + if(cfg->pub_mode != MSGMODE_NONE){ + fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n"); + return 1; + }else{ + cfg->pub_mode = MSGMODE_STDIN_FILE; + } +#ifdef WITH_SRV + }else if(!strcmp(argv[i], "-S")){ + cfg->use_srv = true; +#endif + }else if(!strcmp(argv[i], "-t") || !strcmp(argv[i], "--topic")){ + if(i==argc-1){ + fprintf(stderr, "Error: -t argument given but no topic specified.\n\n"); + return 1; + }else{ + if(pub_or_sub == CLIENT_PUB){ + if(mosquitto_pub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){ + fprintf(stderr, "Error: Invalid publish topic '%s', does it contain '+' or '#'?\n", argv[i+1]); + return 1; + } + cfg->topic = strdup(argv[i+1]); + }else{ + if(mosquitto_sub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){ + fprintf(stderr, "Error: Invalid subscription topic '%s', are all '+' and '#' wildcards correct?\n", argv[i+1]); + return 1; + } + cfg->topic_count++; + cfg->topics = realloc(cfg->topics, cfg->topic_count*sizeof(char *)); + cfg->topics[cfg->topic_count-1] = strdup(argv[i+1]); + } + i++; + } + }else if(!strcmp(argv[i], "-T") || !strcmp(argv[i], "--filter-out")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + } + if(i==argc-1){ + fprintf(stderr, "Error: -T argument given but no topic filter specified.\n\n"); + return 1; + }else{ + if(mosquitto_sub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){ + fprintf(stderr, "Error: Invalid filter topic '%s', are all '+' and '#' wildcards correct?\n", argv[i+1]); + return 1; + } + cfg->filter_out_count++; + cfg->filter_outs = realloc(cfg->filter_outs, cfg->filter_out_count*sizeof(char *)); + cfg->filter_outs[cfg->filter_out_count-1] = strdup(argv[i+1]); + } + i++; +#ifdef WITH_TLS + }else if(!strcmp(argv[i], "--tls-version")){ + if(i==argc-1){ + fprintf(stderr, "Error: --tls-version argument given but no version specified.\n\n"); + return 1; + }else{ + cfg->tls_version = strdup(argv[i+1]); + } + i++; +#endif + }else if(!strcmp(argv[i], "-u") || !strcmp(argv[i], "--username")){ + if(i==argc-1){ + fprintf(stderr, "Error: -u argument given but no username specified.\n\n"); + return 1; + }else{ + cfg->username = strdup(argv[i+1]); + } + i++; + }else if(!strcmp(argv[i], "-P") || !strcmp(argv[i], "--pw")){ + if(i==argc-1){ + fprintf(stderr, "Error: -P argument given but no password specified.\n\n"); + return 1; + }else{ + cfg->password = strdup(argv[i+1]); + } + i++; + }else if(!strcmp(argv[i], "--will-payload")){ + if(i==argc-1){ + fprintf(stderr, "Error: --will-payload argument given but no will payload specified.\n\n"); + return 1; + }else{ + cfg->will_payload = strdup(argv[i+1]); + cfg->will_payloadlen = strlen(cfg->will_payload); + } + i++; + }else if(!strcmp(argv[i], "--will-qos")){ + if(i==argc-1){ + fprintf(stderr, "Error: --will-qos argument given but no will QoS specified.\n\n"); + return 1; + }else{ + cfg->will_qos = atoi(argv[i+1]); + if(cfg->will_qos < 0 || cfg->will_qos > 2){ + fprintf(stderr, "Error: Invalid will QoS %d.\n\n", cfg->will_qos); + return 1; + } + } + i++; + }else if(!strcmp(argv[i], "--will-retain")){ + cfg->will_retain = true; + }else if(!strcmp(argv[i], "--will-topic")){ + if(i==argc-1){ + fprintf(stderr, "Error: --will-topic argument given but no will topic specified.\n\n"); + return 1; + }else{ + if(mosquitto_pub_topic_check(argv[i+1]) == MOSQ_ERR_INVAL){ + fprintf(stderr, "Error: Invalid will topic '%s', does it contain '+' or '#'?\n", argv[i+1]); + return 1; + } + cfg->will_topic = strdup(argv[i+1]); + } + i++; + }else if(!strcmp(argv[i], "-c") || !strcmp(argv[i], "--disable-clean-session")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + } + cfg->clean_session = false; + }else if(!strcmp(argv[i], "-N")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + } + cfg->eol = false; + }else if(!strcmp(argv[i], "-R")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + } + cfg->no_retain = true; + }else if(!strcmp(argv[i], "-v") || !strcmp(argv[i], "--verbose")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + } + cfg->verbose = 1; + }else{ + goto unknown_option; + } + } + + return MOSQ_ERR_SUCCESS; + +unknown_option: + fprintf(stderr, "Error: Unknown option '%s'.\n",argv[i]); + return 1; +} + +int client_opts_set(struct mosquitto *mosq, struct mosq_config *cfg) +{ + int rc; + + if(cfg->will_topic && mosquitto_will_set(mosq, cfg->will_topic, + cfg->will_payloadlen, cfg->will_payload, cfg->will_qos, + cfg->will_retain)){ + + if(!cfg->quiet) fprintf(stderr, "Error: Problem setting will.\n"); + mosquitto_lib_cleanup(); + return 1; + } + if(cfg->username && mosquitto_username_pw_set(mosq, cfg->username, cfg->password)){ + if(!cfg->quiet) fprintf(stderr, "Error: Problem setting username and password.\n"); + mosquitto_lib_cleanup(); + return 1; + } +#ifdef WITH_TLS + if((cfg->cafile || cfg->capath) + && mosquitto_tls_set(mosq, cfg->cafile, cfg->capath, cfg->certfile, cfg->keyfile, NULL)){ + + if(!cfg->quiet) fprintf(stderr, "Error: Problem setting TLS options.\n"); + mosquitto_lib_cleanup(); + return 1; + } + if(cfg->insecure && mosquitto_tls_insecure_set(mosq, true)){ + if(!cfg->quiet) fprintf(stderr, "Error: Problem setting TLS insecure option.\n"); + mosquitto_lib_cleanup(); + return 1; + } +# ifdef WITH_TLS_PSK + if(cfg->psk && mosquitto_tls_psk_set(mosq, cfg->psk, cfg->psk_identity, NULL)){ + if(!cfg->quiet) fprintf(stderr, "Error: Problem setting TLS-PSK options.\n"); + mosquitto_lib_cleanup(); + return 1; + } +# endif + if(cfg->tls_version && mosquitto_tls_opts_set(mosq, 1, cfg->tls_version, cfg->ciphers)){ + if(!cfg->quiet) fprintf(stderr, "Error: Problem setting TLS options.\n"); + mosquitto_lib_cleanup(); + return 1; + } +#endif + mosquitto_max_inflight_messages_set(mosq, cfg->max_inflight); +#ifdef WITH_SOCKS + if(cfg->socks5_host){ + rc = mosquitto_socks5_set(mosq, cfg->socks5_host, cfg->socks5_port, cfg->socks5_username, cfg->socks5_password); + if(rc){ + mosquitto_lib_cleanup(); + return rc; + } + } +#endif + mosquitto_opts_set(mosq, MOSQ_OPT_PROTOCOL_VERSION, &(cfg->protocol_version)); + return MOSQ_ERR_SUCCESS; +} + +int client_id_generate(struct mosq_config *cfg, const char *id_base) +{ + int len; + char hostname[256]; + + if(cfg->id_prefix){ + cfg->id = malloc(strlen(cfg->id_prefix)+10); + if(!cfg->id){ + if(!cfg->quiet) fprintf(stderr, "Error: Out of memory.\n"); + mosquitto_lib_cleanup(); + return 1; + } + snprintf(cfg->id, strlen(cfg->id_prefix)+10, "%s%d", cfg->id_prefix, getpid()); + }else if(!cfg->id){ + hostname[0] = '\0'; + gethostname(hostname, 256); + hostname[255] = '\0'; + len = strlen(id_base) + strlen("/-") + 6 + strlen(hostname); + cfg->id = malloc(len); + if(!cfg->id){ + if(!cfg->quiet) fprintf(stderr, "Error: Out of memory.\n"); + mosquitto_lib_cleanup(); + return 1; + } + snprintf(cfg->id, len, "%s/%d-%s", id_base, getpid(), hostname); + if(strlen(cfg->id) > MOSQ_MQTT_ID_MAX_LENGTH){ + /* Enforce maximum client id length of 23 characters */ + cfg->id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; + } + } + return MOSQ_ERR_SUCCESS; +} + +int client_connect(struct mosquitto *mosq, struct mosq_config *cfg) +{ + char err[1024]; + int rc; + +#ifdef WITH_SRV + if(cfg->use_srv){ + rc = mosquitto_connect_srv(mosq, cfg->host, cfg->keepalive, cfg->bind_address); + }else{ + rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address); + } +#else + rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address); +#endif + if(rc){ + if(!cfg->quiet){ + if(rc == MOSQ_ERR_ERRNO){ +#ifndef WIN32 + strerror_r(errno, err, 1024); +#else + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errno, 0, (LPTSTR)&err, 1024, NULL); +#endif + fprintf(stderr, "Error: %s\n", err); + }else{ + fprintf(stderr, "Unable to connect (%d).\n", rc); + } + } + mosquitto_lib_cleanup(); + return rc; + } + return MOSQ_ERR_SUCCESS; +} + +#ifdef WITH_SOCKS +/* Convert %25 -> %, %3a, %3A -> :, %40 -> @ */ +static int mosquitto__urldecode(char *str) +{ + int i, j; + int len; + if(!str) return 0; + + if(!strchr(str, '%')) return 0; + + len = strlen(str); + for(i=0; i<len; i++){ + if(str[i] == '%'){ + if(i+2 >= len){ + return 1; + } + if(str[i+1] == '2' && str[i+2] == '5'){ + str[i] = '%'; + len -= 2; + for(j=i+1; j<len; j++){ + str[j] = str[j+2]; + } + str[j] = '\0'; + }else if(str[i+1] == '3' && (str[i+2] == 'A' || str[i+2] == 'a')){ + str[i] = ':'; + len -= 2; + for(j=i+1; j<len; j++){ + str[j] = str[j+2]; + } + str[j] = '\0'; + }else if(str[i+1] == '4' && str[i+2] == '0'){ + str[i] = ':'; + len -= 2; + for(j=i+1; j<len; j++){ + str[j] = str[j+2]; + } + str[j] = '\0'; + }else{ + return 1; + } + } + } + return 0; +} + +static int mosquitto__parse_socks_url(struct mosq_config *cfg, char *url) +{ + char *str; + int i; + char *username = NULL, *password = NULL, *host = NULL, *port = NULL; + char *username_or_host = NULL; + int start; + int len; + bool have_auth = false; + int port_int; + + if(!strncmp(url, "socks5h://", strlen("socks5h://"))){ + str = url + strlen("socks5h://"); + }else{ + fprintf(stderr, "Error: Unsupported proxy protocol: %s\n", url); + return 1; + } + + // socks5h://username:password@host:1883 + // socks5h://username:password@host + // socks5h://username@host:1883 + // socks5h://username@host + // socks5h://host:1883 + // socks5h://host + + start = 0; + for(i=0; i<strlen(str); i++){ + if(str[i] == ':'){ + if(i == start){ + goto cleanup; + } + if(have_auth){ + /* Have already seen a @ , so this must be of form + * socks5h://username[:password]@host:port */ + if(host){ + /* Already seen a host, must be malformed. */ + goto cleanup; + } + len = i-start; + host = malloc(len + 1); + memcpy(host, &(str[start]), len); + host[len] = '\0'; + start = i+1; + }else if(!username_or_host){ + /* Haven't seen a @ before, so must be of form + * socks5h://host:port or + * socks5h://username:password@host[:port] */ + len = i-start; + username_or_host = malloc(len + 1); + memcpy(username_or_host, &(str[start]), len); + username_or_host[len] = '\0'; + start = i+1; + } + }else if(str[i] == '@'){ + if(i == start){ + goto cleanup; + } + have_auth = true; + if(username_or_host){ + /* Must be of form socks5h://username:password@... */ + username = username_or_host; + username_or_host = NULL; + + len = i-start; + password = malloc(len + 1); + memcpy(password, &(str[start]), len); + password[len] = '\0'; + start = i+1; + }else{ + /* Haven't seen a : yet, so must be of form + * socks5h://username@... */ + if(username){ + /* Already got a username, must be malformed. */ + goto cleanup; + } + len = i-start; + username = malloc(len + 1); + memcpy(username, &(str[start]), len); + username[len] = '\0'; + start = i+1; + } + } + } + + /* Deal with remainder */ + if(i > start){ + len = i-start; + if(host){ + /* Have already seen a @ , so this must be of form + * socks5h://username[:password]@host:port */ + port = malloc(len + 1); + memcpy(port, &(str[start]), len); + port[len] = '\0'; + }else if(username_or_host){ + /* Haven't seen a @ before, so must be of form + * socks5h://host:port */ + host = username_or_host; + username_or_host = NULL; + port = malloc(len + 1); + memcpy(port, &(str[start]), len); + port[len] = '\0'; + }else{ + host = malloc(len + 1); + memcpy(host, &(str[start]), len); + host[len] = '\0'; + } + } + + if(!host){ + fprintf(stderr, "Error: Invalid proxy.\n"); + goto cleanup; + } + + if(mosquitto__urldecode(username)){ + goto cleanup; + } + if(mosquitto__urldecode(password)){ + goto cleanup; + } + if(port){ + port_int = atoi(port); + if(port_int < 1 || port_int > 65535){ + fprintf(stderr, "Error: Invalid proxy port %d\n", port_int); + goto cleanup; + } + free(port); + }else{ + port_int = 1080; + } + + cfg->socks5_username = username; + cfg->socks5_password = password; + cfg->socks5_host = host; + cfg->socks5_port = port_int; + + return 0; +cleanup: + if(username_or_host) free(username_or_host); + if(username) free(username); + if(password) free(password); + if(host) free(host); + if(port) free(port); + return 1; +} + +#endif diff --git a/client_shared.h b/client_shared.h new file mode 100644 index 0000000..3f8b14b --- /dev/null +++ b/client_shared.h @@ -0,0 +1,107 @@ +/* +Copyright (c) 2014 Roger Light <roger@atchoo.org> + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#ifndef _CLIENT_CONFIG_H +#define _CLIENT_CONFIG_H + +#include <stdio.h> + +/* pub_client.c modes */ +#define MSGMODE_NONE 0 +#define MSGMODE_CMD 1 +#define MSGMODE_STDIN_LINE 2 +#define MSGMODE_STDIN_FILE 3 +#define MSGMODE_FILE 4 +#define MSGMODE_NULL 5 + +#define CLIENT_PUB 1 +#define CLIENT_SUB 2 + +struct mosq_config { + char *id; + char *id_prefix; + int protocol_version; + int keepalive; + char *host; + int port; + int qos; + bool retain; + int pub_mode; /* pub */ + char *file_input; /* pub */ + char *message; /* pub */ + long msglen; /* pub */ + char *topic; /* pub */ + char *bind_address; +#ifdef WITH_SRV + bool use_srv; +#endif + bool debug; + bool quiet; + unsigned int max_inflight; + char *username; + char *password; + char *will_topic; + char *will_payload; + long will_payloadlen; + int will_qos; + bool will_retain; +#ifdef WITH_TLS + char *cafile; + char *capath; + char *certfile; + char *keyfile; + char *ciphers; + bool insecure; + char *tls_version; +# ifdef WITH_TLS_PSK + char *psk; + char *psk_identity; +# endif +#endif + bool clean_session; /* sub */ + char **topics; /* sub */ + int topic_count; /* sub */ + bool no_retain; /* sub */ + char **filter_outs; /* sub */ + int filter_out_count; /* sub */ + bool verbose; /* sub */ + bool eol; /* sub */ + int msg_count; /* sub */ +#ifdef WITH_SOCKS + char *socks5_host; + int socks5_port; + char *socks5_username; + char *socks5_password; +#endif + + /* dirpub */ + bool isfmask; + bool overwrite; + char *fmask; + char ffmask[1000]; /* limit 1000 bytes. */ + char ftoken[1000]; /* limit 1000 bytes. */ + char *idtext; + char *fmask_topic; + +}; + +int client_config_load(struct mosq_config *config, int pub_or_sub, int argc, char *argv[]); +void client_config_cleanup(struct mosq_config *cfg); +int client_opts_set(struct mosquitto *mosq, struct mosq_config *cfg); +int client_id_generate(struct mosq_config *cfg, const char *id_base); +int client_connect(struct mosquitto *mosq, struct mosq_config *cfg); + +#endif diff --git a/sub_client.c b/sub_client.c index 787ed14..2a44f3a 100644 --- a/sub_client.c +++ b/sub_client.c @@ -1,31 +1,18 @@ /* -Copyright (c) 2009-2013 Roger Light <roger@atchoo.org> -Copyright (c) 2013-2014 V.Krishn <vkrishn@insteps.net> -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. -2. Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. -3. Neither the name of mosquitto nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -POSSIBILITY OF SUCH DAMAGE. +Copyright (c) 2009-2014 Roger Light <roger@atchoo.org> +Copyright (c) 2013-2015 V.Krishn <vkrishn@insteps.net> + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. */ #include <assert.h> @@ -47,34 +34,10 @@ POSSIBILITY OF SUCH DAMAGE. #include <time.h> #include <mosquitto.h> +#include "client_shared.h" -/* This struct is used to pass data to callbacks. - * An instance "ud" is created in main() and populated, then passed to - * mosquitto_new(). */ -struct userdata { - char **topics; - int topic_count; - int topic_qos; - char **filter_outs; - int filter_out_count; - char *username; - char *password; - int verbose; - bool quiet; - bool no_retain; - bool eol; - bool isfmask; - char *fmask; - char ffmask[1000]; /* limit 1000 bytes. */ - char ftoken[1000]; /* limit 1000 bytes. */ - //char *fmask_resolve; - char *idtext; - //char *cwd; - //char *path; - char *fmask_topic; - bool overwrite; -}; - +bool process_messages = true; +int msg_count = 0; /* @(#)Purpose: Create all directories in path @(#)Author: J Leffler @@ -125,9 +88,9 @@ int mkpath(const char *path, mode_t mode) } /* ------------------------------------------------------------- */ -/* - * Expand --fmask string options for output filename. - * DateTime string expansion for --fmask +/* Expand --fmask string options for output filename. + DateTime string expansion for --fmask +*/ /* ------------------------------------------------------------- */ #define FMASK_EPOCH 0 #define FMASK_DATE 1 @@ -155,7 +118,7 @@ const char *datetime(int fmt) switch(fmt) { case FMASK_EPOCH: - n = snprintf(dt, size, "%02d", current); + n = snprintf(dt, size, "%02d", (int)current); break; case FMASK_DATE: n = snprintf(dt, size, "%02d-%02d-%02d", @@ -203,20 +166,19 @@ const char *datetime(int fmt) } /* ------------------------------------------------------------- */ -/* - * Expand/resolve fmask token string. +/* Expand/resolve fmask token string. */ /* ------------------------------------------------------------- */ void *_setfmask(char *token, void *obj) { - struct userdata *ud; + struct mosq_config *cfg; assert(obj); - ud = (struct userdata *)obj; + cfg = (struct mosq_config *)obj; char *str2, *subtoken; char *saveptr2; - char *to = ud->ftoken; /* limit 1000 bytes. */ + char *to = cfg->ftoken; /* limit 1000 bytes. */ const char *dt; for (str2 = token; ; str2 = NULL) { @@ -246,27 +208,27 @@ void *_setfmask(char *token, void *obj) } else if(!strcmp(subtoken, "sec")) { dt = datetime(9); } else if(!strcmp(subtoken, "topic")) { - dt = ud->fmask_topic; + dt = cfg->fmask_topic; } else if(!strcmp(subtoken, "topic1")) { - dt = ud->topics[0]; + dt = cfg->topics[0]; } else if(!strcmp(subtoken, "topic2")) { - dt = ud->topics[1]; + dt = cfg->topics[1]; } else if(!strcmp(subtoken, "topic3")) { - dt = ud->topics[2]; + dt = cfg->topics[2]; } else if(!strcmp(subtoken, "topic4")) { - dt = ud->topics[3]; + dt = cfg->topics[3]; } else if(!strcmp(subtoken, "topic5")) { - dt = ud->topics[4]; + dt = cfg->topics[4]; } else if(!strcmp(subtoken, "topic6")) { - dt = ud->topics[5]; + dt = cfg->topics[5]; } else if(!strcmp(subtoken, "topic7")) { - dt = ud->topics[6]; + dt = cfg->topics[6]; } else if(!strcmp(subtoken, "topic8")) { - dt = ud->topics[7]; + dt = cfg->topics[7]; } else if(!strcmp(subtoken, "topic9")) { - dt = ud->topics[8]; + dt = cfg->topics[8]; } else if(!strcmp(subtoken, "id")) { - dt = ud->idtext; + dt = cfg->idtext; } else { dt = strdup (subtoken); } @@ -276,26 +238,25 @@ void *_setfmask(char *token, void *obj) } to[strlen(to)-1] = '\0'; + return 0; } -/* - * Expand --fmask string options for output filename. +/* Expand --fmask string options for output filename. */ /* ------------------------------------------------------------- */ void *_fmask(char *fmask, void *obj) { - struct userdata *ud; + struct mosq_config *cfg; assert(obj); - ud = (struct userdata *)obj; + cfg = (struct mosq_config *)obj; - char *str1, *str2, *token, *subtoken; - char *saveptr1, *saveptr2; - int j; + char *str1, *token; + char *saveptr1; - char *path, *prog; + char *path; path = strdup (fmask); - char *to = ud->ffmask; /* limit 1000 bytes. */ + char *to = cfg->ffmask; /* limit 1000 bytes. */ to = stpcpy (to, "/"); for (str1 = path; ; str1 = NULL) { @@ -304,18 +265,20 @@ void *_fmask(char *fmask, void *obj) break; /* format type */ - _setfmask(token, ud); - to = stpcpy (to, ud->ftoken); + _setfmask(token, cfg); + to = stpcpy (to, cfg->ftoken); to = stpcpy (to, "/"); } to[strlen(to)-1] = '\0'; + return 0; } -/* - * File open with given mode. - * returns file descriptor (fd) +/* + File open with given mode. + returns file descriptor (fd) +*/ /* ------------------------------------------------------------- */ FILE *_mosquitto_fopen(const char *path, const char *mode) { @@ -335,38 +298,38 @@ FILE *_mosquitto_fopen(const char *path, const char *mode) void my_message_file_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { - struct userdata *ud; + struct mosq_config *cfg; int i; bool res; assert(obj); - ud = (struct userdata *)obj; + cfg = (struct mosq_config *)obj; - if(message->retain && ud->no_retain) return; - if(ud->filter_outs){ - for(i=0; i<ud->filter_out_count; i++){ - mosquitto_topic_matches_sub(ud->filter_outs[i], message->topic, &res); + if(message->retain && cfg->no_retain) return; + if(cfg->filter_outs){ + for(i=0; i<cfg->filter_out_count; i++){ + mosquitto_topic_matches_sub(cfg->filter_outs[i], message->topic, &res); if(res) return; } } - ud->fmask_topic = message->topic; + cfg->fmask_topic = message->topic; FILE *fptr = NULL; - _fmask(ud->fmask, ud); + _fmask(cfg->fmask, cfg); char *path, *prog; - path = strdup (ud->ffmask); - prog = strdup (ud->ffmask); + path = strdup (cfg->ffmask); + prog = strdup (cfg->ffmask); path = dirname (path); prog = basename (prog); - int status = mkpath(path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + mkpath(path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - if(ud->overwrite) { - fptr = _mosquitto_fopen(ud->ffmask, "w"); + if(cfg->overwrite) { + fptr = _mosquitto_fopen(cfg->ffmask, "w"); } else { - fptr = _mosquitto_fopen(ud->ffmask, "a"); + fptr = _mosquitto_fopen(cfg->ffmask, "a"); } if(!fptr){ @@ -374,22 +337,22 @@ void my_message_file_callback(struct mosquitto *mosq, void *obj, const struct mo // need to do normal stdout //mosquitto_message_callback_set(mosq, "my_message_callback"); } else{ - if(ud->verbose){ + if(cfg->verbose){ if(message->payloadlen){ fprintf(fptr, "%s ", message->topic); fwrite(message->payload, 1, message->payloadlen, fptr); - if(ud->eol){ + if(cfg->eol){ fprintf(fptr, "\n"); } }else{ - if(ud->eol){ + if(cfg->eol){ fprintf(fptr, "%s (null)\n", message->topic); } } }else{ if(message->payloadlen){ fwrite(message->payload, 1, message->payloadlen, fptr); - if(ud->eol){ + if(cfg->eol){ fprintf(fptr, "\n"); } } @@ -400,30 +363,32 @@ void my_message_file_callback(struct mosquitto *mosq, void *obj, const struct mo void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { - struct userdata *ud; + struct mosq_config *cfg; int i; bool res; + if(process_messages == false) return; + assert(obj); - ud = (struct userdata *)obj; + cfg = (struct mosq_config *)obj; - if(message->retain && ud->no_retain) return; - if(ud->filter_outs){ - for(i=0; i<ud->filter_out_count; i++){ - mosquitto_topic_matches_sub(ud->filter_outs[i], message->topic, &res); + if(message->retain && cfg->no_retain) return; + if(cfg->filter_outs){ + for(i=0; i<cfg->filter_out_count; i++){ + mosquitto_topic_matches_sub(cfg->filter_outs[i], message->topic, &res); if(res) return; } } - if(ud->verbose){ + if(cfg->verbose){ if(message->payloadlen){ printf("%s ", message->topic); fwrite(message->payload, 1, message->payloadlen, stdout); - if(ud->eol){ + if(cfg->eol){ printf("\n"); } }else{ - if(ud->eol){ + if(cfg->eol){ printf("%s (null)\n", message->topic); } } @@ -431,28 +396,35 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit }else{ if(message->payloadlen){ fwrite(message->payload, 1, message->payloadlen, stdout); - if(ud->eol){ + if(cfg->eol){ printf("\n"); } fflush(stdout); } } + if(cfg->msg_count>0){ + msg_count++; + if(cfg->msg_count == msg_count){ + process_messages = false; + mosquitto_disconnect(mosq); + } + } } void my_connect_callback(struct mosquitto *mosq, void *obj, int result) { int i; - struct userdata *ud; + struct mosq_config *cfg; assert(obj); - ud = (struct userdata *)obj; + cfg = (struct mosq_config *)obj; if(!result){ - for(i=0; i<ud->topic_count; i++){ - mosquitto_subscribe(mosq, NULL, ud->topics[i], ud->topic_qos); + for(i=0; i<cfg->topic_count; i++){ + mosquitto_subscribe(mosq, NULL, cfg->topics[i], cfg->qos); } }else{ - if(result && !ud->quiet){ + if(result && !cfg->quiet){ fprintf(stderr, "%s\n", mosquitto_connack_string(result)); } } @@ -461,16 +433,16 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result) void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) { int i; - struct userdata *ud; + struct mosq_config *cfg; assert(obj); - ud = (struct userdata *)obj; + cfg = (struct mosq_config *)obj; - if(!ud->quiet) printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); + if(!cfg->quiet) printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for(i=1; i<qos_count; i++){ - if(!ud->quiet) printf(", %d", granted_qos[i]); + if(!cfg->quiet) printf(", %d", granted_qos[i]); } - if(!ud->quiet) printf("\n"); + if(!cfg->quiet) printf("\n"); } void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) @@ -486,7 +458,7 @@ void print_usage(void) printf("mosquitto_sub is a simple mqtt client that will subscribe to a single topic and print all messages it receives.\n"); printf("mosquitto_sub version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision); printf("Usage: mosquitto_sub [-c] [-h host] [-k keepalive] [-p port] [-q qos] [-R] -t topic ...\n"); - printf(" [-T filter_out]\n"); + printf(" [-C msg_count] [-T filter_out]\n"); #ifdef WITH_SRV printf(" [-A bind_address] [-S]\n"); #else @@ -504,10 +476,14 @@ void print_usage(void) printf(" [--psk hex-key --psk-identity identity [--ciphers ciphers]]\n"); #endif #endif +#ifdef WITH_SOCKS + printf(" [--proxy socks-url]\n"); +#endif printf(" mosquitto_sub --help\n\n"); printf(" -A : bind the outgoing socket to this host/ip address. Use to control which interface\n"); printf(" the client communicates over.\n"); printf(" -c : disable 'clean session' (store subscription and pending messages when client disconnects).\n"); + printf(" -C : disconnect and exit after receiving the 'msg_count' messages.\n"); printf(" -d : enable debug messages.\n"); printf(" -h : mqtt host to connect to. Defaults to localhost.\n"); printf(" -i : id to use for this client. Defaults to mosquitto_sub_ appended with the process id.\n"); @@ -516,15 +492,18 @@ void print_usage(void) printf(" -k : keep alive in seconds for this client. Defaults to 60.\n"); printf(" -N : do not add an end of line character when printing the payload.\n"); printf(" -p : network port to connect to. Defaults to 1883.\n"); + printf(" -P : provide a password (requires MQTT 3.1 broker)\n"); printf(" -q : quality of service level to use for the subscription. Defaults to 0.\n"); printf(" -R : do not print stale messages (those with retain set).\n"); #ifdef WITH_SRV printf(" -S : use SRV lookups to determine which host to connect to.\n"); #endif printf(" -t : mqtt topic to subscribe to. May be repeated multiple times.\n"); + printf(" -T : topic string to filter out of results. May be repeated.\n"); printf(" -u : provide a username (requires MQTT 3.1 broker)\n"); printf(" -v : print published messages verbosely.\n"); - printf(" -P : provide a password (requires MQTT 3.1 broker)\n"); + printf(" -V : specify the version of the MQTT protocol to use when connecting.\n"); + printf(" Can be mqttv31 or mqttv311. Defaults to mqttv31.\n"); printf(" --help : display this message.\n"); printf(" --quiet : don't print error messages.\n"); printf(" --fmask : path to message outfile\n"); @@ -557,474 +536,78 @@ void print_usage(void) printf(" --psk-identity : client identity string for TLS-PSK mode.\n"); #endif #endif +#ifdef WITH_SOCKS + printf(" --proxy : SOCKS5 proxy URL of the form:\n"); + printf(" socks5h://[username[:password]@]hostname[:port]\n"); + printf(" Only \"none\" and \"username\" authentication is supported.\n"); +#endif printf("\nSee http://mosquitto.org/ for more information.\n\n"); } int main(int argc, char *argv[]) { - char *id = NULL; - char *id_prefix = NULL; - int i; - char *host = "localhost"; - int port = 1883; - char *bind_address = NULL; - int keepalive = 60; - bool clean_session = true; - bool debug = false; + struct mosq_config cfg; struct mosquitto *mosq = NULL; int rc; - char hostname[256]; - char err[1024]; - struct userdata ud; - int len; - char *will_payload = NULL; - long will_payloadlen = 0; - int will_qos = 0; - bool will_retain = false; - char *will_topic = NULL; - - bool insecure = false; - char *cafile = NULL; - char *capath = NULL; - char *certfile = NULL; - char *keyfile = NULL; - char *tls_version = NULL; - - char *psk = NULL; - char *psk_identity = NULL; - - char *ciphers = NULL; - - bool use_srv = false; - - memset(&ud, 0, sizeof(struct userdata)); - ud.eol = true; - - for(i=1; i<argc; i++){ - if(!strcmp(argv[i], "-p") || !strcmp(argv[i], "--port")){ - if(i==argc-1){ - fprintf(stderr, "Error: -p argument given but no port specified.\n\n"); - print_usage(); - return 1; - }else{ - port = atoi(argv[i+1]); - if(port<1 || port>65535){ - fprintf(stderr, "Error: Invalid port given: %d\n", port); - print_usage(); - return 1; - } - } - i++; - }else if(!strcmp(argv[i], "-A")){ - if(i==argc-1){ - fprintf(stderr, "Error: -A argument given but no address specified.\n\n"); - print_usage(); - return 1; - }else{ - bind_address = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "-c") || !strcmp(argv[i], "--disable-clean-session")){ - clean_session = false; - }else if(!strcmp(argv[i], "--fmask")){ - if(i==argc-1){ - fprintf(stderr, "Error: --fmask argument given but no outfile specified.\n\n"); - print_usage(); - return 1; - }else{ - ud.fmask = argv[i+1]; - ud.isfmask = true; - } - i++; - }else if(!strcmp(argv[i], "--overwrite")){ - ud.overwrite = true; - }else if(!strcmp(argv[i], "--cafile")){ - if(i==argc-1){ - fprintf(stderr, "Error: --cafile argument given but no file specified.\n\n"); - print_usage(); - return 1; - }else{ - cafile = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "--capath")){ - if(i==argc-1){ - fprintf(stderr, "Error: --capath argument given but no directory specified.\n\n"); - print_usage(); - return 1; - }else{ - capath = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "--cert")){ - if(i==argc-1){ - fprintf(stderr, "Error: --cert argument given but no file specified.\n\n"); - print_usage(); - return 1; - }else{ - certfile = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "--ciphers")){ - if(i==argc-1){ - fprintf(stderr, "Error: --ciphers argument given but no ciphers specified.\n\n"); - print_usage(); - return 1; - }else{ - ciphers = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "-d") || !strcmp(argv[i], "--debug")){ - debug = true; - }else if(!strcmp(argv[i], "--help")){ + rc = client_config_load(&cfg, CLIENT_SUB, argc, argv); + if(rc){ + client_config_cleanup(&cfg); + if(rc == 2){ + /* --help */ print_usage(); - return 0; - }else if(!strcmp(argv[i], "-h") || !strcmp(argv[i], "--host")){ - if(i==argc-1){ - fprintf(stderr, "Error: -h argument given but no host specified.\n\n"); - print_usage(); - return 1; - }else{ - host = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "--insecure")){ - insecure = true; - }else if(!strcmp(argv[i], "-i") || !strcmp(argv[i], "--id")){ - if(id_prefix){ - fprintf(stderr, "Error: -i and -I argument cannot be used together.\n\n"); - print_usage(); - return 1; - } - if(i==argc-1){ - fprintf(stderr, "Error: -i argument given but no id specified.\n\n"); - print_usage(); - return 1; - }else{ - id = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "-I") || !strcmp(argv[i], "--id-prefix")){ - if(id){ - fprintf(stderr, "Error: -i and -I argument cannot be used together.\n\n"); - print_usage(); - return 1; - } - if(i==argc-1){ - fprintf(stderr, "Error: -I argument given but no id prefix specified.\n\n"); - print_usage(); - return 1; - }else{ - id_prefix = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "-k") || !strcmp(argv[i], "--keepalive")){ - if(i==argc-1){ - fprintf(stderr, "Error: -k argument given but no keepalive specified.\n\n"); - print_usage(); - return 1; - }else{ - keepalive = atoi(argv[i+1]); - if(keepalive>65535){ - fprintf(stderr, "Error: Invalid keepalive given: %d\n", keepalive); - print_usage(); - return 1; - } - } - i++; - }else if(!strcmp(argv[i], "--key")){ - if(i==argc-1){ - fprintf(stderr, "Error: --key argument given but no file specified.\n\n"); - print_usage(); - return 1; - }else{ - keyfile = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "-N")){ - ud.eol = false; - }else if(!strcmp(argv[i], "--psk")){ - if(i==argc-1){ - fprintf(stderr, "Error: --psk argument given but no key specified.\n\n"); - print_usage(); - return 1; - }else{ - psk = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "--psk-identity")){ - if(i==argc-1){ - fprintf(stderr, "Error: --psk-identity argument given but no identity specified.\n\n"); - print_usage(); - return 1; - }else{ - psk_identity = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "-q") || !strcmp(argv[i], "--qos")){ - if(i==argc-1){ - fprintf(stderr, "Error: -q argument given but no QoS specified.\n\n"); - print_usage(); - return 1; - }else{ - ud.topic_qos = atoi(argv[i+1]); - if(ud.topic_qos<0 || ud.topic_qos>2){ - fprintf(stderr, "Error: Invalid QoS given: %d\n", ud.topic_qos); - print_usage(); - return 1; - } - } - i++; - }else if(!strcmp(argv[i], "--quiet")){ - ud.quiet = true; - }else if(!strcmp(argv[i], "-R")){ - ud.no_retain = true; -#ifdef WITH_SRV - }else if(!strcmp(argv[i], "-S")){ - use_srv = true; -#endif - }else if(!strcmp(argv[i], "-t") || !strcmp(argv[i], "--topic")){ - if(i==argc-1){ - fprintf(stderr, "Error: -t argument given but no topic specified.\n\n"); - print_usage(); - return 1; - }else{ - ud.topic_count++; - ud.topics = realloc(ud.topics, ud.topic_count*sizeof(char *)); - ud.topics[ud.topic_count-1] = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "-T") || !strcmp(argv[i], "--filter-out")){ - if(i==argc-1){ - fprintf(stderr, "Error: -T argument given but no topic filter specified.\n\n"); - print_usage(); - return 1; - }else{ - ud.filter_out_count++; - ud.filter_outs = realloc(ud.filter_outs, ud.filter_out_count*sizeof(char *)); - ud.filter_outs[ud.filter_out_count-1] = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "--tls-version")){ - if(i==argc-1){ - fprintf(stderr, "Error: --tls-version argument given but no version specified.\n\n"); - print_usage(); - return 1; - }else{ - tls_version = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "-u") || !strcmp(argv[i], "--username")){ - if(i==argc-1){ - fprintf(stderr, "Error: -u argument given but no username specified.\n\n"); - print_usage(); - return 1; - }else{ - ud.username = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "-v") || !strcmp(argv[i], "--verbose")){ - ud.verbose = 1; - }else if(!strcmp(argv[i], "-P") || !strcmp(argv[i], "--pw")){ - if(i==argc-1){ - fprintf(stderr, "Error: -P argument given but no password specified.\n\n"); - print_usage(); - return 1; - }else{ - ud.password = argv[i+1]; - } - i++; - }else if(!strcmp(argv[i], "--will-payload")){ - if(i==argc-1){ - fprintf(stderr, "Error: --will-payload argument given but no will payload specified.\n\n"); - print_usage(); - return 1; - }else{ - will_payload = argv[i+1]; - will_payloadlen = strlen(will_payload); - } - i++; - }else if(!strcmp(argv[i], "--will-qos")){ - if(i==argc-1){ - fprintf(stderr, "Error: --will-qos argument given but no will QoS specified.\n\n"); - print_usage(); - return 1; - }else{ - will_qos = atoi(argv[i+1]); - if(will_qos < 0 || will_qos > 2){ - fprintf(stderr, "Error: Invalid will QoS %d.\n\n", will_qos); - return 1; - } - } - i++; - }else if(!strcmp(argv[i], "--will-retain")){ - will_retain = true; - }else if(!strcmp(argv[i], "--will-topic")){ - if(i==argc-1){ - fprintf(stderr, "Error: --will-topic argument given but no will topic specified.\n\n"); - print_usage(); - return 1; - }else{ - will_topic = argv[i+1]; - } - i++; }else{ - fprintf(stderr, "Error: Unknown option '%s'.\n",argv[i]); - print_usage(); - return 1; + fprintf(stderr, "\nUse 'mosquitto_sub --help' to see usage.\n"); } - } - - if(clean_session == false && (id_prefix || !id)){ - if(!ud.quiet) fprintf(stderr, "Error: You must provide a client id if you are using the -c option.\n"); - return 1; - } - - if(ud.topic_count == 0){ - fprintf(stderr, "Error: You must specify a topic to subscribe to.\n"); - print_usage(); - return 1; - } - if(will_payload && !will_topic){ - fprintf(stderr, "Error: Will payload given, but no will topic given.\n"); - print_usage(); - return 1; - } - if(will_retain && !will_topic){ - fprintf(stderr, "Error: Will retain given, but no will topic given.\n"); - print_usage(); - return 1; - } - if(ud.password && !ud.username){ - if(!ud.quiet) fprintf(stderr, "Warning: Not using password since username not set.\n"); - } - if((certfile && !keyfile) || (keyfile && !certfile)){ - fprintf(stderr, "Error: Both certfile and keyfile must be provided if one of them is.\n"); - print_usage(); - return 1; - } - if((cafile || capath) && psk){ - if(!ud.quiet) fprintf(stderr, "Error: Only one of --psk or --cafile/--capath may be used at once.\n"); - return 1; - } - if(psk && !psk_identity){ - if(!ud.quiet) fprintf(stderr, "Error: --psk-identity required if --psk used.\n"); return 1; } mosquitto_lib_init(); - if(id_prefix){ - id = malloc(strlen(id_prefix)+10); - if(!id){ - if(!ud.quiet) fprintf(stderr, "Error: Out of memory.\n"); - mosquitto_lib_cleanup(); - return 1; - } - snprintf(id, strlen(id_prefix)+10, "%s%d", id_prefix, getpid()); - }else if(!id){ - hostname[0] = '\0'; - gethostname(hostname, 256); - hostname[255] = '\0'; - len = strlen("mosqsub/-") + 6 + strlen(hostname); - id = malloc(len); - if(!id){ - if(!ud.quiet) fprintf(stderr, "Error: Out of memory.\n"); - mosquitto_lib_cleanup(); - return 1; - } - snprintf(id, len, "mosqsub/%d-%s", getpid(), hostname); - if(strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH){ - /* Enforce maximum client id length of 23 characters */ - id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; - } + if(client_id_generate(&cfg, "mosqsub")){ + return 1; } - mosq = mosquitto_new(id, clean_session, &ud); - ud.idtext = id; + mosq = mosquitto_new(cfg.id, cfg.clean_session, &cfg); + cfg.idtext = cfg.id; if(!mosq){ switch(errno){ case ENOMEM: - if(!ud.quiet) fprintf(stderr, "Error: Out of memory.\n"); + if(!cfg.quiet) fprintf(stderr, "Error: Out of memory.\n"); break; case EINVAL: - if(!ud.quiet) fprintf(stderr, "Error: Invalid id and/or clean_session.\n"); + if(!cfg.quiet) fprintf(stderr, "Error: Invalid id and/or clean_session.\n"); break; } mosquitto_lib_cleanup(); return 1; } - if(debug){ - mosquitto_log_callback_set(mosq, my_log_callback); - } - if(will_topic && mosquitto_will_set(mosq, will_topic, will_payloadlen, will_payload, will_qos, will_retain)){ - if(!ud.quiet) fprintf(stderr, "Error: Problem setting will.\n"); - mosquitto_lib_cleanup(); - return 1; - } - if(ud.username && mosquitto_username_pw_set(mosq, ud.username, ud.password)){ - if(!ud.quiet) fprintf(stderr, "Error: Problem setting username and password.\n"); - mosquitto_lib_cleanup(); + if(client_opts_set(mosq, &cfg)){ return 1; } - if((cafile || capath) && mosquitto_tls_set(mosq, cafile, capath, certfile, keyfile, NULL)){ - if(!ud.quiet) fprintf(stderr, "Error: Problem setting TLS options.\n"); - mosquitto_lib_cleanup(); - return 1; - } - if(insecure && mosquitto_tls_insecure_set(mosq, true)){ - if(!ud.quiet) fprintf(stderr, "Error: Problem setting TLS insecure option.\n"); - mosquitto_lib_cleanup(); - return 1; - } - if(psk && mosquitto_tls_psk_set(mosq, psk, psk_identity, NULL)){ - if(!ud.quiet) fprintf(stderr, "Error: Problem setting TLS-PSK options.\n"); - mosquitto_lib_cleanup(); - return 1; - } - if(tls_version && mosquitto_tls_opts_set(mosq, 1, tls_version, ciphers)){ - if(!ud.quiet) fprintf(stderr, "Error: Problem setting TLS options.\n"); - mosquitto_lib_cleanup(); - return 1; + if(cfg.debug){ + mosquitto_log_callback_set(mosq, my_log_callback); + mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); } mosquitto_connect_callback_set(mosq, my_connect_callback); - if(ud.isfmask) { + if(cfg.isfmask) { mosquitto_message_callback_set(mosq, my_message_file_callback); } else { mosquitto_message_callback_set(mosq, my_message_callback); } - if(debug){ - mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); - } - if(use_srv){ - rc = mosquitto_connect_srv(mosq, host, keepalive, bind_address); - }else{ - rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address); - } - if(rc){ - if(!ud.quiet){ - if(rc == MOSQ_ERR_ERRNO){ -#ifndef WIN32 - strerror_r(errno, err, 1024); -#else - FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errno, 0, (LPTSTR)&err, 1024, NULL); -#endif - fprintf(stderr, "Error: %s\n", err); - }else{ - fprintf(stderr, "Unable to connect (%d).\n", rc); - } - } - mosquitto_lib_cleanup(); - return rc; - } + rc = client_connect(mosq, &cfg); + if(rc) return rc; + rc = mosquitto_loop_forever(mosq, -1, 1); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); + if(cfg.msg_count>0 && rc == MOSQ_ERR_NO_CONN){ + rc = 0; + } if(rc){ fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc)); } |