pyspark: 'RDD' object is not callable when creating a DataFrame

I am trying to create a DataFrame from parameters supplied by the end user through a REST API, so that I can get predictions from my model. However, I run into errors while creating the DataFrame.

**Error in approach 1** (using a tuple of values and a list of column names)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/02/10 13:01:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/02/10 13:01:14 WARN Utils: Your hostname, pyspark-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
18/02/10 13:01:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
18/02/10 13:01:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
### Tuple is  [(800, 0, 0.3048, 71.3, 0.0026634)]
### schema --> struct<Freq_Hz:int,displ_thick_m:double,Chord_m:double,V_inf_mps:double,AoA_Deg:int>
### session --> <pyspark.sql.conf.RuntimeConfig object at 0x7f1b68086860>
### data frame --> MapPartitionsRDD[8] at toJavaRDD at NativeMethodAccessorImpl.java:0
127.0.0.1 - - [10/Feb/2018 13:01:37] "GET /test HTTP/1.1" 500 -
Traceback (most recent call last):
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1997, in __call__
    return self.wsgi_app(environ, start_response)
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1985, in wsgi_app
    response = self.handle_exception(e)
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1540, in handle_exception
    reraise(exc_type, exc_value, tb)
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/_compat.py", line 33, in reraise
    raise value
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1615, in full_dispatch_request
    return self.finalize_request(rv)
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1630, in finalize_request
    response = self.make_response(rv)
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1740, in make_response
    rv = self.response_class.force_type(rv, request.environ)
  File "/home/pyspark/.local/lib/python3.5/site-packages/werkzeug/wrappers.py", line 921, in force_type
    response = BaseResponse(*_run_wsgi_app(response, environ))
  File "/home/pyspark/.local/lib/python3.5/site-packages/werkzeug/wrappers.py", line 59, in _run_wsgi_app
    return _run_wsgi_app(*args)
  File "/home/pyspark/.local/lib/python3.5/site-packages/werkzeug/test.py", line 923, in run_wsgi_app
    app_rv = app(environ, start_response)
TypeError: 'RDD' object is not callable

**Error in approach 2** (using the tuple and a schema)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/02/10 12:56:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/02/10 12:56:48 WARN Utils: Your hostname, pyspark-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
18/02/10 12:56:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
18/02/10 12:56:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
### Tuple is  [(800, 0, 0.3048, 71.3, 0.0026634)]
### schema --> struct<displ_thick_m:double,Chord_m:double,Freq_Hz:int,AoA_Deg:int,V_inf_mps:double>
### session --> <pyspark.sql.conf.RuntimeConfig object at 0x7efd4df9e860>
127.0.0.1 - - [10/Feb/2018 12:56:53] "GET /test HTTP/1.1" 500 -
Traceback (most recent call last):
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1997, in __call__
    return self.wsgi_app(environ, start_response)
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1985, in wsgi_app
    response = self.handle_exception(e)
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1540, in handle_exception
    reraise(exc_type, exc_value, tb)
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/_compat.py", line 33, in reraise
    raise value
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/_compat.py", line 33, in reraise
    raise value
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/pyspark/.local/lib/python3.5/site-packages/flask/app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/pyspark/Desktop/building_py_rec/lin_reg/server.py", line 48, in test
    df = session.createDataFrame(tup, schema)
  File "/home/pyspark/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 522, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/home/pyspark/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 383, in _createFromLocal
    data = list(data)
  File "/home/pyspark/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 505, in prepare
    verify_func(obj, schema)
  File "/home/pyspark/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 1360, in _verify_type
    _verify_type(v, f.dataType, f.nullable)
  File "/home/pyspark/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 1324, in _verify_type
    raise TypeError("%s can not accept object %r in type %s" % (dataType, obj, type(obj)))
TypeError: DoubleType can not accept object 800 in type <class 'int'>

Here I understand that the order of the values in the tuple passed to createDataFrame does not match the order of the fields in the schema, hence the TypeError.
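As a side note, the schema printed above (struct&lt;displ_thick_m:double,Chord_m:double,...&gt;) lists the fields in a different order than they were declared, because the StructType in the code below is built from a set literal ({...}), whose iteration order is arbitrary. The following is only a minimal sketch, using the column names already shown in the question, of declaring the schema as a list so that the field order is deterministic and lines up with the tuple:

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Sketch: build the StructType from a list (not a set), so the declared field
# order is preserved and matches the order of the values in the tuple.
schema = StructType([
    StructField("Freq_Hz", IntegerType(), False),
    StructField("AoA_Deg", IntegerType(), False),
    StructField("Chord_m", DoubleType(), False),
    StructField("V_inf_mps", DoubleType(), False),
    StructField("displ_thick_m", DoubleType(), False),
])

tup = [(800, 0, 0.3048, 71.3, 0.0026634)]
df = session.createDataFrame(tup, schema)  # session: the SparkSession created in the code below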

Relevant code

# Assumed imports/context from server.py (not shown in the original excerpt):
from flask import Flask
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

app = Flask(__name__)

@app.route('/test')
def test():
    # Create the Spark session
    session = SparkSession.builder.appName('lin_reg_api').getOrCreate()
    # Approach 1
    tup = [(800, 0, 0.3048, 71.3, 0.0026634)]
    cols = ["Freq_Hz", "AoA_Deg", "Chord_m", "V_inf_mps", "displ_thick_m"]
    print(' ### Tuple is ', tup)
    # Approach 2
    schema = StructType({
        StructField("Freq_Hz", IntegerType(), False),
        StructField("AoA_Deg", IntegerType(), False),
        StructField("Chord_m", DoubleType(), False),
        StructField("V_inf_mps", DoubleType(), False),
        StructField("displ_thick_m", DoubleType(), False),
    })
    print(' ### schema -->', schema.simpleString())
    # session = linReg.getSession()  # returns the Spark session
    print(' ### session -->', session.conf)
    # Approach 1
    #df = session.createDataFrame(tup, cols)
    # Approach 2
    df = session.createDataFrame(tup, schema)
    print(' ### data frame -->', df.toJSON())
    return df.toJSON()

I would like to understand how to make both approaches work.
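For context on approach 1: that traceback ends inside Flask's response handling (werkzeug tries to call the returned object as a WSGI app), which suggests the failure is not in createDataFrame but in returning df.toJSON() from the view. In PySpark, DataFrame.toJSON() returns an RDD of JSON-formatted strings, which is not a valid Flask response. Below is a minimal sketch (the helper name is hypothetical) of collecting those strings into a single JSON array string that a view can return:

from pyspark.sql import DataFrame

def dataframe_to_json_response(df: DataFrame) -> str:
    # df.toJSON() yields an RDD of JSON-formatted strings, one per row;
    # collect() brings them to the driver so the view can return a plain string.
    rows = df.toJSON().collect()
    return "[" + ",".join(rows) + "]"

In the /test view above, this would mean returning dataframe_to_json_response(df) instead of df.toJSON().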

