Небольшой случай Python (10) Использование PySpark для записи данных в цикле
Небольшой случай Python (10) Использование PySpark для записи данных в цикле

Небольшой случай Python (10) Использование PySpark для записи данных в цикле

При анализе данных часто необходимо оглянуться на исторические данные. Однако иногда при построении исторических данных необходимо изменять параметры и повторно запускать данные. Система планирования хранилища данных компании часто поддерживает только один параметр: дату, и создавать сценарии планирования для временных данных немного расточительно. В настоящее время вы можете объединить форматирование строк Python и запись Hive PySpark, чтобы завершить циклическую запись временных данных.

⚠️Примечание. Следующие действия необходимо выполнить на Jupyter на корпоративном сервере. Локальный Jupyter не может подключиться к корпоративному кластеру Hive.

Случай 1: несколько параметров записываются во временную таблицу в цикле.

Предыстория дела: запись ежедневных данных горячего поиска. Типы горячего поиска делятся на текущий день, последний 1 день, последние 2 дня и последние 3 дня. Для удобства здесь упрощена прочность петли.

Язык кода:javascript
копировать
from pyspark.sql import *
# конфигурация искры
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.executor.instances", "20") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .enableHiveSupport() \
    .getOrCreate()

# Импортируйте другие связанные библиотеки.
import pandas as pd
from datetime import datetime
Язык кода:javascript
копировать
# sql создает временную таблицу
sql_create = '''
CREATE TABLE temp.loop_write_example
    (
        cnt string comment "cnt за последние n дней"
    )
PARTITIONED BY (`point_date` string, `dtype` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''

spark.sql(sql_create)
Язык кода:javascript
копировать
DataFrame[]

构造日期'{dt}'и горячие типы поиска{num}два параметра

Язык кода:javascript
копировать
# sql записывает во временную таблицу
sql_insert = '''
insert overwrite table temp.loop_write_example partition (point_date = '{dt}',dtype={num})


select
    sum(if(dt between date_add('{dt}',-{num}) and '{dt}',cnt,null)) as cnt
from
    temp.loop_write_example_fake_data
where
    dt between date_add('{dt}',-4) and '{dt}'
'''
Язык кода:javascript
копировать
dates = pd.date_range('2021-01-01','2021-01-10').strftime("%Y-%m-%d").to_list() # диапазон дат
Язык кода:javascript
копировать
# Циклическая записьвременная таблица
for point_date in dates:
    if point_date>='2021-01-01' and point_date<'2021-01-03':
        for dtype in range(0,4):
            start_time = datetime.now()
            spark.sql(sql_insert.format(dt=point_date, num=dtype))
            end_time=datetime.now()
            print (point_date, dtype, "succeed", 'Требует времени'+str((end_time-start_time).секунды)+'секунды')
Язык кода:javascript
копировать
2021-01-01 0 succeed Это заняло 8 секунд
2021-01-01 1 succeed Это заняло 7 секунд
2021-01-01 2 succeed Это заняло 8 секунд
2021-01-01 3 succeed Это заняло 8 секунд
2021-01-02 0 succeed Это заняло 8 секунд
2021-01-02 1 succeed Это заняло 8 секунд
2021-01-02 2 succeed Это заняло 8 секунд
2021-01-02 3 succeed Это заняло 8 секунд

Случай 2: Одновременная пакетная запись в HDFS

Предыстория: Запишите более 200 миллионов вопросов в HDFS в пакетном режиме в соответствии с правилами исследований и разработок для запроса через интерфейс. Для каждой HDFS требуется максимум 10 миллионов Вт.

Язык кода:javascript
копировать
from pyspark.sql import *
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.executor.instances", "20") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .enableHiveSupport() \
    .getOrCreate()

import math
import pandas as pd
from datetime import datetime
import time
import os
Язык кода:javascript
копировать
# Для удобства данные, генерируемые правилами, хранятся во временной таблице temp.hh_qids. Детали правил знать не нужно.
# Посмотреть объем данных
df_cnt = spark.sql('select count(1) as cnt from temp.hh_qids').toPandas()
N = df_cnt['cnt'].loc[0] # Получить объем данных
print(N)
Язык кода:javascript
копировать
273230858
Язык кода:javascript
копировать
# Создайте таблицу и сгенерируйте суффикс таблицы через параметр i.
creat_sql = '''
CREATE TABLE IF NOT EXISTS temp.hh_mult_write_{i}
    (
        questionid string comment «Идентификатор вопроса»
    )
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''
Язык кода:javascript
копировать
# Запись в таблицу, запись во временную таблицу, созданную выше
insert_sql = '''
insert overwrite table temp.hh_mult_write_{i}

select
    questionid
from
    temp.hh_qids
where
    ceil(rn/10000000)={i}
order by
    questionid
limit 100000000
'''
  • Циклическая запись
