kotlin之MQTT协议实现

发布时间:2023-01-23 15:30

一、项目说明

本次项目对接使用mqtt协议的单灯控制器,多用于城市路灯控制。对于mqtt,我的理解是类似于微信的公众号,由发布者发送文章到服务器,然后下发给订阅了公众号的用户,然后用户就收到了推送的文章。根据mqtt协议的特点,分别创建订阅端(subscribe)、发布端(publish),并让两端都连上mqtt服务器。搭建mqtt服务器参考:emqx。

二、添加依赖

mqtt依赖项:搜索 “eclipse mqttv” 即可找到相关依赖,第二个依赖用于后面数据的同步处理

dependencies {
	implementation("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2")
}

三、具体实现

1.父类(BaseMqtt)

父类包含发布端与订阅端的公共属性,HOST 为mqtt服务器地址,用户名及密码对应填写,其他保持不变即可。

open class BaseMqtt{
    companion object {
        // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示
        const val HOST = "tcp://XXX.XXX.XXX.XXX:1883"
        var client: MqttClient? = null
        const val userName = "XXXX"
        const val passWord = "XXXXXXXXX"
        const val clientId = "kotlin001"
    }
}

2.发布端与订阅端整合为服务端(ServerMqtt)

此处imei,可有可无,PUB_TOPIC 为发布主题,SUB_TOPIC 为订阅主题,发布消息后,订阅端与之主题对应便可以接收到消息。sendInstructions为对外接口,msg为发布的信息,通过publish方法发布到服务器。subscribe方法为订阅端主方法,只需在建立连接以后调用此方法即可。awaitResult方法为阻塞方法,在发布端发布信息以后等待订阅端接收信息以后才会继续往后执行,否则则堵塞方法向下执行。

/**
 * Title:Server
 * Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题
 */
class ServerMqtt(_imei:String):BaseMqtt() {
    companion object {
        var ServerMqttContexts:MutableMap<String, ServerMqtt> = mutableMapOf()
        fun create(imei: String):ServerMqtt{
            if(ServerMqttContexts.containsKey(imei)){
                return ServerMqttContexts[imei]!!
            }else{
                val mqtt = ServerMqtt(imei)
                ServerMqttContexts[imei] = mqtt
                return mqtt
            }
        }
    }
    var imei:String = _imei
    //定义一个主题
    private val PUB_TOPIC = "xxxxxx/xxxx/$imei"
    private val SUB_TOPIC = "xxxxxx/xxxx/$imei"
    private var pubTopic: MqttTopic? = null
    private var message = MqttMessage()
    private val pushCallback = ServerPushCallback(this)
    /**
     * 构造函数
     */
    init {
        connect()
    }

