Mosquitto

简述

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的”轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

1573118328615

config 解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
vim mosquitto.conf

# =================================================================
# General configuration
# =================================================================

# 客户端心跳的间隔时间
#retry_interval 20

# 系统状态的刷新时间
#sys_interval 10

# 系统资源的回收时间,0表示尽快处理
#store_clean_interval 10

# 服务进程的PID
#pid_file /var/run/mosquitto.pid

# 服务进程的系统用户
#user mosquitto

# 客户端心跳消息的最大并发数
#max_inflight_messages 10

# 客户端心跳消息缓存队列
#max_queued_messages 100

# 用于设置客户端长连接的过期时间,默认永不过期
#persistent_client_expiration

# =================================================================
# Default listener
# =================================================================

# 服务绑定的IP地址
#bind_address

# 服务绑定的端口号
#port 1883

# 允许的最大连接数,-1表示没有限制
#max_connections -1

# cafile:CA证书文件
# capath:CA证书目录
# certfile:PEM证书文件
# keyfile:PEM密钥文件
#cafile
#capath
#certfile
#keyfile

# 必须提供证书以保证数据安全性
#require_certificate false

# 若require_certificate值为true,use_identity_as_username也必须为true
#use_identity_as_username false

# 启用PSK(Pre-shared-key)支持
#psk_hint

# SSL/TSL加密算法,可以使用“openssl ciphers”命令获取
# as the output of that command.
#ciphers

# =================================================================
# Persistence
# =================================================================

# 消息自动保存的间隔时间
#autosave_interval 1800

# 消息自动保存功能的开关
#autosave_on_changes false

# 持久化功能的开关
persistence true

# 持久化DB文件
#persistence_file mosquitto.db

# 持久化DB文件目录
#persistence_location /var/lib/mosquitto/

# =================================================================
# Logging
# =================================================================

# 4种日志模式:stdout、stderr、syslog、topic
# none 则表示不记日志,此配置可以提升些许性能
log_dest none

# 选择日志的级别(可设置多项)
#log_type error
#log_type warning
#log_type notice
#log_type information

# 是否记录客户端连接信息
#connection_messages true

# 是否记录日志时间
#log_timestamp true

# =================================================================
# Security
# =================================================================

# 客户端ID的前缀限制,可用于保证安全性
#clientid_prefixes

# 允许匿名用户
#allow_anonymous true

# 用户/密码文件,默认格式:username:password
#password_file

# PSK格式密码文件,默认格式:identity:key
#psk_file

# pattern write sensor/%u/data
# ACL权限配置,常用语法如下:
# 用户限制:user <username>
# 话题限制:topic [read|write] <topic>
# 正则限制:pattern write sensor/%u/data
#acl_file

# =================================================================
# Bridges
# =================================================================

# 允许服务之间使用“桥接”模式(可用于分布式部署)
#connection <name>
#address <host>[:<port>]
#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]

# 设置桥接的客户端ID
#clientid

# 桥接断开时,是否清除远程服务器中的消息
#cleansession false

# 是否发布桥接的状态信息
#notifications true

# 设置桥接模式下,消息将会发布到的话题地址
# $SYS/broker/connection/<clientid>/state
#notification_topic

# 设置桥接的keepalive数值
#keepalive_interval 60

# 桥接模式,目前有三种:automatic、lazy、once
#start_type automatic

# 桥接模式automatic的超时时间
#restart_timeout 30

# 桥接模式lazy的超时时间
#idle_timeout 60

# 桥接客户端的用户名
#username

# 桥接客户端的密码
#password

# bridge_cafile:桥接客户端的CA证书文件
# bridge_capath:桥接客户端的CA证书目录
# bridge_certfile:桥接客户端的PEM证书文件
# bridge_keyfile:桥接客户端的PEM密钥文件
#bridge_cafile
#bridge_capath
#bridge_certfile
#bridge_keyfile

# 自己的配置可以放到以下目录中
#include_dir /mqtt/config/conf.d

Qos 解析

Qos=0 –> 最多一次

sequenceDiagram

ClientA->>ServerBroker: 发送消息
ServerBroker->>ClientB: 发送消息

Qos=1 –> 至少一次

sequenceDiagram

ClientA->>ServerBroker: 1.发送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存储消息
ServerBroker->>ClientA: 1.2发送消息回应PUBACK
ServerBroker->>ClientB: 2.发送消息
ClientB->>ServerBroker: 2.1发送消息回应PUBACK
ServerBroker->>ServerBroker: 2.2删除消息

Qos=2 –> 有且仅有一次

sequenceDiagram

ClientA->>ServerBroker: 1.发送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存储消息
ServerBroker->>ClientA: 1.2发送消息回应Rec
ClientA->>ServerBroker: 2.发送消息Rel
ServerBroker->>ServerBroker: 2.1删除消息
ServerBroker->>ServerBroker: 2.2存储消息到发送列队
ServerBroker->>ClientB: 2.3发送消息
ServerBroker->>ClientA: 2.4发送消息回应Comp
ClientB->>ServerBroker: 3.发送消息回应Rec
ServerBroker->>ServerBroker: 3.1删除2.2中存储的消息(一次确认)
ServerBroker->>ServerBroker: 3.2存储消息
ServerBroker->>ClientB: 3.3发送消息Rel
ClientB->>ServerBroker: 3.4发送消息回应Comp
ServerBroker->>ServerBroker: 3.5删除消息(二次确认)

1573118624880

实验

实验1:Mosquitto 内存数据落盘机制

实验目的

​ Mosquitto 内存数据落盘持久化时,是否会产生重复数据

实验结论

​ Mosquitto 不会重复持久化已经持久化过的数据

实验环境

​ mosquitto version 1.4.15

实验方法
  1. 发送者保持同样的发送速率,降低订阅者消费速度,触发mqtt的持久化机制
  2. 更改mqtt持久化的间隔(3s 和 30s),观测持久化的数据大小是否不同
实验过程

发送者:发送1w条消息,用时 31.16837883 s

订阅者:每分钟消化1条,1min内堆积9999条消息

Mosquitto:持久化间隔 3s,1min内涨幅 27842 Byte

1573112430542

发送者:发送1w条消息,

订阅者:每分钟消化1条,1min内堆积9999条消息

Mosquitto:持久化间隔 30s,1min内涨幅 27842 Byte

1573113441213

实验脚本
send.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/usr/bin/python
# -*- coding: utf-8 -*-
'''
Send msg to mqtt (linux, python2)
'''
__author__ = 'RYB'

import json
import time
import random
import string
import paho.mqtt.publish as publish


def ranstr(num):
salt = ''.join(random.sample(string.ascii_letters + string.digits, num))
return salt


def publish_msg(msg_num):
for i in range(msg_num):
msg = {
"id": i,
"Timestamp": int(time.time() * 1000),
"str1": ranstr(50),
"str2": ranstr(50)
}

publish.single("test", payload=json.dumps(msg), qos=2, hostname="10.244.0.164", port=1883)


print("===== Start =====")
start = time.time()
print("Running...")
publish_msg(10000)
end = time.time()
print("====== End ======")
print("Run Time: %s s" % str(end - start))
rec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/python
# -*- coding: utf-8 -*-
'''
Receive msg from mqtt (linux, python2)
'''
__author__ = 'RYB'

import time
import paho.mqtt.subscribe as subscribe


def on_message_sleep(client, userdata, msg):
print(msg.topic + ' --> ' + msg.payload)
time.sleep(60)


subscribe.callback(on_message_sleep, "test", qos=2, hostname="mosquitto", port=1883)

参考文档