Язык кода:javascript
копировать
%%time

# Создайте несколько временных таблиц через цикл и напишите
for i in range(1,math.ceil(N/10000000)+1):
    start_time = datetime.now()
    
    spark.sql(creat_sql.format(i=i)) # Создать таблицу
    spark.sql(insert_sql.format(i=i)) # написать таблицу
    
    end_time=datetime.now()
    
    print(f"Успешно записано hh_mult_write_{i},"+'Требует времени'+str((end_time-start_time).секунды)+'секунды')
Язык кода:javascript
копировать
Успешно записано в hh_mult_write_1, что заняло 38 секунд.
Успешно записано в hh_mult_write_2, что заняло 59 секунд.
Успешно записано в hh_mult_write_3, что заняло 36 секунд.
Успешно записано в hh_mult_write_4, что заняло 34 секунды.
Успешно записано в hh_mult_write_5, что заняло 29 секунд.
Успешно записано в hh_mult_write_6, что заняло 26 секунд.
Успешно записано в hh_mult_write_7, что заняло 44 секунды.
Успешно записано в hh_mult_write_8, что заняло 43 секунды.
Успешно записано в hh_mult_write_9, что заняло 32 секунды.
Успешно записано hh_mult_write_10, что заняло 49 секунд.
Успешно записано в hh_mult_write_11, что заняло 33 секунды.
Успешно записано в hh_mult_write_12, что заняло 34 секунды.
Успешно записано в hh_mult_write_13, что заняло 38 секунд.
Успешно записано в hh_mult_write_14, что заняло 24 секунды.
Успешно записан hh_mult_write_15, что заняло 40 секунд.
Успешно записано в hh_mult_write_16, что заняло 34 секунды.
Успешно записано в hh_mult_write_17, что заняло 39 секунд.
Успешно записано в hh_mult_write_18, что заняло 45 секунд.
Записано успешно hh_mult_write_19, заняло 50 секунд
Успешно записан hh_mult_write_20, что заняло 35 секунд.
Успешно записано в hh_mult_write_21, что заняло 46 секунд.
Успешно записано в hh_mult_write_22, что заняло 38 секунд.
Успешно записано в hh_mult_write_23, что заняло 29 секунд.
Успешно записано в hh_mult_write_24, что заняло 31 секунду.
Успешно записан hh_mult_write_25, что заняло 28 секунд.
Успешно записано в hh_mult_write_26, что заняло 36 секунд.
Успешно записано hh_mult_write_27, что заняло 32 секунды.
Успешно записано в hh_mult_write_28, что заняло 17 секунд.
CPU times: user 124 ms, sys: 31.8 ms, total: 156 ms
Wall time: 17min 15s

На этот раз, посредством крупномасштабной практической демонстрации данных, мы можем обнаружить, что эффективность неплохая, а запись 28 файлов занимает всего 17 минут. 15 с. Но в повседневной работе могут быть более сложные записи или записи большего размера. Есть ли способ повысить эффективность? Все знают, что цикл Python однопоточный.,Следующий цикл не будет запущен до окончания одного цикла. Системы планирования обычно также могут поддерживать параллелизм.,Так может ли Python также реализовать многопоточность посредством параллелизма? конечно,Существует много методов,Но после экспериментов я обнаружил, чтоjoblibПростота в использовании。

Вот простой маленькийcaseДемоjoblibЭффект

Язык кода:javascript
копировать
# Проверьте количество процессоров сервера кластера
print(os.cpu_count())

48

Язык кода:javascript
копировать
%%time

# Проверьте время выполнения простого цикла: 15 с.
for i in range(5):
 for j in range(3):
     time.sleep(1)
     print(i*j)

0 0 0 0 1 2 0 2 4 0 3 6 0 4 8 CPU times: user 12.2 ms, sys: 6.18 ms, total: 18.3 ms Wall time: 15 s

Язык кода:javascript
копировать
%%time

# Проверьте время выполнения при многопоточности: 1,35 с (молодец, более чем в 10 раз быстрее!)
from joblib import Parallel, delayed

def product2(x,y):
 time.sleep(1)
 return x*y

# n_jobs=-1 означает использование всего процессора
Parallel(n_jobs=-1)(delayed(product2)(i,j) for i in range(5) for j in range(3))

CPU times: user 111 ms, sys: 233 ms, total: 344 ms Wall time: 1.35 s

[0, 0, 0, 0, 1, 2, 0, 2, 4, 0, 3, 6, 0, 4, 8]