    /**
     * 用来连接服务器
     */
    private fun connect() {
        val options = MqttConnectOptions()
        options.isCleanSession = false
        options.userName = userName
        options.password = passWord.toCharArray()
        // 设置超时时间
        options.connectionTimeout = 10
        // 设置会话心跳时间
        options.keepAliveInterval = 20
        try {
            client = MqttClient(HOST, clientId, MemoryPersistence())
            client!!.setCallback(pushCallback)
            client!!.connect()
            pubTopic = client!!.getTopic(PUB_TOPIC)
            subscribe()
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }
    fun disconnect() {
        try {
            client?.disconnect(0)
            ServerMqttContexts.remove(imei)
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    /**
     * 订阅消息
     */
    private fun subscribe() {
        try {
            client!!.subscribe(arrayOf(SUB_TOPIC), intArrayOf(1))
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }
    /**
     *  发布消息
     * @param topic
     * @param message
     * @throws MqttPersistenceException
     * @throws MqttException
     */
    @Throws(MqttPersistenceException::class, MqttException::class)
    fun publish(topic: MqttTopic?, message: MqttMessage?) {
        val token = topic!!.publish(message)
        token.waitForCompletion()
        println(
            "message is published completely! "
                    + token.isComplete
        )
    }

    private fun sendInstructions(msg:String): Boolean {
        try {
            message.qos = 1
            message.isRetained = true
            message.payload = hexStrToByteArray(msg)
            publish(pubTopic, message)
            return message.isRetained
        }catch (e:Exception){
            println(e)
        }
        return false
    }

    /**
     * 开灯控制
     * @param imei 灯控标识
     * @param state 开关指令 1 开启|0 关闭
     */
    fun mqttLampController(state: Int): Boolean {
        val onInstructions:String = if (state == 1){
            "XXXXXXXXXXXXXXXXXXXX"
        }else{
            "XXXXXXXXXXXXXXXXXXXX"
        }
        return sendInstructions(onInstructions)
    }

    /**
     * 获取灯控当前状态
     * @param imei 设备标识
     */
    fun getMqttLampCurrentState(): Boolean {
        val readStateInstructions = "XXXXXXXXXXXXXXXXXXXX"
        return sendInstructions(readStateInstructions)
    }

    /**
     * 重启设备
     * @param imei 设备标识
     */
    fun restartMqttLamp(): Boolean {
        val restartInstructions = "XXXXXXXXXXXXXXXXXXXX"
        return sendInstructions(restartInstructions)
    }

    /**
     * 设置灯控本地时间控制策略
     * @param openTime 预设开启时间
     * @param closeTime 预设关闭时间
     *
     */
    fun setMqttLampLocalTimeStrategy(openTime: LocalDateTime, closeTime: LocalDateTime): Boolean {
        var timeInstructions =
           "XXXXXXXXXXXXXXXXXXXX"
        return sendInstructions(timeInstructions)
    }

    /**
     * 获取本地控制时间
     */
    fun getMqttLampLocalTimeStrategy(): Boolean {
        val getTimeInstructions = "XXXXXXXXXXXXXXXXXXXX"
        return sendInstructions(getTimeInstructions)
    }

    /**
     * 校时
     */
    fun setTiming(): Boolean {
        var setTimingInstructions = "XXXXXXXXXXXXXXXXXXXX"
        return sendInstructions(setTimingInstructions)
    }

    /**
     * 获取灯控订阅返回信息
     */
    private suspend fun getMqttLampSubscribeData(msg: ByteArray): Boolean {
        val len = msg.size
        if (len > 7){
            val powerCode = ByteUtils.bytes2HexString(ByteUtils.slice(msg, 10, 13))
            val sync = ByteUtils.bytes2HexString(ByteUtils.slice(msg, 0, 8))
            val length = msg[9].toInt()

            if (sync != "xx xx xx xx xx xx xx"){
                println("信息头有误!")
                return false
            }

            if (length != len - 12){
                println("数据长度有误!")
                return false
            }
            if (len == 62){
                //获取状态信息
                if (powerCode == PowerCode.STATUS){
                    /**
                     * 消息处理代码
                     **/
                }
            }else if (len == 19){
                //开关灯返回信息
                if (powerCode == PowerCode.ONOROFFLAMP){
                    /**
                     * 消息处理代码
                     **/
                }
            }else if (len == 17){
                //时间策略返回信息
                if (powerCode == PowerCode.TIMESTRATEGY){
                    /**
                     * 消息处理代码
                     **/
                }
            }
        }
        return true
    }
    private var onInstructionsResultCallback: ((Boolean) -> Unit?)? = null
    suspend fun awaitResult(timeout:Long=30000):Boolean = suspendCancellableCoroutine {ctn->
        onInstructionsResultCallback = {
            ctn.resume(it)
        }
    }
    /*--------------------------------------------------------------*/
    fun onMessage(message: MqttMessage){
        GlobalScope.launch {
            val isSuccess = getMqttLampSubscribeData(message.payload)
            onInstructionsResultCallback?.invoke(true)
            onInstructionsResultCallback = null
        }
    }
}

3.消息回调(ServerPushCallback)

订阅端执行后在这里获取到消息,然后对得到的消息可以进行对应的处理,若连接断开,也将在这里进行重连。

class ServerPushCallback(_mqtt:ServerMqtt): MqttCallback {
    private var onMessageCallback: ((String) -> Unit?)? = null
    private var mqtt:ServerMqtt
    override fun connectionLost(cause: Throwable?) {
        // 连接丢失后,一般在这里面进行重连
        println("连接断开,可以做重连")
        mqtt.onDisconnect()
    }
    init{
        mqtt = _mqtt
    }
    override fun deliveryComplete(token: IMqttDeliveryToken) {
        println("deliveryComplete---------" + token.isComplete)
    }

    /**
     * 回调方法
     */
    fun onMessage(calback: (msg: String) -> Unit) {
        onMessageCallback = calback
    }

    @OptIn(DelicateCoroutinesApi::class)
    @Throws(Exception::class)
    override fun messageArrived(topic: String, message: MqttMessage) {
        if (message.payload.size > 7){
            // subscribe后得到的消息会执行到这里面
            val msg = ByteUtils.bytes2HexString(message.payload)
            val imei = topic.substringAfter("xxxxxx/xxxx/")
            println("接收消息主题 : $topic")
            println("接收消息Qos : " + message.qos)
            println("接收消息内容 : $msg")
            mqtt.onMessage(message)
        }
    }
}

4.主函数入口

fun main(){
	//单灯控制器imei
    val imei = "XXXXXXXXXXXXXX"
    val mqtt= ServerMqtt.create(imei)
    //发布开灯指令
    mqtt.mqttLampController(1)
}

四、Byte工具类

		fun bytes2HexString(bytes: ByteArray): String {
        val ret = StringBuilder()
        for (b in bytes) {
            var hex = Integer.toHexString(b.toInt() and 0xFF)
            if (hex.length == 1) {
                hex = "0$hex"
            }
            ret.append(hex.uppercase(Locale.getDefault())).append(' ')
        }
        return ret.toString().replaceFirst("\\s$".toRegex(), "")
    }
		/**
     * byte to ascii
     */
    fun bytes2Ascii(bytes: ByteArray):String{
        val ret = StringBuilder()
        for (b in bytes) {
            val hex = Integer.toHexString(b.toInt() and 0xFF)
            val decimal  = Integer.parseInt(hex, 16)
            ret.append(decimal.toChar())
        }
        return ret.toString()
    }

    /**
     * hexString to byteArray
     */
    fun hexStrToByteArray(str: String?): ByteArray? {
        if (str == null) {
            return null
        }
        if (str.isEmpty()) {
            return ByteArray(0)
        }
        val hexStr = str.replace("\\s*".toRegex(),"")
        val byteArray = ByteArray(hexStr.length / 2)
        for (i in byteArray.indices) {
            val subStr = hexStr.substring(2 * i, 2 * i + 2)
            byteArray[i] = subStr.toInt(16).toByte()
        }
        return byteArray
    }

    /**
     * String to byteArray
     */
    fun strToBytes(str:String): ByteArray {
        var bytesStr = ByteArray(str.length/2)
        for (i in bytesStr.indices) {
            val subStr = str.substring(2 * i, 2 * i + 2)
            bytesStr[i] = Integer.parseInt(subStr).toByte()
        }
        return bytesStr
    }

五、累加和校验

此方法为累加和校验取校验码

object CheckUtils {
    fun calcCheck(data: ByteArray): Int {
        var dSum = 0
        val length: Int = data.size
        // 遍历十六进制,并计算总和
        for (index in 0 until length) {
            val s = ByteUtils.byte2HexString(data[index])
            dSum += s.toInt(16) // 十六进制转成十进制 , 并计算十进制的总和
        }
        return getHexInt(dSum)
    }

    /**
     * 累加和取反
     */
    private fun getHexInt(dSum:Int): Int {
        var mod: Int
        if (dSum > 255){
            mod = dSum % 256
        }else{
            mod = dSum
        }
        var hexInt= Integer.toHexString(mod)
        if (hexInt.length < 2) {
            hexInt = "0$hexInt";  // 校验位不足两位的,在前面补0
        }else if(hexInt.length > 2){
            hexInt = hexInt.substring(hexInt.length-2,hexInt.length)
        }
        return hexInt.toInt(16)
    }
}

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号