В области обработки больших данных Apache Storm — это вычислительная система реального времени, предназначенная для обработки огромных потоков данных. Он предоставляет распределенные, отказоустойчивые и высокодоступные вычислительные решения реального времени, позволяющие разработчикам легко создавать сложные конвейеры обработки данных. В этой статье в простой и понятной форме будут представлены основные концепции, принципы работы, распространенные проблемы и решения Storm, а также использован простой пример кода, чтобы показать, как использовать Storm для обработки данных в реальном времени.
В Storm топология представляет собой логическую структуру вычислительной задачи реального времени. Вы можете думать об этом как о направленном ациклическом графе (DAG), состоящем из Spouts (источников данных) и Bolts (узлов обработки данных). Spouts отвечают за получение данных из источников данных, а Bolts — за обработку данных, включая фильтрацию, агрегацию, подключение к внешним системам и другие операции.
Spout — отправная точка потока данных,Он постоянно извлекает данные из внешних источников данных.(нравитьсяKafka、MQTTждать)Извлечь данные и отправить вTopologyсередина。каждыйSpoutнеобходимо реализоватьIRichSpout
интерфейс,Определить логику сбора данных и механизм восстановления после сбоев.
Bolt — основной процессор Storm.,Отвечает за преобразование и обработку данных. Он может выполнять различные операции, такие как фильтрация, агрегирование, функциональные операции и запись в базу данных. Болты можно соединять, образуя сложные технологические цепочки.,каждыйBoltМожет потреблять один или несколькоBoltилиSpoutисходящий поток данных。Boltнеобходимо реализоватьIBasicBolt
илиIRichBolt
интерфейс。
Storm использует механизм подтверждения, чтобы гарантировать правильную обработку каждого кортежа (единицы данных). Когда кортеж будет полностью обработан, акер получит подтверждение, в противном случае кортеж будет отправлен повторно, тем самым обеспечивая целостность обработки данных.
Потеря данных обычно вызвана неправильной конфигурацией топологии или ошибками логики обработки. Обязательно включите механизм подтверждения сообщений и правильно обрабатывайте исключения, чтобы не прерывать процесс обработки данных.
Проблемы с производительностью часто вызваны необоснованным распределением ресурсов, неравномерностью данных или чрезмерной сложностью логики обработки. Разумно распределяйте количество работников, исполнителей и задач, оптимизируйте структуру потока данных и сокращайте ненужную передачу и обработку данных.
Неправильная настройка или пропуск параметров отказоустойчивости может привести к несогласованности данных или сбоям выполнения задач. Глубоко понимать механизм отказоустойчивости Storm и правильно настраивать стратегии подтверждения сообщений, чтобы обеспечить стабильную работу системы.
Ниже приведен простой пример топологии Storm, реализующий функцию подсчета слов.
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class WordCountTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
// Spout: отправить предложение
builder.setSpout("word-spout", new SentenceSpout(), 1);
// Bolt: причастие
builder.setBolt("split-bolt", new SplitSentenceBolt(), 2)
.shuffleGrouping("word-spout");
// Bolt: считать
builder.setBolt("count-bolt", new WordCountBolt(), 4)
.fieldsGrouping("split-bolt", new Fields("word"));
Config config = new Config();
config.setDebug(true);
if (args != null && args.length > 0) {
config.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", config, builder.createTopology());
}
}
}
В этом примере,SentenceSpout
отправить предложение,SplitSentenceBolt
Ответственныйпричастие,WordCountBolt
статистикакаждыйпоявляется словоизчастота。через этот пример,Вы можете интуитивно почувствовать процесс обработки потока данных Storm.
В предыдущей части мы представили основные понятия, принципы работы Apache Storm и простой пример подсчета слов. Далее мы обсудим, как бороться с распространенными проблемами и точками ошибок, а также как оптимизировать топологию Storm для повышения производительности.
Задержка данных может быть вызвана тем, что скорость обработки не соответствует поступающим данным. Решения включают в себя:
Неравномерность данных означает, что объем данных, обрабатываемых некоторыми узлами, намного больше, чем у других узлов, что приводит к неравномерной нагрузке. Решения включают в себя:
Задержка с длинным хвостом означает, что обработка некоторых кортежей занимает слишком много времени. Это может быть связано с высокой сложностью обработки конкретных данных или сбоем конкретного узла. Решение:
Правильная настройка параллелизма Топологии (количества воркеров, исполнителей и задач) — залог оптимизации производительности. Может динамически настраиваться в зависимости от ресурсов кластера и нагрузки задач.
Config config = new Config();
config.setNumWorkers(10); // Установите количество рабочих
config.setNumExecutors("split-bolt", 5); // Установить количество исполнителей для конкретного Болта
Прежде чем отправлять топологию в рабочую среду, вы можете протестировать ее в локальном режиме, чтобы проверить правильность конфигурации и логики.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test-topology", config, topology);
Использование менеджеров ресурсов, таких как YARN или Kubernetes, позволяет лучше планировать ресурсы кластера Storm и управлять ими.
Включите мониторинг и ведение журнала, чтобы оперативно выявлять и решать проблемы.
config.setDebug(true); // Включить режим отладки
config.setLogConfig(new HashMap<String, Object>()); // Настройка параметров журнала
Углубленное исследование Apache Основы Storm, решение распространенных проблем, стратегия оптимизациипосле,Давайте расширим его дальше,Узнайте, как реализовать расширенные функции и лучшие практики в реальных проектах.,Повысить надежность и масштабируемость приложений.
Trident — это высокоуровневая абстракция Storm. Он обеспечивает возможности управления состоянием и обработки транзакций и очень подходит для сценариев, требующих точной семантики однократной обработки, такой как подсчет, агрегирование и другие операции обновления состояния.
javaTridentTopology topology = new TridentTopology();
Stream inputStream = topology.newStream("spout", new MemorySpout());
// Подсчитайте общее количество слов
Stream wordCounts = inputStream.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
topology.commit(topology.build());
Apache Storm стал предпочтительным инструментом для многих проектов анализа данных в реальном времени благодаря своим мощным возможностям обработки в реальном времени. Однако, чтобы по-настоящему реализовать его потенциал, вам необходимо не только освоить основные концепции и операции, но также необходимо иметь глубокое понимание его расширенных функций, а также постоянной оптимизации и настройки для работы с различными сложными сценариями. Я надеюсь, что благодаря вышеупомянутым расширенным функциям, практическим навыкам и обмену практическим опытом вы сможете более комфортно создавать систему обработки данных в реальном времени и добиться эффективной и стабильной работы системы. Благодаря постоянному развитию технологий, постоянному обучению и практике ваше исследование в области вычислений в реальном времени станет более ярким.