Как видите, эффект ускорения по-прежнему велик, но будет ли реальное приложение таким же хорошим?

  • Одновременная запись
Язык кода:javascript
копировать
# Конструктор — оборачивает основной процесс одного цикла в функцию для параллельного вызова.
def creat_insert(i):
    start_time = datetime.now()
    
    spark.sql(creat_sql.format(i=i)) # Создать таблицу
    spark.sql(insert_sql.format(i=i)) # написать таблицу
    
    end_time=datetime.now()
    
    print_str = f"Успешно записано в hh_mult_test_{i},"+'Требуется время'+str((end_time-start_time).секунды)+'секунды'
    return print_str
Язык кода:javascript
копировать
%%time

# Одновременная запись
from joblib import Parallel, delayed

# Все используют кластерные серверы. При обработке больших задач не рекомендуется задействовать весь процессор. Здесь достаточно и половины.
Parallel(n_jobs=24, prefer="threads")(delayed(creat_insert)(i) for i in range(1,math.ceil(N/10000000)+1))
Язык кода:javascript
копировать
CPU times: user 87.6 ms, sys: 18.8 ms, total: 106 ms
Wall time: 1min 49s

['Успешно записано в hh_mult_test_1, это заняло 44 секунды',
 'Успешно записано в hh_mult_test_2, это заняло 41 секунду',
 'Успешно записано в hh_mult_test_3, заняло 83 секунды',
 'Успешно записано в hh_mult_test_4, это заняло 49 секунд',
 'Успешно записано в hh_mult_test_5, заняло 89 секунд',
 'Успешно записано в hh_mult_test_6, это заняло 71 секунду',
 'Успешно записано в hh_mult_test_7, заняло 89 секунд',
 'Успешно записано в hh_mult_test_8, это заняло 72 секунды',
 'Успешно записано в hh_mult_test_9, это заняло 83 секунды',
 'Успешно записано в hh_mult_test_10, потребовалось 77 секунд',
 'Успешно записано в hh_mult_test_11, это заняло 80 секунд',
 'Успешно записано в hh_mult_test_12, это заняло 65 секунд',
 'Успешно записано в hh_mult_test_13, заняло 53 секунды',
 'Успешно написано hh_mult_test_14, заняло 109 секунд',
 'Успешно записано в hh_mult_test_15, заняло 81 секунду',
 'Успешно записано в hh_mult_test_16, потребовалось 73 секунды',
 'Успешно записано в hh_mult_test_17, это заняло 41 секунду',
 'Успешно записано в hh_mult_test_18, потребовалось 78 секунд',
 'Успешно записано в hh_mult_test_19, заняло 84 секунды',
 'Успешно записано в hh_mult_test_20, это заняло 93 секунды',
 'Успешно записано в hh_mult_test_21, это заняло 68 секунд',
 'Успешно записано в hh_mult_test_22, потребовалось 78 секунд',
 'Успешно записано в hh_mult_test_23, это заняло 48 секунд',
 'Успешно записано в hh_mult_test_24, потребовалось 88 секунд',
 'Успешно записано в hh_mult_test_25, потребовалось 54 секунды',
 'Успешно записано в hh_mult_test_26, потребовалось 59 секунд',
 'Успешно записано в hh_mult_test_27, это заняло 62 секунды',
 'Успешно записано в hh_mult_test_28, это заняло 37 секунд']

Видно, что время записи каждого файла аналогично времени цикла, оба из которых составляют около 60 секунд. Но весь процесс занял всего 1 минуту и ​​49 секунд, что было более чем в 10 раз быстрее.

  • Удалить тестовые данные
Язык кода:javascript
копировать
%%time

# Объем тестовых данных большой и занимать ресурсы компании без причины неправильно, поэтому их необходимо удалить.
# Но я не могу удалить их один за другим вручную, я могу просто сделать простой цикл.
for i in range(1,29):
    drop_sql='''
    DROP TABLE IF EXISTS temp.hh_mult_test_1{i};
    '''
    
    spark.sql(drop_sql.format(i=i)) # Удалить таблицу
Язык кода:javascript
копировать
CPU times: user 3.94 ms, sys: 1.96 ms, total: 5.91 ms
Wall time: 148 ms

Подвести итог

На этом серия небольших кейсов Python подошла к концу. Эти кейсы в основном взяты из моей повседневной деятельности. Python по-прежнему имеет свое место в обработке сложных требований и повышении эффективности работы. Интересно, есть ли у вас какие-нибудь практические небольшие случаи, когда Python решает повседневные задачи?

Давайте поддержим друг друга~

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose