Ссылки:pyspark-rdd
RDD (устойчивый распределенный набор данных) да PySpark Основной строительный блок программирования daspark, самый простой объект данных;
Все наборы данных в его приложении daspark, включая первоначально загруженный набор данных, набор промежуточных расчетных данных и окончательный набор данных результата, являются даRDD.
По сути, объект RDDда представляет собой коллекцию, распределенную на каждом узле и представляющую данные в программе Spark. Если взять в качестве примера Pyspark, то RDD состоит из объектов Python, распределенных на каждом узле, которые представляют собой коллекцию объектов, аналогичную списку самого Python. Разница в том, что коллекция Python хранится только в одном процессе для обработки, а RDD распределяется по каждому узлу, а это означает, что коллекция Python рассчитывается для нескольких процессов на нескольких физических серверах.
Еще одна вещь, о которой следует упомянуть здесь,Хотя можно сохранить СДР на жёсткий диск.,Но RDD в основном хранится в памяти.,Ожидается, что по крайней мере да будет хранить существование в памяти,Потому что Spark появился для поддержки машинного обучения.
Создав RDD, вы не сможете его изменить.
RDD,Resilient Distributed Dataset, Эластичный, распределенный набор данных,Вы можете разобрать это и проанализировать по его имени:
эластичность
: иметьэластичность,даотказоустойчивый,То есть, если потерян узел, выполняющий задачу,Набор данныхеще можно построить。этотдапотому чтоКаждый RDD имеет свою генеалогию (DAG).,RDD можно создавать с нуля.распределенный
:RDDдараспределенныйиз,Данные RDD разделены как минимум на один раздел.,существующий по рабочим узлам кластера распределенный сохраняется в существующей памяти как совокупность объектов;Набор данных
: RDDда Зависит от Записыватькомпозицияиз Набор данных。так называемыйЗаписывать,Подобно «строке» данных в таблице.,Обычно состоит из нескольких полей. Загрузка,Единственный набор различимых данных в наборе данных,Каждый раздел RDD содержит отдельную часть.,Операцию можно выполнить самостоятельно.Преимущества РДД заключаются в следующем:
PySpark RDD Не очень подходит для приложений, обновляющих хранилища состояний.,Например Web Система хранения приложения. Для этих приложений более эффективно использовать систему (например, базу данных), которая выполняет традиционную регистрацию обновлений и проверку данных.
Цель RDD — предоставить эффективную модель программирования для пакетного анализа и отказаться от асинхронных приложений.
RDD в основном создаются двумя разными способами:
При использовании pyspark в самом начале обычно вызывается следующая программа ввода:
from pyspark.sql import SparkSession
# создатель искрового объекта
spark = SparkSession \
.builder \
.appName("test") \
.getOrCreate()
sc = spark.sparkContext
этотфункцияв драйвереизЗагрузите существующую коллекцию в параллельный RDD. середина. Это дасоздавать RDD Базовый метод, используемый, когда данные, загруженные из файла или базы данных, уже находятся в памяти. и это требует, чтобы при создании RDD Раньше все данные существовали в драйвере.
#Create RDD from parallelize
data = [1,2,3,4,5,6,7,8,9,10,11,12]
Rdd = spark.sparkContext.parallelize(data)
Spark Считать текстовый файл в RDD — Ссылки
sparkContext.textFile()
Для чтения текстовых файлов из HDFS, S3 и любой файловой системы, поддерживаемой Hadoop, этот метод принимает путь в качестве аргумента и, при необходимости, несколько разделов в качестве второго аргумента;
sparkContext.wholeTextFiles()
Считать текстовый файл в RDD(String,String) тип PairedRDD,Ключ к пути к файлу,Значение содержимого файла. Этот метод также принимает путь в качестве параметра,и, возможно, несколько разделов в качестве второго параметра.
Когда мы знаем имена нескольких файлов для чтения, если мы хотим прочитать все файлы из папки для создания RDD, просто введите все имена файлов и папку с разделителем-запятой, и на этом этапе поддерживаются оба вышеуказанных метода. Сопоставление с образцом и подстановочные знаки также принимаются.
rdd = spark.sparkContext.emptyRDD
rdd2 = spark.sparkContext.parallelize( [ ],10) #This creates 10 partitions
Когда вы запускаете RDD, он автоматически разбивает данные на разделы в зависимости от доступности ресурсов.
getNumPartitions()
- Вот этот RDD Функция, которая возвращает разделы, на которые разделен наш набор данных.
Мы также можем установить несколько разделов вручную, нам просто нужно передать несколько разделов в качестве второго параметра этим функциям:
Например
sparkContext.parallelize([1,2,3,4,56,7,8,9,12,3], 10)
Иногда нам может потребоваться **перераспределение**. PySpark предоставляет два способа перераспределения.
Первый:использоватьrepartition(numPartitions)
откудаиметь Узел перемешивает данныеизметод,Также известно как полная перетасовка,
Метод repartition() очень затратен.,Потому что он перемешивает данные со всех узлов кластера.
второй:использоватьcoalesce(n)
метод**Перетасуйте данные из самых маленьких узлов,Используется только для уменьшения количества разделов**.
Это оптимизированная или улучшенная версия функции darepartition(), которая использует слияние для уменьшения перемещения данных между разделами.
Например,Если сейчассуществоватьиметь 4 разделы, то Coalesce(2) начинается только с 2 Узлы перемещают данные.
#Перед выполнением:
Partition 1 : 0 1 2
Partition 2 : 3 4 5
Partition 3 : 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15
Partition 6 : 16 17 18 19
#После выполнения:
Partition 1 : 0 1 2
Partition 2 : 3 4 5 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15 16 17 18 19
третий:использоватьpartitionBy(numPartitions, partiontionFunc=portable_hash)
функция,
Подробное описание можно найти в моем блоге:
Примечания к исследованию Pyspark (5) Операции RDD (1) Операции преобразования _RDD
Примечания к исследованию Pyspark (5) Операции RDD (2)_Операции действий RDD
Преобразования )
:действоватьRDDи вернутьсяодин Новый РДД функция;
Действия )
:Управлять СДР, расчет триггера, и вернуться значение или выполнить вывод функция.
Самая большая разница между ними заключается в,конвертироватьдействоватьдалени , будет RDD Конвертировать/обновиться до другого,иметь в видуПока мы не позвоним да не будет выполнять расчеты перед действием.
Более подробную информацию и примеры можно найти в следующем сообщении блога.
Помимо базового типа BaseRDD, который содержит общие свойства и функции, RDD также имеет следующие общие типы:
PairRDD
: RDD, состоящий из пар ключ-значение. Например, содержимое, прочитанное с помощью упомянутого ранее метода WholeTextFiles(), сохраняется в форме пар ключ-значение.
DoubleRDD
: RDD, состоящий из чисел двойной точности с плавающей запятой.
DataFrame
:доизверсия называетсяSchemaRDD,пресс-группаиметьфиксированное имяитипорганизовано в столбцыизраспределенный Набор данных. DataFrame эквивалентен реляционной таблице в sparkSQL. Поэтому мы часто создаем этот DataFrame при использовании sparkSQL.
HadoopRDD
:Обеспечить хранилище для чтениясуществоватьHDFSначальствоизданныеизRDD。
Shuffle да PySpark используется длясуществоватьМеханизм перераспределения данных между разными исполнителями или даже между машинами.。
К операциям, которые могут вызвать перемешивание, относятся:
repartition
иcoalesce
ждатьПеределдействовать,
groupByKey
иreduceByKey
ждатьОперация агрегирования(кроме количества),
а такжеcogroup
иjoin
ждатьПрисоединиться к операции
PySpark Shuffle это дорогая операция, потому что она включает в себя следующее
· Дисковый ввод/вывод
· Включает сериализацию и десериализацию данных.
·Сетевой ввод/вывод
Перемешать размер раздела и производительность
В зависимости от размера набора данных увеличение количества ядер и перетасовка памяти могут быть полезны или вредны для нашей задачи.
①при обращенииМеньший объем данныхчас,Обычно тасовка должна сокращаться, перегородка,
В противном случае вы получите множество файлов разделов с меньшим количеством записей в каждом разделе, что приведет к фрагментации файлов.
② С другой стороны, если данных слишком много, а количество разделов мало, это приведет к уменьшению количества задач, выполнение которых займет больше времени, а иногда могут возникнуть ошибки нехватки памяти.
Получите правильный размер shuffle Подсчет разделов сложен и требует нескольких прогонов с разными значениями для достижения оптимизированного числа. Когда существуют PySpark Это один из ключевых атрибутов, на который следует обращать внимание при возникновении проблем с производительностью задачи.
⓪ Примечания к исследованию Pyspark (1) — предисловие и содержание
①.Примечания к исследованию Pyspark (2) — Введение в развертывание искры и команду искровой отправки
②.Примечания к исследованию Pyspark (3) — SparkContext и SparkSession
③.Примечания к исследованию Pyspark (4) Обзор RDD эластичного распределенного набора данных (Часть 1)
④Примечания к исследованию Pyspark (4) эластичный распределенный набор данных RDD Обзор (часть 2)
⑤Примечания к исследованию Pyspark (5) Операции RDD (1) Операции преобразования _RDD
⑥Примечания к исследованию Pyspark (5) Операции RDD (2)_Операции действий RDD
⑦Примечания к исследованию Pyspark (5) Операции RDD (3) Операции преобразования RDD пары ключ-значение