При анализе данных часто необходимо оглянуться на исторические данные. Однако иногда при построении исторических данных необходимо изменять параметры и повторно запускать данные. Система планирования хранилища данных компании часто поддерживает только один параметр: дату, и создавать сценарии планирования для временных данных немного расточительно. В настоящее время вы можете объединить форматирование строк Python и запись Hive PySpark, чтобы завершить циклическую запись временных данных.
⚠️Примечание. Следующие действия необходимо выполнить на Jupyter на корпоративном сервере. Локальный Jupyter не может подключиться к корпоративному кластеру Hive.
Предыстория дела: запись ежедневных данных горячего поиска. Типы горячего поиска делятся на текущий день, последний 1 день, последние 2 дня и последние 3 дня. Для удобства здесь упрощена прочность петли.
from pyspark.sql import *
# конфигурация искры
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.executor.instances", "20") \
.config("spark.executor.cores", "2") \
.config("spark.executor.memory", "8g") \
.config("spark.driver.memory", "8g") \
.enableHiveSupport() \
.getOrCreate()
# Импортируйте другие связанные библиотеки.
import pandas as pd
from datetime import datetime
# sql создает временную таблицу
sql_create = '''
CREATE TABLE temp.loop_write_example
(
cnt string comment "cnt за последние n дней"
)
PARTITIONED BY (`point_date` string, `dtype` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''
spark.sql(sql_create)
DataFrame[]
构造日期'{dt}'
и горячие типы поиска{num}
два параметра
# sql записывает во временную таблицу
sql_insert = '''
insert overwrite table temp.loop_write_example partition (point_date = '{dt}',dtype={num})
select
sum(if(dt between date_add('{dt}',-{num}) and '{dt}',cnt,null)) as cnt
from
temp.loop_write_example_fake_data
where
dt between date_add('{dt}',-4) and '{dt}'
'''
dates = pd.date_range('2021-01-01','2021-01-10').strftime("%Y-%m-%d").to_list() # диапазон дат
# Циклическая записьвременная таблица
for point_date in dates:
if point_date>='2021-01-01' and point_date<'2021-01-03':
for dtype in range(0,4):
start_time = datetime.now()
spark.sql(sql_insert.format(dt=point_date, num=dtype))
end_time=datetime.now()
print (point_date, dtype, "succeed", 'Требует времени'+str((end_time-start_time).секунды)+'секунды')
2021-01-01 0 succeed Это заняло 8 секунд
2021-01-01 1 succeed Это заняло 7 секунд
2021-01-01 2 succeed Это заняло 8 секунд
2021-01-01 3 succeed Это заняло 8 секунд
2021-01-02 0 succeed Это заняло 8 секунд
2021-01-02 1 succeed Это заняло 8 секунд
2021-01-02 2 succeed Это заняло 8 секунд
2021-01-02 3 succeed Это заняло 8 секунд
Предыстория: Запишите более 200 миллионов вопросов в HDFS в пакетном режиме в соответствии с правилами исследований и разработок для запроса через интерфейс. Для каждой HDFS требуется максимум 10 миллионов Вт.
from pyspark.sql import *
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.executor.instances", "20") \
.config("spark.executor.cores", "2") \
.config("spark.executor.memory", "8g") \
.config("spark.driver.memory", "8g") \
.enableHiveSupport() \
.getOrCreate()
import math
import pandas as pd
from datetime import datetime
import time
import os
# Для удобства данные, генерируемые правилами, хранятся во временной таблице temp.hh_qids. Детали правил знать не нужно.
# Посмотреть объем данных
df_cnt = spark.sql('select count(1) as cnt from temp.hh_qids').toPandas()
N = df_cnt['cnt'].loc[0] # Получить объем данных
print(N)
273230858
# Создайте таблицу и сгенерируйте суффикс таблицы через параметр i.
creat_sql = '''
CREATE TABLE IF NOT EXISTS temp.hh_mult_write_{i}
(
questionid string comment «Идентификатор вопроса»
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''
# Запись в таблицу, запись во временную таблицу, созданную выше
insert_sql = '''
insert overwrite table temp.hh_mult_write_{i}
select
questionid
from
temp.hh_qids
where
ceil(rn/10000000)={i}
order by
questionid
limit 100000000
'''
%%time
# Создайте несколько временных таблиц через цикл и напишите
for i in range(1,math.ceil(N/10000000)+1):
start_time = datetime.now()
spark.sql(creat_sql.format(i=i)) # Создать таблицу
spark.sql(insert_sql.format(i=i)) # написать таблицу
end_time=datetime.now()
print(f"Успешно записано hh_mult_write_{i},"+'Требует времени'+str((end_time-start_time).секунды)+'секунды')
Успешно записано в hh_mult_write_1, что заняло 38 секунд.
Успешно записано в hh_mult_write_2, что заняло 59 секунд.
Успешно записано в hh_mult_write_3, что заняло 36 секунд.
Успешно записано в hh_mult_write_4, что заняло 34 секунды.
Успешно записано в hh_mult_write_5, что заняло 29 секунд.
Успешно записано в hh_mult_write_6, что заняло 26 секунд.
Успешно записано в hh_mult_write_7, что заняло 44 секунды.
Успешно записано в hh_mult_write_8, что заняло 43 секунды.
Успешно записано в hh_mult_write_9, что заняло 32 секунды.
Успешно записано hh_mult_write_10, что заняло 49 секунд.
Успешно записано в hh_mult_write_11, что заняло 33 секунды.
Успешно записано в hh_mult_write_12, что заняло 34 секунды.
Успешно записано в hh_mult_write_13, что заняло 38 секунд.
Успешно записано в hh_mult_write_14, что заняло 24 секунды.
Успешно записан hh_mult_write_15, что заняло 40 секунд.
Успешно записано в hh_mult_write_16, что заняло 34 секунды.
Успешно записано в hh_mult_write_17, что заняло 39 секунд.
Успешно записано в hh_mult_write_18, что заняло 45 секунд.
Записано успешно hh_mult_write_19, заняло 50 секунд
Успешно записан hh_mult_write_20, что заняло 35 секунд.
Успешно записано в hh_mult_write_21, что заняло 46 секунд.
Успешно записано в hh_mult_write_22, что заняло 38 секунд.
Успешно записано в hh_mult_write_23, что заняло 29 секунд.
Успешно записано в hh_mult_write_24, что заняло 31 секунду.
Успешно записан hh_mult_write_25, что заняло 28 секунд.
Успешно записано в hh_mult_write_26, что заняло 36 секунд.
Успешно записано hh_mult_write_27, что заняло 32 секунды.
Успешно записано в hh_mult_write_28, что заняло 17 секунд.
CPU times: user 124 ms, sys: 31.8 ms, total: 156 ms
Wall time: 17min 15s
На этот раз, посредством крупномасштабной практической демонстрации данных, мы можем обнаружить, что эффективность неплохая, а запись 28 файлов занимает всего 17 минут. 15 с. Но в повседневной работе могут быть более сложные записи или записи большего размера. Есть ли способ повысить эффективность? Все знают, что цикл Python однопоточный.,Следующий цикл не будет запущен до окончания одного цикла. Системы планирования обычно также могут поддерживать параллелизм.,Так может ли Python также реализовать многопоточность посредством параллелизма? конечно,Существует много методов,Но после экспериментов я обнаружил, что
joblib
Простота в использовании。
Вот простой маленькийcaseДемоjoblib
Эффект
# Проверьте количество процессоров сервера кластера
print(os.cpu_count())
48
%%time
# Проверьте время выполнения простого цикла: 15 с.
for i in range(5):
for j in range(3):
time.sleep(1)
print(i*j)
0 0 0 0 1 2 0 2 4 0 3 6 0 4 8 CPU times: user 12.2 ms, sys: 6.18 ms, total: 18.3 ms Wall time: 15 s
%%time
# Проверьте время выполнения при многопоточности: 1,35 с (молодец, более чем в 10 раз быстрее!)
from joblib import Parallel, delayed
def product2(x,y):
time.sleep(1)
return x*y
# n_jobs=-1 означает использование всего процессора
Parallel(n_jobs=-1)(delayed(product2)(i,j) for i in range(5) for j in range(3))
CPU times: user 111 ms, sys: 233 ms, total: 344 ms Wall time: 1.35 s
[0, 0, 0, 0, 1, 2, 0, 2, 4, 0, 3, 6, 0, 4, 8]
Как видите, эффект ускорения по-прежнему велик, но будет ли реальное приложение таким же хорошим?
# Конструктор — оборачивает основной процесс одного цикла в функцию для параллельного вызова.
def creat_insert(i):
start_time = datetime.now()
spark.sql(creat_sql.format(i=i)) # Создать таблицу
spark.sql(insert_sql.format(i=i)) # написать таблицу
end_time=datetime.now()
print_str = f"Успешно записано в hh_mult_test_{i},"+'Требуется время'+str((end_time-start_time).секунды)+'секунды'
return print_str
%%time
# Одновременная запись
from joblib import Parallel, delayed
# Все используют кластерные серверы. При обработке больших задач не рекомендуется задействовать весь процессор. Здесь достаточно и половины.
Parallel(n_jobs=24, prefer="threads")(delayed(creat_insert)(i) for i in range(1,math.ceil(N/10000000)+1))
CPU times: user 87.6 ms, sys: 18.8 ms, total: 106 ms
Wall time: 1min 49s
['Успешно записано в hh_mult_test_1, это заняло 44 секунды',
'Успешно записано в hh_mult_test_2, это заняло 41 секунду',
'Успешно записано в hh_mult_test_3, заняло 83 секунды',
'Успешно записано в hh_mult_test_4, это заняло 49 секунд',
'Успешно записано в hh_mult_test_5, заняло 89 секунд',
'Успешно записано в hh_mult_test_6, это заняло 71 секунду',
'Успешно записано в hh_mult_test_7, заняло 89 секунд',
'Успешно записано в hh_mult_test_8, это заняло 72 секунды',
'Успешно записано в hh_mult_test_9, это заняло 83 секунды',
'Успешно записано в hh_mult_test_10, потребовалось 77 секунд',
'Успешно записано в hh_mult_test_11, это заняло 80 секунд',
'Успешно записано в hh_mult_test_12, это заняло 65 секунд',
'Успешно записано в hh_mult_test_13, заняло 53 секунды',
'Успешно написано hh_mult_test_14, заняло 109 секунд',
'Успешно записано в hh_mult_test_15, заняло 81 секунду',
'Успешно записано в hh_mult_test_16, потребовалось 73 секунды',
'Успешно записано в hh_mult_test_17, это заняло 41 секунду',
'Успешно записано в hh_mult_test_18, потребовалось 78 секунд',
'Успешно записано в hh_mult_test_19, заняло 84 секунды',
'Успешно записано в hh_mult_test_20, это заняло 93 секунды',
'Успешно записано в hh_mult_test_21, это заняло 68 секунд',
'Успешно записано в hh_mult_test_22, потребовалось 78 секунд',
'Успешно записано в hh_mult_test_23, это заняло 48 секунд',
'Успешно записано в hh_mult_test_24, потребовалось 88 секунд',
'Успешно записано в hh_mult_test_25, потребовалось 54 секунды',
'Успешно записано в hh_mult_test_26, потребовалось 59 секунд',
'Успешно записано в hh_mult_test_27, это заняло 62 секунды',
'Успешно записано в hh_mult_test_28, это заняло 37 секунд']
Видно, что время записи каждого файла аналогично времени цикла, оба из которых составляют около 60 секунд. Но весь процесс занял всего 1 минуту и 49 секунд, что было более чем в 10 раз быстрее.
%%time
# Объем тестовых данных большой и занимать ресурсы компании без причины неправильно, поэтому их необходимо удалить.
# Но я не могу удалить их один за другим вручную, я могу просто сделать простой цикл.
for i in range(1,29):
drop_sql='''
DROP TABLE IF EXISTS temp.hh_mult_test_1{i};
'''
spark.sql(drop_sql.format(i=i)) # Удалить таблицу
CPU times: user 3.94 ms, sys: 1.96 ms, total: 5.91 ms
Wall time: 148 ms
На этом серия небольших кейсов Python подошла к концу. Эти кейсы в основном взяты из моей повседневной деятельности. Python по-прежнему имеет свое место в обработке сложных требований и повышении эффективности работы. Интересно, есть ли у вас какие-нибудь практические небольшие случаи, когда Python решает повседневные задачи?
Давайте поддержим друг друга~