aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorV.Krishn <vkrishn4@gmail.com>2014-03-20 16:14:54 +0530
committerV.Krishn <vkrishn4@gmail.com>2014-03-20 16:14:54 +0530
commit8da2387f7ceb1fc22d30a435f3944cc350c02eb8 (patch)
treed0d10fef2749cc045547b6a795dc94a8b1a8e340
parent2767f15766dc94b1b18e0b8b94600c6d0981403f (diff)
downloadmqtt-dirpub-8da2387f7ceb1fc22d30a435f3944cc350c02eb8.tar.bz2
Update diff from v1.3.0
-rw-r--r--sub_client.c106
1 files changed, 83 insertions, 23 deletions
diff --git a/sub_client.c b/sub_client.c
index 98fbdf6..b504d09 100644
--- a/sub_client.c
+++ b/sub_client.c
@@ -46,7 +46,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include <sys/types.h>
#include <time.h>
-
#include <mosquitto.h>
/* This struct is used to pass data to callbacks.
@@ -56,11 +55,14 @@ struct userdata {
char **topics;
int topic_count;
int topic_qos;
+ char **filter_outs;
+ int filter_out_count;
char *username;
char *password;
int verbose;
bool quiet;
bool no_retain;
+ bool eol;
bool isfmask;
char *fmask;
char ffmask[1000]; /* limit 1000 bytes. */
@@ -350,16 +352,21 @@ FILE *_mosquitto_fopen(const char *path, const char *mode)
void my_message_file_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
struct userdata *ud;
+ int i;
+ bool res;
assert(obj);
ud = (struct userdata *)obj;
if(message->retain && ud->no_retain) return;
+ if(ud->filter_outs){
+ for(i=0; i<ud->filter_out_count; i++){
+ mosquitto_topic_matches_sub(ud->filter_outs[i], message->topic, &res);
+ if(res) return;
+ }
+ }
ud->fmask_topic = message->topic;
- //char buf[100]; /* limit 100 bytes. */
- //char *cwd = getcwd(buf, 100);
- //ud->cwd = cwd;
FILE *fptr = NULL;
_fmask(ud->fmask, ud);
@@ -387,43 +394,62 @@ void my_message_file_callback(struct mosquitto *mosq, void *obj, const struct mo
if(message->payloadlen){
fprintf(fptr, "%s ", message->topic);
fwrite(message->payload, 1, message->payloadlen, fptr);
- fprintf(fptr, "\n");
+ if(ud->eol){
+ fprintf(fptr, "\n");
+ }
}else{
- fprintf(fptr, "%s (null)\n", message->topic);
+ if(ud->eol){
+ fprintf(fptr, "%s (null)\n", message->topic);
+ }
}
}else{
if(message->payloadlen){
fwrite(message->payload, 1, message->payloadlen, fptr);
- fprintf(fptr, "\n");
+ if(ud->eol){
+ fprintf(fptr, "\n");
+ }
}
}
fclose(fptr);
}
-
}
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
struct userdata *ud;
+ int i;
+ bool res;
assert(obj);
ud = (struct userdata *)obj;
if(message->retain && ud->no_retain) return;
+ if(ud->filter_outs){
+ for(i=0; i<ud->filter_out_count; i++){
+ mosquitto_topic_matches_sub(ud->filter_outs[i], message->topic, &res);
+ if(res) return;
+ }
+ }
if(ud->verbose){
if(message->payloadlen){
printf("%s ", message->topic);
fwrite(message->payload, 1, message->payloadlen, stdout);
- printf("\n");
+ if(ud->eol){
+ printf("\n");
+ }
}else{
- printf("%s (null)\n", message->topic);
+ if(ud->eol){
+ printf("%s (null)\n", message->topic);
+ }
}
fflush(stdout);
}else{
if(message->payloadlen){
fwrite(message->payload, 1, message->payloadlen, stdout);
- printf("\n");
+ if(ud->eol){
+ printf("\n");
+ }
fflush(stdout);
}
}
@@ -475,17 +501,19 @@ void print_usage(void)
mosquitto_lib_version(&major, &minor, &revision);
printf("mosquitto_sub is a simple mqtt client that will subscribe to a single topic and print all messages it receives.\n");
printf("mosquitto_sub version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision);
- printf("Usage: mosquitto_sub [-c] [-h host] [-k keepalive] [-p port] [-q qos] [-R] [-v] -t topic ...\n");
- printf(" [-A bind_address]\n");
+ printf("Usage: mosquitto_sub [-c] [-h host] [-k keepalive] [-p port] [-q qos] [-R] -t topic ...\n");
+ printf(" [-T filter_out]\n");
+ printf(" [-A bind_address] [-S]\n");
printf(" [-i id] [-I id_prefix]\n");
- printf(" [-d] [--quiet]\n");
+ printf(" [-d] [-N] [--quiet] [-v]\n");
printf(" [-u username [-P password]]\n");
printf(" [--fmask outfile]\n");
printf(" [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]\n");
#ifdef WITH_TLS
- printf(" [{--cafile file | --capath dir} [--cert file] [--key file] [--insecure]]\n");
+ printf(" [{--cafile file | --capath dir} [--cert file] [--key file]\n");
+ printf(" [--ciphers ciphers] [--insecure]]\n");
#ifdef WITH_TLS_PSK
- printf(" [--psk hex-key --psk-identity identity]\n");
+ printf(" [--psk hex-key --psk-identity identity [--ciphers ciphers]]\n");
#endif
#endif
printf(" mosquitto_sub --help\n\n");
@@ -498,9 +526,11 @@ void print_usage(void)
printf(" -I : define the client id as id_prefix appended with the process id. Useful for when the\n");
printf(" broker is using the clientid_prefixes option.\n");
printf(" -k : keep alive in seconds for this client. Defaults to 60.\n");
+ printf(" -N : do not add an end of line character when printing the payload.\n");
printf(" -p : network port to connect to. Defaults to 1883.\n");
printf(" -q : quality of service level to use for the subscription. Defaults to 0.\n");
printf(" -R : do not print stale messages (those with retain set).\n");
+ printf(" -S : use SRV lookups to determine which host to connect to.\n");
printf(" -t : mqtt topic to subscribe to. May be repeated multiple times.\n");
printf(" -u : provide a username (requires MQTT 3.1 broker)\n");
printf(" -v : print published messages verbosely.\n");
@@ -525,6 +555,7 @@ void print_usage(void)
printf(" communication.\n");
printf(" --cert : client certificate for authentication, if required by server.\n");
printf(" --key : client private key for authentication, if required by server.\n");
+ printf(" --ciphers : openssl compatible list of TLS ciphers to support.\n");
printf(" --tls-version : TLS protocol version, can be one of tlsv1.2 tlsv1.1 or tlsv1.\n");
printf(" Defaults to tlsv1.2 if available.\n");
printf(" --insecure : do not check that the server certificate hostname matches the remote\n");
@@ -573,7 +604,12 @@ int main(int argc, char *argv[])
char *psk = NULL;
char *psk_identity = NULL;
+ char *ciphers = NULL;
+
+ bool use_srv = false;
+
memset(&ud, 0, sizeof(struct userdata));
+ ud.eol = true;
for(i=1; i<argc; i++){
if(!strcmp(argv[i], "-p") || !strcmp(argv[i], "--port")){
@@ -640,6 +676,15 @@ int main(int argc, char *argv[])
certfile = argv[i+1];
}
i++;
+ }else if(!strcmp(argv[i], "--ciphers")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: --ciphers argument given but no ciphers specified.\n\n");
+ print_usage();
+ return 1;
+ }else{
+ ciphers = argv[i+1];
+ }
+ i++;
}else if(!strcmp(argv[i], "-d") || !strcmp(argv[i], "--debug")){
debug = true;
}else if(!strcmp(argv[i], "--help")){
@@ -707,6 +752,8 @@ int main(int argc, char *argv[])
keyfile = argv[i+1];
}
i++;
+ }else if(!strcmp(argv[i], "-N")){
+ ud.eol = false;
}else if(!strcmp(argv[i], "--psk")){
if(i==argc-1){
fprintf(stderr, "Error: --psk argument given but no key specified.\n\n");
@@ -743,6 +790,8 @@ int main(int argc, char *argv[])
ud.quiet = true;
}else if(!strcmp(argv[i], "-R")){
ud.no_retain = true;
+ }else if(!strcmp(argv[i], "-S")){
+ use_srv = true;
}else if(!strcmp(argv[i], "-t") || !strcmp(argv[i], "--topic")){
if(i==argc-1){
fprintf(stderr, "Error: -t argument given but no topic specified.\n\n");
@@ -754,6 +803,17 @@ int main(int argc, char *argv[])
ud.topics[ud.topic_count-1] = argv[i+1];
}
i++;
+ }else if(!strcmp(argv[i], "-T") || !strcmp(argv[i], "--filter-out")){
+ if(i==argc-1){
+ fprintf(stderr, "Error: -T argument given but no topic filter specified.\n\n");
+ print_usage();
+ return 1;
+ }else{
+ ud.filter_out_count++;
+ ud.filter_outs = realloc(ud.filter_outs, ud.filter_out_count*sizeof(char *));
+ ud.filter_outs[ud.filter_out_count-1] = argv[i+1];
+ }
+ i++;
}else if(!strcmp(argv[i], "--tls-version")){
if(i==argc-1){
fprintf(stderr, "Error: --tls-version argument given but no version specified.\n\n");
@@ -931,7 +991,7 @@ int main(int argc, char *argv[])
mosquitto_lib_cleanup();
return 1;
}
- if(tls_version && mosquitto_tls_opts_set(mosq, 1, tls_version, NULL)){
+ if(tls_version && mosquitto_tls_opts_set(mosq, 1, tls_version, ciphers)){
if(!ud.quiet) fprintf(stderr, "Error: Problem setting TLS options.\n");
mosquitto_lib_cleanup();
return 1;
@@ -946,7 +1006,11 @@ int main(int argc, char *argv[])
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
}
- rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address);
+ if(use_srv){
+ rc = mosquitto_connect_srv(mosq, host, keepalive, bind_address);
+ }else{
+ rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address);
+ }
if(rc){
if(!ud.quiet){
if(rc == MOSQ_ERR_ERRNO){
@@ -970,11 +1034,7 @@ int main(int argc, char *argv[])
mosquitto_lib_cleanup();
if(rc){
- if(rc == MOSQ_ERR_ERRNO){
- fprintf(stderr, "Error: %s\n", strerror(errno));
- }else{
- fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
- }
+ fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
}
return rc;
}