В этой статье в основном описывается, как кэшировать или совместно использовать переменные при выполнении задач pyspark для экономии ресурсов, вычислений, времени и т. д.
Ссылки:https://sparkbyexamples.com/pyspark-rdd#rdd-persistence
В предыдущем блоге мы упоминали, что RDD 的Операции преобразования ленивы,Вам придется подождать, пока не будет предпринято следующее действие.,Только тогда расчет действительно будет выполнен;
Итак, если в нашей блок-схеме есть несколько ветвей, например, промежуточный результат определенной операции преобразования X используется несколькими последующими параллельными блок-схемами (a, b, c), то произойдет такая ситуация:
После выполнения последующего (а, б, в) Сроки различных процессов,Операция встречного действиячас,пересчитает весь график с нуля,То есть операция преобразования X,Будет повторно запланировано к выполнению:(X->a), (X->b), (X->c); Это приведет к пустой трате времени и вычислительных ресурсов, поэтому сохранение RDD очень полезно.
PySpark Используя cache()
иpersist()
Обеспечивает механизм оптимизации для хранения RDD промежуточный расчет,чтобы их можно было повторно использовать в последующих операциях。когда Выносливостьили кэш один RDD каждый рабочий узел хранит данные своего раздела в памяти или на диске, и RDD Повторно используйте их в других операциях。Spark на узлеПостоянные данные отказоустойчивы,Это означает, что если какой-либо раздел потерян,Эта Воля автоматически пересчитывается с использованием исходного преобразования, которое ее создало.
cache()
По умолчанию будет RDD Расчет сохранен на уровне хранилища. MEMORY_ONLY
, что означает, что он сохраняет данные как несериализованный объект в JVM в куче
(Для кеша Spark DataFrame или набора данных сохраните его на уровне хранения `MEMORY_AND_DISK')
cachedRdd = rdd.cache()
persist()
Есть две сигнатуры функций
Первая подпись не принимает никаких параметров и по умолчанию сохраняет ее на уровне хранения MEMORY_AND_DISK,
пример:
dfPersist = df.persist()
вторая подписьStorageLevelкак параметр Воля Чтохранилищек разнымхранилищеуровень;
пример:
dfPersist = df.persist(StorageLevel.MEMORY_ONLY)
Необязательные параметры для этого параметра: MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2.
unpersist()
PySpark будет автоматически контролировать каждыйpersist()
иcache()
вызов,и проверьте использование на каждом узле,И используйте «Наименее недавно использованный» (LRU), когда он не используется.
Также используйте unpersist() 方法手动удалить.unpersist()
Воля RDD Отметить как непостоянный,И удалите все его блоки из памяти и диска:
rddPersist2 = rddPersist.unpersist()
о cache() и persist()некоторые тонкие различия:Связь
Ссылки:
①https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
② https://sparkbyexamples.com/spark/spark-persistence-storage-levels/
Код следующий (пример):
import org.apache.spark.storage.StorageLevel
rdd2 = rdd.persist(StorageLevel.MEMORY_ONLY_SER)
df2 = df.persist(StorageLevel.MEMORY_ONLY_SER)
MEMORY_ONLY
Это RDD cache() Поведение метода по умолчанию,и Воля RDD или DataFrame хранится как десериализованный объект для JVM в памяти. Не сохраняет определенные разделы, когда недостаточно свободной памяти. DataFrame, эти Воля нужно пересчитывать. Это требует больше места в памяти, но выполняется быстрее, поскольку чтение из памяти требует меньше ресурсов. CPU цикл.
MEMORY_AND_DISK
На этом уровне хранения RDD Волякак счетчиксериализацияхранилище объектов在 JVM в памяти. Когда требуемое пространство хранилища больше, чем доступная память в час, он будет записывать некоторые избыточные разделы хранилища на диск и считывать данные с диска, когда это необходимо. из-за участия ввода-вывода и, следовательно, медленнее.
DISK_ONLY
На этом уровне хранения RDD хранится только на диске, и поскольку он включает в себя I/O,CPU Время расчета больше.
MEMORY_ONLY_2
иMEMORY_ONLY Уровень хранения тот же, но воля каждого раздела копировать на два узла кластера.
MEMORY_AND_DISK_2
imMEMORY_AND_DISK Уровень хранения тот же, но воля каждого раздела копировать на два узла кластера.
DISK_ONLY_2
с DISK_ONLY Уровень хранения тот же, но воля каждый раздел копировать на два узла кластера.
Ниже приводится табличное представление уровней хранения по пространству и ЦП. Производительность влияет на выбор наиболее подходящего.
------------------------------------------------------------------------------------------------------------------------------------
Уровень памяти | Занятое пространство | В памяти | Сериализация |
------------------------------------------------------------------------------------------------------------------------------------
MEMORY_ONLY High Low Y N N Y
MEMORY_AND_DISK High Medium some some some N
DISK_ONLY Low High N Y Y N
------------------------------------------------------------------------------------------------------------------------------------
Пожалуйста, обратитесь к официальной документации за инструкциями:
https://spark.apache.org/docs/latest/rdd-programming-guide.html#which-storage-level-to-choose
когда PySpark использоватьmap()
илиreduce()
Преобразование выполнения операциичас,этоВыполнять преобразования на удаленных узлах, используя переменные, прикрепленные к задаче.,и эти переменные не отправляются обратно драйверу PySpark.,поэтомуНевозможно повторно использовать общие переменные между задачами.。PySpark Общие переменные решают эту проблему, используя следующие два метода.
· Широковещательные переменные (общие переменные только для чтения)
· Аккумуляторная переменная (обновляемая общая переменная)
Широковещательные переменныедаОбщие переменные только для чтения,это们被Кэшируется и доступен на всех узлах кластера.,Для облегчения доступа к задачам или их использования. PySpark не отправляет эти данные вместе с каждой задачей.,而даиспользовать高效的广播算法Воля Широковещательные переменныераспределяется по машинам,сократить расходы на связь.
PySpark RDD Broadcast Один из лучших вариантов использования — поиск данных.
Усеспаркконтекст 类的方法broadcast(v)
Созданный。
Код следующий (пример):
broadcastVar = sc.broadcast([0, 1, 2, 3])
broadcastVar.value
Обратите внимание, что широковещательные переменные не будет вызван sc.broadcast(variable)
час Просто отправьте его исполнителю,而да在首次используй эточасотправлено исполнителю
Ссылки:https://sparkbyexamples.com/pyspark/pyspark-broadcast-variables/
Аккумуляторы — это еще один тип общей переменной, которая просто «добавляется» посредством операции ассоциации и обмена для выполнения счетчика (аналогично счетчику Map-reduce) или операции поиска.
Я не буду здесь вдаваться в подробности, но вы можете обратиться к:
https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
⓪ Примечания к исследованию Pyspark (1) — предисловие и содержание
①.Примечания к исследованию Pyspark (2) — команда искровой отправки
②.Примечания к исследованию Pyspark (3) — SparkContext и SparkSession
③.Примечания к исследованию Pyspark (4) эластичный распределенный набор данных RDD Резюме (Часть 1)
④Примечания к исследованию Pyspark (4) Обзор RDD эластичного распределенного набора данных (Часть 2)
⑤Примечания к исследованию Pyspark (5) Операции RDD (1) Операции преобразования _RDD
⑥Примечания к исследованию Pyspark (5) Операции RDD (2) Операции действий_RDD
⑦Примечания к исследованию Pyspark (5) Операции RDD (3) Операции преобразования RDD пары ключ-значение