Обычно клиент активно отправляет контрольные сообщения только для поддержания соединения с сервером, в то время как другие сообщения часто требуют, чтобы сервер отправлял сообщения клиенту для получения.
Map
Сохранено, чтобы обеспечить потокобезопасность, вы можете использовать потокобезопасность. ConcurrentHashMap
。ConcurrentHashMap
Найдите целевой канал клиентсоединять. Найдя его, сначала определите, активен ли канал. Если соединение активно, отправьте сообщение клиенту через этот канал. Если оно неактивно, отправьте его из него. Map
Удалите информацию об этом канале.Уже представлено в двух предыдущих статьях netty
Здесь представлен общий код платформы, только некоторые основные ключевые коды, а остальная часть кода не будет подробно описываться.
Направления:
Создать новый ChannelMap
Класс, сохраняется при первом подключении клиента. channel
соединять. Когда сервер впоследствии отправит сообщение клиенту, он сначала Map
Найдите соответствующее соединение канала сообщений клиента, а затем запишите сообщение в канал для отправки.
/**
* @Author крокодил
* @Description синхронизировать канал сохранить карту
* @date 2022/11/27 16:30
* @Version 1.0
*/
public class ChannelMap {
/**
* Сохраните соответствующую связь между идентификатором идентификации клиента (идентификатором сообщения) и каналом.
*/
private static volatile ConcurrentHashMap<String, Channel> channelMap = null;
private ChannelMap() {
}
public static ConcurrentHashMap<String, Channel> getChannelMap() {
if (null == channelMap) {
synchronized (ChannelMap.class) {
if (null == channelMap) {
channelMap = new ConcurrentHashMap<>();
}
}
}
return channelMap;
}
public static Channel getChannel(String id) {
return getChannelMap().get(id);
}
}
Когда клиент устанавливает соединение (сервер получает сообщение Heartbeat), канал добавляется на карту.
public class ServerListenerHandler extends SimpleChannelInboundHandler<Message> {
private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class);
/**
* Обрабатывается, когда устройство подключено к сети
*
* @param ctx
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
log.info("Появились новые соединения: [{}]", ctx.channel().id().asLongText());
}
/**
* Обработка данных
*
* @param ctx
* @param msg
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
// Получить тело сообщения в экземпляре сообщения
String content = msg.getContent();
// Обработка различных типов сообщений
MessageEnum type = MessageEnum.getStructureEnum(msg);
switch (type) {
case CONNECT:
// Добавить канал в ChannelMap
ChannelMap.getChannelMap().put(msg.getId(), ctx.channel());
// Добавьте clientID в качестве настраиваемого атрибута канала, чтобы упростить получение идентификатора пользователя из канала в любое время.
AttributeKey<String> key = AttributeKey.valueOf("id");
ctx.channel().attr(key).setIfAbsent(msg.getId());
// TODO Обработка сообщений Heartbeat
case STATE:
// TODO Статус устройства
default:
System.out.println(type.content + «Содержимое сообщения» + content);
}
}
/**
* Автономная обработка оборудования
*
* @param ctx
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
log.info("Устройство не в сети: {}", ctx.channel().id().asLongText());
// Удалить канал с карты
removeId(ctx);
}
/**
* Устройство поддерживает обработку исключений
*
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Исключение печати
log.info("Исключение: {}", cause.getMessage());
// Удалить канал с карты
removeId(ctx);
// закрытиесоединять ctx.close();
}
private void removeId(ChannelHandlerContext ctx) {
AttributeKey<String> key = AttributeKey.valueOf("id");
// Получить идентификатор в канале
String id = ctx.channel().attr(key).get();
// карта удалить канал
ChannelMap.getChannelMap().remove(id);
}
}
Напишите класс бизнес-уровня, который отправляет сообщения.,И получить канал канала в карте через clientid,После преобразования сообщения в строку json,проходитьwriteAndFlush
Отправитьклиент。
/**
* @Author крокодил
* @Description Отправить сообщение клиенту
* @date 2022/11/27 17:29
* @Version 1.0
*/
@Service
public class PushMsgServiceImpl implements PushMsgService {
/**
* Отправить сообщение клиенту
*
* @param msg
*/
@Override
public void push(Message msg) {
// клиентID
String id = msg.getId();
Channel channel = ChannelMap.getChannel(id);
if (null == channel) {
throw new RuntimeException("клиент не в сети");
}
channel.writeAndFlush(msg);
}
}
Уведомление:writeAndFlush
Параметр представляет собой экземпляр универсального объекта с пользовательской кодировкой.。Как указано в этой статьеMessage
Класс разбора сообщений。
public class MessageEncodeHandler extends MessageToByteEncoder<Message> {
private static String delimiter;
public MessageEncodeHandler(String delimiter) {
this.delimiter = delimiter;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
out.writeBytes(
(message.toJsonString() + delimiter)
.getBytes(CharsetUtil.UTF_8)
);
}
}
Напиши еще позжеController
добрый(Опустить здесь),существоватьController
добрыйсерединавызовPushMsgService
серединаpushff,Вы можете завершить отправку сообщения клиенту.