我是PySpark的新手,我从API中获取了一系列JSON,每个JSON对象具有相同的模式(键值对)。像这样
[ {'count': 308, 'next': 'some_url', 'previous': None, 'results': [{'assigned_to': 43, 'category': 'Unused', 'comments': None, 'completed_ts': None, 'created': '2019-05-27T05:14:22.306843Z', 'description': 'Pollution', 'display_name': {'admin': False, 'business_name': 'Test Business', 'contact_number': 'some_number', 'dob': None, 'email': 'some_mail', 'emp_id': None, 'first_name': 'Alisha'}}]}, {'count': 309, 'next': 'some_url', 'previous': None, 'results': [{'assigned_to': 44, 'category': 'Unused', 'comments': None, 'completed_ts': None, 'created': '2019-05-27T05:14:22.306843Z', 'description': 'Pollution', 'display_name': {'admin': False, 'business_name': 'Test Business', 'contact_number': 'some_number', 'dob': None, 'email': 'some_mail', 'emp_id': None, 'first_name': 'Ali'}}]},......}]
如果这些是单独的JSON文件,我会使用以下方式创建数据框
df =spark.read.json('myfile.json')
然后将所有数据框合并成一个。我在尝试直接从列表转换为数据框时遇到了问题。我使用了这个
from pyspark.sql import SparkSessionspark= SparkSession.builder.appName("Basics").getOrCreate()sc= spark.sparkContextdf = pyspark.sql.SQLContext(sc.parallelize(data_list))`
这会给我AttributeError: 'RDD' object has no attribute '_jsc'
回答:
我没有找到直接解决您问题的答案。但这个解决方案有效,
import jsonimport astdf = sc.wholeTextFiles(path).map(lambda x:ast.literal_eval(x[1]))\ .map(lambda x: json.dumps(x))df = spark.read.json(df)
这会给您如下输出,
+-----+--------+--------+--------------------+|count| next|previous| results|+-----+--------+--------+--------------------+| 308|some_url| null|[[43,Unused,null,...|| 309|some_url| null|[[44,Unused,null,...|+-----+--------+--------+--------------------+
编辑:如果它在一个变量中,您只需要做的是,
import jsondf = sc.parallelize(data).map(lambda x: json.dumps(x))df = spark.read.json(df)