PySpark SQL поставлять read.json("path")
Преобразование одной или нескольких строк (многострочных) в JSON Файл, прочитанный в PySpark DataFrame и write.json("path")
сохрани или напиши JSON Файловые функции. В этом уроке вы узнаете, как читать один файл, несколько файлов и все файлы в каталоге в DataFrame ииспользовать Python Примером может быть DataFrame напиши ответ JSON документ.
Уведомление: готов к использованию из коробки PySpark API Поддержка будет JSON файлы и другие форматы файлов, прочитанные в PySpark DataFrame середина.
использовать read.json("path")
или read.format("json").load("path")
Метод принимает путь к файлу в качестве параметра, который может быть JSON Чтение файлов PySpark DataFrame。
В отличие от чтения CSV, источник данных JSON из входного файла по умолчанию выводит схему.
здесьиспользоватьиз zipcodes.json Файлы можно получить с GitHub Скачать проект.
Портал: https://github.com/spark-examples/pyspark-examples/blob/master/resources/zipcodes.json
# Read JSON file into dataframe
df = spark.read.json("PyDataStudio/zipcodes.json")
df.printSchema()
df.show()
когдаиспользовать format("json")
Метод, вы также можете указать источник данных по его полному имени, как показано ниже.
# Read JSON file into dataframe
df = spark.read.format('org.apache.spark.sql.json') \
.load("PyDataStudio/zipcodes.json")
PySpark JSON данныеисточниксуществоватьдругой Параметрысерединапоставлятьнесколькочитатьдокументиз Параметры,использоватьmultiline
Параметрычитатьдисперсиясуществоватьполигамный JSON По умолчанию многострочный параметр имеет значение документ. false。
Ниже приведен входной файл, который мы хотим прочитать. Этот же файл также можно найти на Github.
Портал: https://github.com/spark-examples/pyspark-examples/blob/master/resources/multiline-zipcode.json
[{
"RecordNumber": 2,
"Zipcode": 704,
"ZipCodeType": "STANDARD",
"City": "PASEO COSTA DEL SUR",
"State": "PR"
},
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}]
использоватьread.option("multiline","true")
# Read multiline json file
multiline_df = spark.read.option("multiline","true") \
.json("PyDataStudio/multiline-zipcode.json")
multiline_df.show()
Достаточно хорошоиспользоватьread.json()
методотразные путичитать Несколько JSON файлы, просто передайте все имена файлов с полными путями, разделенными запятыми, например.
# Read multiple files
df2 = spark.read.json(
['resources/zipcode1.json',
'resources/zipcode2.json'])
df2.show()
Только Волякаталог какjson()
методизпуть, пройденный кметод,Мы можем получить весь JSON в каталоге Vola Файл, прочитанный в середине DataFrame.
# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")
df3.show()
PySpark Schema определяет структуру данных, другими словами, это DataFrame структура. Пи Спарк SQL поставлять StructType и StructField Классы задаются программно DataFrame структура.
если известно заранеедокументиз Архитектураи Не хочуиспользоватьinferSchema
Параметрыуказать Списокитип,пожалуйстаиспользоватьобозначениеиз Настроить Списокschemaииспользоватьschema
Параметрытип。
использовать PySpark StructType Настройка создания классов Схема, давайте запустим этот класс ииспользуем метод add, чтобы добавить в него столбцы, предоставляя имя столбца, тип данных и параметры, допускающие значение NULL.
# Define custom schema
schema = StructType([
StructField("RecordNumber",IntegerType(),True),
StructField("Zipcode",IntegerType(),True),
StructField("ZipCodeType",StringType(),True),
StructField("City",StringType(),True),
StructField("State",StringType(),True),
StructField("LocationType",StringType(),True),
StructField("Lat",DoubleType(),True),
StructField("Long",DoubleType(),True),
StructField("Xaxis",IntegerType(),True),
StructField("Yaxis",DoubleType(),True),
StructField("Zaxis",DoubleType(),True),
StructField("WorldRegion",StringType(),True),
StructField("Country",StringType(),True),
StructField("LocationText",StringType(),True),
StructField("Location",StringType(),True),
StructField("Decommisioned",BooleanType(),True),
StructField("TaxReturnsFiled",StringType(),True),
StructField("EstimatedPopulation",IntegerType(),True),
StructField("TotalWages",IntegerType(),True),
StructField("Notes",StringType(),True)
])
df_with_schema = spark.read.schema(schema) \
.json("PyDataStudio/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()
PySpark SQL Также это создает своего рода чтение JSON документизметод,методдаиспользовать spark.sqlContext.sql("будет JSON Загрузить во временное представление") Создать временное представление непосредственно из прочитанного файла
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" +
" (path 'PyDataStudio/zipcodes.json')")
spark.sql("select * from zipcode").show()
использовать nullValues
вариант, вы можете JSON Строка указывается как нулевой. Например, если вы хотите рассмотреть значение 1900-01-01
столбец даты, затем в DataFrame установлен на null。
Параметры dateFormat
для настройки входа DateType и TimestampType Формат столбца Параметры. Поддержать всех java.text.SimpleDateFormat
Формат.
Уведомление:В дополнение к вышесказанному Параметрыснаружи,PySpark JSON набор данных также поддерживает множество других параметров.
от JSON Создание файла PySpark DataFrame После этого вы можете подать заявку DataFrame поддерживатьизвсе Конвертироватьидействовать。
существовать DataFrame начальствоиспользовать PySpark DataFrameWriter объект write
Написание метода JSON документ.
df2.write.json("/PyDataStudio/spark_output/zipcodes.json")
существоватьписать JSON При документировании вы можете использовать несколько параметров. nullValue
,dateFormat
PySpark DataFrameWriter Есть другой способ mode()
указать SaveMode;этотметодиз Принятые параметрыoverwrite
, append
, ignore
, errorifexists
.
overwrite
– Режим используется для перезаписи существующих файлов.append
– Воляданные добавлены в существующий документignore
– Игнорировать операцию записи, если документ уже существует.errorifexists
или error
– Это параметр по умолчанию, когда документ уже сохранен, он возвращает ошибку. df2.write.mode('Overwrite') \
.json("/PyDataStudio/spark_output/zipcodes.json")
Этот пример также работаетсуществоватьGitHub PySpark Пример проекта доступен для справки.
# https://github.com/spark-examples/pyspark-examples/blob/master/pyspark-read-json.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,DoubleType
spark = SparkSession.builder \
.master("local[1]") \
.appName("SparkByExamples.com") \
.getOrCreate()
# Read JSON file into dataframe
df = spark.read.json("PyDataStudio/zipcodes.json")
df.printSchema()
df.show()
# Read multiline json file
multiline_df = spark.read.option("multiline","true") \
.json("PyDataStudio/multiline-zipcode.json")
multiline_df.show()
#Read multiple files
df2 = spark.read.json(
['PyDataStudio/zipcode2.json','PyDataStudio/zipcode1.json'])
df2.show()
#Read All JSON files from a directory
df3 = spark.read.json("PyDataStudio/*.json")
df3.show()
# Define custom schema
schema = StructType([
StructField("RecordNumber",IntegerType(),True),
StructField("Zipcode",IntegerType(),True),
StructField("ZipCodeType",StringType(),True),
StructField("City",StringType(),True),
StructField("State",StringType(),True),
StructField("LocationType",StringType(),True),
StructField("Lat",DoubleType(),True),
StructField("Long",DoubleType(),True),
StructField("Xaxis",IntegerType(),True),
StructField("Yaxis",DoubleType(),True),
StructField("Zaxis",DoubleType(),True),
StructField("WorldRegion",StringType(),True),
StructField("Country",StringType(),True),
StructField("LocationText",StringType(),True),
StructField("Location",StringType(),True),
StructField("Decommisioned",BooleanType(),True),
StructField("TaxReturnsFiled",StringType(),True),
StructField("EstimatedPopulation",IntegerType(),True),
StructField("TotalWages",IntegerType(),True),
StructField("Notes",StringType(),True)
])
df_with_schema = spark.read.schema(schema) \
.json("PyDataStudio/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()
# Create a table from Parquet File
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode3 USING json OPTIONS" +
" (path 'PyDataStudio/zipcodes.json')")
spark.sql("select * from zipcode3").show()
# PySpark write Parquet File
df2.write.mode('Overwrite').json("/PyDataStudio/spark_output/zipcodes.json")
Связанное чтение: PySpark Читай и пиши CSV файл в DataFrame