文章原创如需转载,请注明出处”本文首发于陕西毅杰”;
引言
之前我整理过关于RabbitMQ在iOS中的应用;这个RabbitMQ,是MQTT的一个消息中间件;除了这个中间件,还有其他支持MQTT的中间件,我们服务端在新的物联网项目上,采用了MQTT协议,就使用上而言,我个人觉得,MQTT使用起来更简单,更方便,虽然碰到了比较Low的问题,下面会说到;
为什么有这个文章–整理的过程就是技术沉淀的过程
- 第一次接触MQTT的时候,是看到这个文章的,排版不怎么舒服;
- 很久之后,看到自己写的代码,又忘了,又得查找。整理的过程就是技术沉淀的过程
- 为了其他开发者,能够方便又舒服的看到,至于能不能搜到,就看你能不能突破”结界“了;
MQTT介绍
MQTT简介
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议–来源百度百科
MQTT,主要提供了==订阅/发布==两种消息模式,更为简约、轻量,易于使用,特别适合于受限环境(带宽低、网络延迟高、网络通信不稳定)的消息分发,属于物联网(Internet of Thing)的一个标准传输协议。
MQTT特点
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,具有以下特点:
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
- 对负载内容屏蔽的消息传输;
- 使用 TCP/IP 提供网络连接;
-
有三种消息发布服务质量(Qos):
-
“至多一次”(QoS==0),消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
-
“至少一次”(QoS==1),确保消息到达,但消息重复可能会发生。
-
“只有一次”(QoS==2),确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
-
支持语言
Java ,Javascript, C/C++, Python, Ruby, Objective-C
MQTT使用
使用流程
客户端登录业务系统,然后启动MQTT连接,订阅各种主题消息,如果连接失败,需要在回调里处理,重连,MQTT有一个连接的状态 MQTTSessionStatus
,回调可以监听到;
订阅成功后,如果有新的消息推过来,这时候,就需要解析,一般需要提前跟后台约定好,数据的格式,方便客户端这边解析;
大概的流程如下:
MQTT 安装
pod ‘MQTTClient’
开始使用
导入头文件 #import<MQTTClient/MQTTClient>
为了方便调用MQTT服务,我们自定义一个类,暂且就叫做
MQTTManger
:NSObject
主要用来管理MQ的开始连接,订阅主题,取消订阅,关闭连接等;
因为,如果你在需要的页面上订阅数据,会写很多session的创建,主题的订阅,不方便维护,管理;
以下是MQTTManger的接口部分 .h 文件
/** 订阅主题,设置某一个订阅主题的VC为代理 */
- (void)startSubcribeMessageWithDeviceTheme:(NSString *)deviceTheme delegate:(id<MQTTSessionDelegate>)delegate;
/** 取消订阅 */
- (void)unSubscribeMessageWithDeviceTheme:(NSString *)deviceTheme delegate:(id<MQTTSessionDelegate>)delegate;
/** 开始连接 */
- (void)startConnectServer;
/** 关闭连接 */
- (void)disConnectServer;
开始连接
- 设置服务地址,端口号
- 创建session,设置用户名,密码,心跳设置,clientID,消息服务质量(Qos)等;
- 开始连接
- 监听MQTT连接状态
- (void)startConnectServer {
//设置服务地址,端口号
MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init];
transport.host = @"192.168.*.**";
transport.port = 1883;
//创建session,设置用户名,密码,clientID,消息服务质量(Qos)等;
MQTTSession *session = [[MQTTSession alloc] init];
session.clientId = @"iOS_3456345745686";
session.transport = transport;
session.delegate = self;
session.userName = @"liluxin";
session.password = @"123345";
session.cleanSessionFlag = YES;
session.willQoS = MQTTQosLevelAtMostOnce;
self.session = session;
[session connectAndWaitTimeout:1];
[session addObserver:self forKeyPath:@"status" options:(NSKeyValueObservingOptionOld) context:nil];
//开始连接
[session connectWithConnectHandler:^(NSError *error) {
DebugLog(@"错误的信息:%@",error);
if (error) return;
DebugLog(@"MQTT链接成功了");
}];
}
实现代理方法,监听MQTT的状态
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary<NSKeyValueChangeKey,id> *)change context:(void *)context {
DebugLog(@"MQTT的状态:%@_____%@",self.session,change);
if (self.session.status == MQTTSessionStatusDisconnecting||MQTTSessionEventConnectionError) {
//重连即可
[self.session connect];
}
}
关闭连接
当用户退出登录的时候,需要将MQTT关闭,即就是将delegate置为nil,然后断开连接,最后设置会话为nil;
- (void)disConnectServer {
self.session.delegate = nil;
[self.session disconnect];
self.session = nil;
}
订阅主题
消息主题的格式,一般服务端定义好:类似这样的
/notification/public/device/LUCY3002/DQ00000001
注意:消息订阅必须在主线程中,因为如果放在子线程,消息的回调收不到消息;
这个文章说到了,可以参考下:
- (void)startSubcribeMessageWithDeviceTheme:(NSString *)deviceTheme delegate:(id<MQTTSessionDelegate>)delegate {
//这里会把订阅的VC设置为代理,这个定于的VC实现代理方法就可以收到推送的消息了
self.session.delegate = delegate;
if (self.session.status == MQTTSessionStatusConnected) {
//订阅
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
dispatch_async(dispatch_get_main_queue(), ^{
[self.session subscribeToTopic:deviceTheme atLevel:MQTTQosLevelAtMostOnce subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
if (error) {
NSLog(@"订阅失败 %@", error.localizedDescription);
}
else {
NSLog(@"订阅成功 %@", gQoss);
}
}];
});
});
}
}
实现消息的监听代理方法,对收到的消息,进行json–>model,然后更新UI即可;
#pragma mark - MQTTSessionDelegate
- (void)newMessage:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid {
NSError *error;
NSDictionary *dic = [NSJSONSerialization JSONObjectWithData:data
options:NSJSONReadingMutableContainers
error:&error];
DebugLog(@"主题信息:%@ \n,收到的信息:%@",topic,dic);
}
取消订阅主题
当用户退出某一个页面的时候,我们就需要取消订阅,如果不取消订阅,当有新消息来得时候,用户还是可以收到的,没什么用,除非是全局的推送,要不然不取消就是浪费资源;
- (void)unSubscribeMessageWithDeviceTheme:(NSString *)deviceTheme delegate:(id<MQTTSessionDelegate>)delegate {
if (self.session.status == MQTTSessionStatusConnected) {
//取消订阅
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
dispatch_async(dispatch_get_main_queue(), ^{
[self.session unsubscribeTopic:deviceTheme unsubscribeHandler:^(NSError *error) {
if (error) {
NSLog(@"取消订阅失败 %@", error.localizedDescription);
}
else {
NSLog(@"取消订阅成功");
}
}];
});
});
}
}
MQTTManger,到这里就结束了,基本上,使用的话,就直接调用对应的方法,简单,方便;
自己测试MQTT数据推送
模拟软件
如果想要自己测试推送数据的话,有一个工具模拟软件,可以下发消息,这个东西,没有RabbitMQ的管理台方便
不过这个东西,可以测试MQTT各种不同的情况,比较方便,不需要依赖后台,自己就可以
- 创建连接
Connecetions-> newConnecetions
- 设置参数 下面这个基本是自动生成的,不用管,唯一的就是需要设置,server URL,这个跟服务端同学要一下;
设置用户名密码
然后打开连接,就可以发布模拟消息了
- 发布模拟消息
看图操作,没有你不会的;
Java运行环境
上面提到的模拟软件,需要在 Mac 电脑安装 Java 环境,配置环境变量
MQTT遇到的坑
我遇到的就是,自己明明连接成功了,也订阅成功了,就是收不到自己模拟发送的消息,我就纳闷了
后来发现,是一个很小很小的问题,是自己模拟软件的 服务端地址写成本地的了,因为我之前在本地部署过MQTT的服务,我自己手机连接的地址跟模拟软件的不一样,所以,收不到消息是正常的;
一开始,我以为是消息的订阅在子线程中,反复确定了,就是在主线程订阅的;
本地部署MQTT服务
如果想在本地部署MQTT的服务,可以看这个百万级分布式开源物联网MQTT消息服务器
很简单的,启动好之后,就可以访问控制台地址:
http://127.0.0.1:18083,
默认用户: admin,密码:public
这个管理台,我觉得,没啥用,不能操作,不能模拟发消息,可能对服务端监控有用;