Механизм Netty из Обнаружения сердцебиения Механизм поддержания активности сети,Он периодически отправляет и получает определенные сообщения (пакеты пульса), чтобы гарантировать, что изоединение остается действительным между клиентом и сервером. Этот механизм очень важен для приложений, которые необходимо поддерживать в течение длительного времени (например, связь в реальном времени, мониторинг, push-сервисы и т. д.).,Потому что это может помочь определить, отключено ли соединение из-за проблем с сетью или сбоя клиента.
Netty поставлять Понятно Обнаружение механизм сердцебиения, позволяющий определить, активно ли соединение. существовать TCP Если во время соединения соединение будет разорвано, сервер и клиент не сразу узнают, что оно было отключено. Таким образом, отправив контрольное сообщение и ожидая ответа от другой стороны, вы можете определить, активно ли соединение.
Netty Предоставляет два способа достижения Обнаружения. сердцебиения:
Конструктор IdleStateHandler выглядит следующим образом:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
readerIdleTime
:читатьизсвободное время,По истечении этого времени будет отправлена посылка «Обнаружение сердца».,Обнаружениеданетсоединять。writerIdleTime
:Писатьизсвободное время,По истечении этого времени будет отправлена посылка «Обнаружение сердца».,Обнаружениеданетсоединять。allIdleTime
:читать Писатьизсвободное время,По истечении этого времени будет отправлена посылка «Обнаружение сердца».,Обнаружениеданетсоединять。unit
:свободное времяединица,По умолчанию — секунды (TimeUnit.SECONDS).При выполнении одного из вышеперечисленных условий оно автоматически запустится. IdleStateEvent будет передан следующему в конвейере. handler из userсуществуют Вставьте сюда фрагмент кода
EventTriggered событие для обработки.
На стороне сервера вы можете добавить IdleStateHandler
Обнаружение процессор сердцебиения и добавить пользовательскую обработку handler
реализация класса userEventTriggered()
Методы логически обрабатываются как таймауты.
Например, вы можете установить каждый 5 Проверка чтения выполняется каждую секунду. 5 в течение нескольких секунд ChannelRead() Если метод не вызывается, он будет запущен один раз.
userEventTrigger()
метод.
На стороне клиента после запуска клиента сначала отправьте сообщение «привет», а затем дождитесь сообщения «ping» от сервера. После получения сигнала сердцебиения ответьте на ответ «ok». Сообщения Heartbeat можно определить по мере необходимости.
проходить Netty из Обнаружение сердцебиениямеханизм,Может эффективно поддерживать длительное соединение,Гарантированная эффективность подключения,Избегайте пустой траты ресурсов сервера.
package com.artisan.heartbeat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* @author маленький мастер
* @version 1.0
* @mark: show me the code , change the world
*/
public class HeartBeatArtisanServer {
public static void main(String[] args) throws Exception {
// Создайте группу цикла событий «главный-подчиненный».
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
// Создать класс запуска сервера
ServerBootstrap bootstrap = new ServerBootstrap();
// Настройка группы цикла событий
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class) // использоватьNioServerSocketChannel в качестве канала
.childHandler(new ChannelInitializer<SocketChannel>() { // Инициализировать подканал
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); // возьми трубу
pipeline.addLast("decoder", new StringDecoder()); // Добавить декодер
pipeline.addLast("encoder", new StringEncoder()); // Добавить кодировщик
// Добавьте процессор состояния простоя и установите время простоя чтения на 3 секунды.
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new HeartBeatArtisanServerHandler()); // Добавить собственный процессор
}
});
System.out.println("netty server start。。"); // Распечатать информацию о запуске
// Привяжите порт и синхронно ждите успеха
ChannelFuture future = bootstrap.bind(9000).sync();
// Подождите, пока сокет сервера закроется
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace(); // Распечатать информацию об исключении
} finally {
// Освободите ресурсы и корректно закройте группу цикла событий.
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
Простой из Netty Пример сервера, который использует NioServerSocketChannel
в качестве канала сервера и настройте IdleStateHandler
для обработки состояния простоя. Если клиент 3 в течение нескольких секунд Сообщение не было отправлено, сервер инициирует IdleStateEvent
событие и передается следующему процессору в конвейере, т.е. HeartBeatArtisanServerHandler
。Этот специальный процессор должен быть реализован. userEventTriggered
Метод обработки этого события, например отправка контрольного пакета для поддержания активного соединения.
существовать main
метод, мы создаем ServerBootstrap
Экземпляр, настроенный с использованием группы цикла событий, типа канала, обработчика состояния простоя и настраиваемого обработчика. Затем мы связали порт и подождали, пока сервер запустится и выключится.
Уведомление:существуют в практическом применении,HeartBeatArtisanServerHandler
Класс должен реализовать userEventTriggered
метод борьбы с IdleStateEvent
,пожалуйстапродолжатьсмотреть
Этот код представляет собой NettyChannelHandler, используемый для обработки пакетов Heartbeat на стороне сервера.
package com.artisan.heartbeat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
/**
* @author маленький мастер
* @version 1.0
* @mark: show me the code , change the world
*/
public class HeartBeatArtisanServerHandler extends SimpleChannelInboundHandler<String> {
// Определите счетчик для записи количества неактивных операций чтения.
int readIdleTimes = 0;
// Этот метод будет вызываться при чтении сообщения из канала.
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(" ====== > [server] message received : " + s);
// Если из сообщения да"Heartbeat получено Пакет", затем ответьте "ок"
if ("Heartbeat Packet".equals(s)) {
ctx.channel().writeAndFlush("ok");
} else {
// Если да другая информация, выведите другую информацию обработки. ...
System.out.println(" Другая обработка информации ... ");
}
}
// Этот метод будет вызываться, когда Netty инициирует тайм-аут.
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
// Определяет строку, используемую для хранения типа события тайм-аута.
String eventType = null;
// Переключение в зависимости от статуса события
switch (event.state()) {
case READER_IDLE:
eventType = «Читать вхолостую»;
// Чтение простоя с увеличением счетчика на 1
readIdleTimes++;
break;
case WRITER_IDLE:
eventType = «Пиши без дела»;
// Не обработано
break;
case ALL_IDLE:
eventType = «Чтение и запись в режиме ожидания»;
// Не обработано
break;
}
// Распечатайте информацию о тайм-ауте
System.out.println(ctx.channel().remoteAddress() + "тайм-аутсобытие:" + eventType);
// Если количество простоев чтения превышает 3 раза, закройте соединение, чтобы освободить больше ресурсов.
if (readIdleTimes > 3) {
System.out.println(" [server] Прочитайте в режиме ожидания более 3 раз, закройте соединение, чтобы освободить больше ресурсов");
ctx.channel().writeAndFlush("idle close");
ctx.channel().close();
}
}
// Этот метод будет вызываться при активации канала, т.е. при успешном установлении соединения.
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
}
}
Этот код определяет NettyизChannelHandler.,для работы с сетьюсоединятьсерединаиз Сумка Heartbeat。Этот процессор наследует отSimpleChannelInboundHandler<String>
,Это означает, что он в основном используется для обработки сообщений строкового типа.
channelRead0
метод:Этот метод будет вызываться при чтении сообщения из канала.。существоватьздесь,Проверяет, было ли получено сообщение «Heartbeat Packet».,Если да,тогда ответь "ок",нетзатем распечатайте Другая обработка информации。userEventTriggered
метод:Долженметоддля обработкиNettyизтайм-аутсобытие。NettyКаналы будут регулярно проверятьсяданетпраздныйсостояние,Здесь изIdle означает, что операции чтения или записи не выполняются. Если произошло событие тайм-аута,Netty запустит этот метод.существовать в этом методе,Он подсчитывает количество ожидающих операций чтения.,если более 3 раз,затем отправьте «холостой ход закрыть» и закрыть соединение.channelActive
метод:Этот метод будет вызываться при активации канала, т.е. при успешном установлении соединения.。существоватьздесь,Он распечатывает удаленный адрес.Суммируя: Этот процессор в основном обрабатывает три типа таймаутов: простое чтения, простое записи и простое чтения и записи. Когда получен пакет Heartbeat,Отвечу "ок",Если количество чтений в режиме простоя превышает 3 раза,Совместительство будет закрыто.
【Client】
этот кодда Простой пример клиента изNetty,Используется для отправки пакетов пульса на сервер. Ниже приведены подробные китайские примечания:
package com.artisan.heartbeat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Random;
/**
* @author маленький мастер
* @version 1.0
* @mark: show me the code , change the world
*/
public class HeartBeatArtisanClient {
public static void main(String[] args) throws Exception {
// Создайте NioEventLoopGroup для обработки циклов Nettyизсобытие.
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// Создайте объект Bootstrap для настройки клиента.
Bootstrap bootstrap = new Bootstrap();
// Установите типы EventLoopGroup и Channel.
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
// Добавьте ChannelInitializer для инициализации ChannelPipeline.
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// получатьChannelPipelineи Добавить декодер, кодер и обработчик обработки пульса
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatArtisanClientHandler());
}
});
// Распечатайте информацию о запуске клиента Netty.
System.out.println("netty client start。。");
// подключиться к серверу и получить объект Channel
Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
// Определите текст отправляемого пакета Heartbeat.
String text = "Heartbeat Packet";
// Создайте объект Random для генерации случайных чисел.
Random random = new Random();
// существующий случайный отправляет пакеты пульса в цикле
while (channel.isActive()) {
int num = random.nextInt(8);
Thread.sleep(num * 1000);
channel.writeAndFlush(text);
}
} catch (Exception e) {
// Распечатать информацию об исключении
e.printStackTrace();
} finally {
// Грамотно закройте EventLoopGroup и освободите ресурсы.
eventLoopGroup.shutdownGracefully();
}
}
}
HeartBeatArtisanClientHandler
,он наследует отSimpleChannelInboundHandler<String>
。Этот процессор используется для обработкиизстроковое сообщение。
package com.artisan.heartbeat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @author маленький мастер
* @version 1.0
* @mark: show me the code , change the world
*/
public class HeartBeatArtisanClientHandler extends SimpleChannelInboundHandler<String> {
// Переопределить метод ChannelRead0 для обработки получения сообщений.
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// Распечатать полученное сообщение
System.out.println(" client received :" + msg);
// Если получено сообщение да"idle закрыть", затем закрыть соединение
if (msg != null && msg.equals("idle close")) {
// Распечатайте информацию об отключении сервера
System.out.println(" Сервер закрыт соединять и клиент тоже закрыт");
// Закрыть клиентсоединять
ctx.channel().closeFuture();
}
}
}
существуют в этом процессоре,когда сообщение получено,channelRead0
методбудет вызван。если полученоизда"idle close"
информация,Процессор напечатает сообщение о том, что сервер закрыт.,И немедленно закрыть клиент изоединять. В этом случае,Клиент не будет отправлять пакеты Heartbeat,Потому что сервер больше не принимает соединение.
Пожалуйста, посмотрите В клиенте HeartBeatArtisanClient
Посмотрите еще раз HeartBeatArtisanServer
конецизнастраивать
Разработано, как указано выше, случайный 0–7.
Запускаем Сервер и Клиент последовательно, чтобы наблюдать
Попробуйте еще раз
Сообщения передаются одно за другим.
Давайте сначаласмотреть ВнизIdleStateHandler
серединаизchannelRead
метод
Только да выполнен сквозной переход,Никакая обработка бизнес-логики не выполняется.,Пусть следующий обработчик в ChannelPipe обрабатывает метод ChannelRead.
initialize
изметод,этотдаIdleStateHandler
из Основной исходный код
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
существуют В этом методе,первыйпроходитьswitch
оператор проверяет текущийизсостояние。еслисостояниеужеда1или2,Тогда инициализация не требуется. Статус 1 обычно указывает на то, что инициализация завершена.,Статус 2 означает, что канал закрыт.
Затем установите статус на 1,ивызовinitOutputChanged
метод борьбы Изменение вывода, обычно это делается для того, чтобы размер выходного буфера канала соответствовал размеру, установленному при инициализации.
Затем установите время последнего чтения и записи на текущее время, и эти временные метки будут использоваться для последующего обнаружения состояния простоя.
Затем,в соответствии снастраиватьизчитатьпраздныйтайм-аутвремя(readerIdleTimeNanos
)、Писатьпраздныйтайм-аутвремя(writerIdleTimeNanos
)和所有праздныйтайм-аутвремя(allIdleTimeNanos
),Запланируйте соответствующие задачи тайм-аута отдельно. Эти задачи будут выполнены по истечении указанного периода ожидания.,Для обработки состояния простоя канала.
ReaderIdleTimeoutTask
:Если каналсуществоватьreaderIdleTimeNanos
Не включеночитать Получить операцию,Эта задача будет запущена.WriterIdleTimeoutTask
:Если каналсуществоватьwriterIdleTimeNanos
Не включено Писать Введите операцию,Эта задача будет запущена.AllIdleTimeoutTask
:Если каналсуществоватьallIdleTimeNanos
Нет ни того, ни другогочитать Получить операцию也没有Писать Введите операцию,Эта задача будет запущена.Обычно мы используем readIdleTimeout, поэтому нам нужно сосредоточиться на
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
продолжать ReaderIdleTimeoutTask
прямойсмотретьrun
метод
@Override
protected void run(ChannelHandlerContext ctx) {
// Определите время задержки перед переходом в состояние ожидания в следующий раз.
long nextDelay = readerIdleTimeNanos;
// Если текущий статус не читается
if (!reading) {
// Вычтите количество наносекунд с момента последнего чтения, чтобы настроить следующую задержку.
nextDelay -= ticksInNanos() - lastReadTime;
}
// Если время задержки меньше или равно 0, это означает, что чтение бездействовало достаточно долгое время.
if (nextDelay <= 0) {
// Установите новый тайм-аут и отправьте уведомление об обратном вызове
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
// да Нода Первый триггер читается в режиме ожиданиясобытие
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false; // Установите флаг в значение false, чтобы избежать повторного срабатывания последующих
try {
// Создать событие состояния простоя
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
// существуют, вызывают это событие в контексте канала
channelIdle(ctx, event);
} catch (Throwable t) {
// Если возникает исключение, распространите это исключение на контекст канала.
ctx.fireExceptionCaught(t);
}
} else {
// Если операция чтения происходит до истечения времени существования, запланируйте более короткую задержку по времени ожидания.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
run
методпервыйопределение ПонятноодинnextDelay
переменная,этотиндивидуальныйпеременная表示Вниз一次курокпраздныйсостояниесобытиевпередиз Время задержки。этотиндивидуальный Время задержки Зависит отreaderIdleTimeNanos
Решать,этодасуществоватьIdleStateHandler
Конструкторсерединанастраиватьиз。reading
Флагfalse
,Означает, что канал в данный момент не находится в состоянии чтения.,Тогда это начнется сnextDelay
середина Вычтите количество наносекунд с момента последнего чтения, чтобы настроить следующую задержку.。nextDelay
меньше, чемилиравный0,Это означает, что читатель простаивал достаточно долго.,нуждаться Установите новый тайм-аут и отправьте уведомление об обратном вызове。здесьиспользовать Понятноschedule
методсуществоватьctx
通道上Вниз文середина安排одинIdleStateHandler
изосуществлять,осуществлятьвремядляreaderIdleTimeNanos
。firstReaderIdleEvent
логотип для идентификацииданетдапервыйкурокчитать者праздныйсобытие。Если дапервый,этотиндивидуальный标志会被настраиватьдляfalse
,Во избежание последующего повторного срабатывания.IdleStateEvent
событие,ииспользоватьchannelIdle
методсуществуют, вызывают это событие в контексте канала。этотиндивидуальныйсобытиебудет переданосуществоватьChannelPipeline
серединазарегистрироватьсяизIdleStateHandler
изперезвонить。ctx.fireExceptionCaught(t)
методсуществовать通道上Вниз文серединараспространениеэтотиндивидуальный异常。nextDelay
больше, чем0,Описание существует Операция чтения произошла до истечения таймаута,Поэтому будет запланирована более короткая задержка и тайм-аут.Давайте возьмем пример, чтобы понять
nextDelay -= ticksInNanos() - lastReadTime;
Вычтите время последнего вызова метода ChannelRead из текущего времени. Если результат равен да6с. , указывая, что последний раз, когда ChannelRead вызывался, был 6 секунд. Извпередизиметь значение Понятно,Вы устанавливаете изда5s,Тогда nextDelay равен -1,Время ожидания описания истекло
Это вызовет channelIdle(ctx, event);
Исходный код выглядит следующим образом
/**
* Is called when an {@link IdleStateEvent} should be fired. This implementation calls
* {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
*/
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
Если тайм-аут отсутствует, метод userEventTriggered не запускается.
Если таймаута нет, иди
// Если операция чтения происходит до истечения времени существования, запланируйте более короткую задержку по времени ожидания.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
этотиндивидуальныйrun
методдаNetty处理通道праздныйсостояниеизключевая часть,Это гарантирует, что соответствующая логика обработки может быть запущена, когда существующий канал не читается в течение длительного времени.,Тем самым избегая растраты ресурсов и потенциальных проблем.。