ChannelGroup.java 1.9 KB
package com.viontech.netty;


import com.viontech.model.BaseModel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/**
 * @author 谢明辉
 */
@Slf4j
public class ChannelGroup {

    /** 设备序列号和最后心跳的映射 */
    public static final Map<String, Long> HEART_BEAT_MAP = new LinkedHashMap<>();
    private static final Lock LOCK = new ReentrantLock();
    /** 设备序列号和通道id的映射 */
    private static final Map<Long, Channel> DEVICE_CHANNEL_MAP = new LinkedHashMap<>();

    /**
     * 设备注册
     *
     * @param deviceId 设备ID
     * @param channel  通道
     */
    public static void registered(Long deviceId, Channel channel) {
        LOCK.lock();
        try {
            DEVICE_CHANNEL_MAP.put(deviceId, channel);
        } finally {
            LOCK.unlock();
        }
        log.info("设备注册成功:{}", deviceId);

    }

    /**
     * @param deviceId 设备ID
     * @param msg      要发送的消息
     */
    public static void sendMessage(Long deviceId, Object msg) {
        Channel channel = DEVICE_CHANNEL_MAP.get(deviceId);
        try {
            channel.writeAndFlush(msg).await();
        } catch (Exception ignored) {
        }
    }

    /**
     * 广播消息
     *
     * @param msg 消息
     */
    public static void broadcast(BaseModel msg) {
        for (Channel channel : DEVICE_CHANNEL_MAP.values()) {
            try {
                channel.writeAndFlush(msg).await();
            } catch (Exception ignored) {
            }
        }
    }

    public synchronized static void unRegistered(ChannelHandlerContext ctx) {
        DEVICE_CHANNEL_MAP.entrySet().removeIf(next -> next.getValue().compareTo(ctx.channel()) == 0);
    }
}