Mosquitto
简述
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的”轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
config 解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
| vim mosquitto.conf
# ================================================================= # General configuration # =================================================================
# 客户端心跳的间隔时间 #retry_interval 20
# 系统状态的刷新时间 #sys_interval 10
# 系统资源的回收时间,0表示尽快处理 #store_clean_interval 10
# 服务进程的PID #pid_file /var/run/mosquitto.pid
# 服务进程的系统用户 #user mosquitto
# 客户端心跳消息的最大并发数 #max_inflight_messages 10
# 客户端心跳消息缓存队列 #max_queued_messages 100
# 用于设置客户端长连接的过期时间,默认永不过期 #persistent_client_expiration
# ================================================================= # Default listener # =================================================================
# 服务绑定的IP地址 #bind_address
# 服务绑定的端口号 #port 1883
# 允许的最大连接数,-1表示没有限制 #max_connections -1
# cafile:CA证书文件 # capath:CA证书目录 # certfile:PEM证书文件 # keyfile:PEM密钥文件 #cafile #capath #certfile #keyfile
# 必须提供证书以保证数据安全性 #require_certificate false
# 若require_certificate值为true,use_identity_as_username也必须为true #use_identity_as_username false
# 启用PSK(Pre-shared-key)支持 #psk_hint
# SSL/TSL加密算法,可以使用“openssl ciphers”命令获取 # as the output of that command. #ciphers
# ================================================================= # Persistence # =================================================================
# 消息自动保存的间隔时间 #autosave_interval 1800
# 消息自动保存功能的开关 #autosave_on_changes false
# 持久化功能的开关 persistence true
# 持久化DB文件 #persistence_file mosquitto.db
# 持久化DB文件目录 #persistence_location /var/lib/mosquitto/
# ================================================================= # Logging # =================================================================
# 4种日志模式:stdout、stderr、syslog、topic # none 则表示不记日志,此配置可以提升些许性能 log_dest none
# 选择日志的级别(可设置多项) #log_type error #log_type warning #log_type notice #log_type information
# 是否记录客户端连接信息 #connection_messages true
# 是否记录日志时间 #log_timestamp true
# ================================================================= # Security # =================================================================
# 客户端ID的前缀限制,可用于保证安全性 #clientid_prefixes
# 允许匿名用户 #allow_anonymous true
# 用户/密码文件,默认格式:username:password #password_file
# PSK格式密码文件,默认格式:identity:key #psk_file
# pattern write sensor/%u/data # ACL权限配置,常用语法如下: # 用户限制:user <username> # 话题限制:topic [read|write] <topic> # 正则限制:pattern write sensor/%u/data #acl_file
# ================================================================= # Bridges # =================================================================
# 允许服务之间使用“桥接”模式(可用于分布式部署) #connection <name> #address <host>[:<port>] #topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]
# 设置桥接的客户端ID #clientid
# 桥接断开时,是否清除远程服务器中的消息 #cleansession false
# 是否发布桥接的状态信息 #notifications true
# 设置桥接模式下,消息将会发布到的话题地址 # $SYS/broker/connection/<clientid>/state #notification_topic
# 设置桥接的keepalive数值 #keepalive_interval 60
# 桥接模式,目前有三种:automatic、lazy、once #start_type automatic
# 桥接模式automatic的超时时间 #restart_timeout 30
# 桥接模式lazy的超时时间 #idle_timeout 60
# 桥接客户端的用户名 #username
# 桥接客户端的密码 #password
# bridge_cafile:桥接客户端的CA证书文件 # bridge_capath:桥接客户端的CA证书目录 # bridge_certfile:桥接客户端的PEM证书文件 # bridge_keyfile:桥接客户端的PEM密钥文件 #bridge_cafile #bridge_capath #bridge_certfile #bridge_keyfile
# 自己的配置可以放到以下目录中 #include_dir /mqtt/config/conf.d
|
Qos 解析
Qos=0 –> 最多一次
sequenceDiagram
ClientA->>ServerBroker: 发送消息
ServerBroker->>ClientB: 发送消息
Qos=1 –> 至少一次
sequenceDiagram
ClientA->>ServerBroker: 1.发送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存储消息
ServerBroker->>ClientA: 1.2发送消息回应PUBACK
ServerBroker->>ClientB: 2.发送消息
ClientB->>ServerBroker: 2.1发送消息回应PUBACK
ServerBroker->>ServerBroker: 2.2删除消息
Qos=2 –> 有且仅有一次
sequenceDiagram
ClientA->>ServerBroker: 1.发送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存储消息
ServerBroker->>ClientA: 1.2发送消息回应Rec
ClientA->>ServerBroker: 2.发送消息Rel
ServerBroker->>ServerBroker: 2.1删除消息
ServerBroker->>ServerBroker: 2.2存储消息到发送列队
ServerBroker->>ClientB: 2.3发送消息
ServerBroker->>ClientA: 2.4发送消息回应Comp
ClientB->>ServerBroker: 3.发送消息回应Rec
ServerBroker->>ServerBroker: 3.1删除2.2中存储的消息(一次确认)
ServerBroker->>ServerBroker: 3.2存储消息
ServerBroker->>ClientB: 3.3发送消息Rel
ClientB->>ServerBroker: 3.4发送消息回应Comp
ServerBroker->>ServerBroker: 3.5删除消息(二次确认)
实验
实验1:Mosquitto 内存数据落盘机制
实验目的
Mosquitto 内存数据落盘持久化时,是否会产生重复数据
实验结论
Mosquitto 不会重复持久化已经持久化过的数据
实验环境
mosquitto version 1.4.15
实验方法
- 发送者保持同样的发送速率,降低订阅者消费速度,触发mqtt的持久化机制
- 更改mqtt持久化的间隔(3s 和 30s),观测持久化的数据大小是否不同
实验过程
发送者:发送1w条消息,用时 31.16837883 s
订阅者:每分钟消化1条,1min内堆积9999条消息
Mosquitto:持久化间隔 3s,1min内涨幅 27842 Byte
发送者:发送1w条消息,
订阅者:每分钟消化1条,1min内堆积9999条消息
Mosquitto:持久化间隔 30s,1min内涨幅 27842 Byte
实验脚本
send.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| #!/usr/bin/python # -*- coding: utf-8 -*- ''' Send msg to mqtt (linux, python2) ''' __author__ = 'RYB'
import json import time import random import string import paho.mqtt.publish as publish
def ranstr(num): salt = ''.join(random.sample(string.ascii_letters + string.digits, num)) return salt
def publish_msg(msg_num): for i in range(msg_num): msg = { "id": i, "Timestamp": int(time.time() * 1000), "str1": ranstr(50), "str2": ranstr(50) }
publish.single("test", payload=json.dumps(msg), qos=2, hostname="10.244.0.164", port=1883)
print("===== Start =====") start = time.time() print("Running...") publish_msg(10000) end = time.time() print("====== End ======") print("Run Time: %s s" % str(end - start))
|
rec.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| #!/usr/bin/python # -*- coding: utf-8 -*- ''' Receive msg from mqtt (linux, python2) ''' __author__ = 'RYB'
import time import paho.mqtt.subscribe as subscribe
def on_message_sleep(client, userdata, msg): print(msg.topic + ' --> ' + msg.payload) time.sleep(60)
subscribe.callback(on_message_sleep, "test", qos=2, hostname="mosquitto", port=1883)
|
参考文档