Sagemaker Endpoint BrokenPipeError 在 DeepAR 预测时的发生

我已经使用以下代码从一个训练好的 DeepAR 模型创建了一个 SageMaker 端点:

job_name = estimator.latest_training_job.job_name
endpoint_name = sagemaker_session.endpoint_from_job(
    job_name=job_name,
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    image_uri=image_uri,
    role=role)

现在我想使用一个 test.json 数据集(66.2MB)来测试我的模型。我根据各种教程/样本笔记本创建了这个文件(与 train.json 相同,但值没有 prediction-length)。

为此,我编写了以下代码:

class DeepARPredictor(sagemaker.predictor.Predictor):
    def set_prediction_parameters(self, freq, prediction_length):
        self.freq = freq
        self.prediction_length = prediction_length
    def predict(self, ts, num_samples=100, quantiles=["0.1", "0.5", "0.9"]):
        prediction_times = [x.index[-1] + pd.Timedelta(1, unit=self.freq) for x in ts]
        req = self.__encode_request(ts, num_samples, quantiles)
        res = super(DeepARPredictor, self).predict(req, initial_args={"ContentType": "application/json"})
        return self.__decode_response(res, prediction_times)
    def __encode_request(self, ts, num_samples, quantiles):
        instances = [{"start": str(ts[k].index[0]), "target": list(ts[k])} for k in range(len(ts))]
        configuration = {
            "num_samples": num_samples,
            "output_types": ["quantiles"],
            "quantiles": quantiles,
        }
        http_request_data = {"instances": instances, "configuration": configuration}
        return json.dumps(http_request_data).encode( "utf-8")
    def __decode_response(self, response, prediction_times):
        response_data = json.loads(response.decode("utf-8"))
        list_of_df = []
        for k in range(len(prediction_times)):
            prediction_index = pd.date_range(
                start=prediction_times[k], freq=self.freq, periods=self.prediction_length
            )
            list_of_df.append(
                pd.DataFrame(data=response_data["predictions"][k]["quantiles"], index=prediction_index)
            )
        return list_of_df

但是在运行以下代码块后:

predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
predictor.set_prediction_parameters(freq, prediction_length)
list_of_df = predictor.predict(time_series_training)

我得到了一个 BrokenPipeError:

---------------------------------------------------------------------------
BrokenPipeError                           Traceback (most recent call last)
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    676                 headers=headers,
--> 677                 chunked=chunked,
    678             )
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    391         else:
--> 392             conn.request(method, url, **httplib_request_kw)
    393
~/anaconda3/envs/python3/lib/python3.6/http/client.py in request(self, method, url, body, headers, encode_chunked)
   1261         """Send a complete request to the server."""
-> 1262         self._send_request(method, url, body, headers, encode_chunked)
   1263
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_request(self, method, url, body, headers, *args, **kwargs)
     92         rval = super(AWSConnection, self)._send_request(
---> 93             method, url, body, headers, *args, **kwargs)
     94         self._expect_header_set = False
~/anaconda3/envs/python3/lib/python3.6/http/client.py in _send_request(self, method, url, body, headers, encode_chunked)
   1307             body = _encode(body, 'body')
-> 1308         self.endheaders(body, encode_chunked=encode_chunked)
   1309
~/anaconda3/envs/python3/lib/python3.6/http/client.py in endheaders(self, message_body, encode_chunked)
   1256             raise CannotSendHeader()
-> 1257         self._send_output(message_body, encode_chunked=encode_chunked)
   1258
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_output(self, message_body, *args, **kwargs)
    119             message_body = None
--> 120         self.send(msg)
    121         if self._expect_header_set:
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in send(self, str)
    203             return
--> 204         return super(AWSConnection, self).send(str)
    205
~/anaconda3/envs/python3/lib/python3.6/http/client.py in send(self, data)
    995         try:
--> 996             self.sock.sendall(data)
    997         except TypeError:
~/anaconda3/envs/python3/lib/python3.6/ssl.py in sendall(self, data, flags)
    974                 while count < amount:
--> 975                     v = self.send(byte_view[count:])
    976                     count += v
~/anaconda3/envs/python3/lib/python3.6/ssl.py in send(self, data, flags)
    943                     self.__class__)
--> 944             return self._sslobj.write(data)
    945         else:
~/anaconda3/envs/python3/lib/python3.6/ssl.py in write(self, data)
    641         """
--> 642         return self._sslobj.write(data)
    643
BrokenPipeError: [Errno 32] Broken pipe
During handling of the above exception, another exception occurred:
ProtocolError                             Traceback (most recent call last)
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/httpsession.py in send(self, request)
    319                 decode_content=False,
--> 320                 chunked=self._chunked(request.headers),
    321             )
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    726             retries = retries.increment(
--> 727                 method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
    728             )
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/util/retry.py in increment(self, method, url, response, error, _pool, _stacktrace)
    378             # Disabled, indicate to re-raise the error.
--> 379             raise six.reraise(type(error), error, _stacktrace)
    380
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/packages/six.py in reraise(tp, value, tb)
    733             if value.__traceback__ is not tb:
--> 734                 raise value.with_traceback(tb)
    735             raise value
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    676                 headers=headers,
--> 677                 chunked=chunked,
    678             )
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    391         else:
--> 392             conn.request(method, url, **httplib_request_kw)
    393
~/anaconda3/envs/python3/lib/python3.6/http/client.py in request(self, method, url, body, headers, encode_chunked)
   1261         """Send a complete request to the server."""
-> 1262         self._send_request(method, url, body, headers, encode_chunked)
   1263
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_request(self, method, url, body, headers, *args, **kwargs)
     92         rval = super(AWSConnection, self)._send_request(
---> 93             method, url, body, headers, *args, **kwargs)
     94         self._expect_header_set = False
~/anaconda3/envs/python3/lib/python3.6/http/client.py in _send_request(self, method, url, body, headers, encode_chunked)
   1307             body = _encode(body, 'body')
-> 1308         self.endheaders(body, encode_chunked=encode_chunked)
   1309
~/anaconda3/envs/python3/lib/python3.6/http/client.py in endheaders(self, message_body, encode_chunked)
   1256             raise CannotSendHeader()
-> 1257         self._send_output(message_body, encode_chunked=encode_chunked)
   1258
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_output(self, message_body, *args, **kwargs)
    119             message_body = None
--> 120         self.send(msg)
    121         if self._expect_header_set:
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in send(self, str)
    203             return
--> 204         return super(AWSConnection, self).send(str)
    205
~/anaconda3/envs/python3/lib/python3.6/http/client.py in send(self, data)
    995         try:
--> 996             self.sock.sendall(data)
    997         except TypeError:
~/anaconda3/envs/python3/lib/python3.6/ssl.py in sendall(self, data, flags)
    974                 while count < amount:
--> 975                     v = self.send(byte_view[count:])
    976                     count += v
~/anaconda3/envs/python3/lib/python3.6/ssl.py in send(self, data, flags)
    943                     self.__class__)
--> 944             return self._sslobj.write(data)
    945         else:
~/anaconda3/envs/python3/lib/python3.6/ssl.py in write(self, data)
    641         """
--> 642         return self._sslobj.write(data)
    643
ProtocolError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))
During handling of the above exception, another exception occurred:
ConnectionClosedError                     Traceback (most recent call last)
<ipython-input-14-95dda20e8a70> in <module>
      1 predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
      2 predictor.set_prediction_parameters(freq, prediction_length)
----> 3 list_of_df = predictor.predict(time_series_training)
<ipython-input-13-a0fbac2b9b07> in predict(self, ts, num_samples, quantiles)
      7         prediction_times = [x.index[-1] + pd.Timedelta(1, unit=self.freq) for x in ts]
      8         req = self.__encode_request(ts, num_samples, quantiles)
----> 9         res = super(DeepARPredictor, self).predict(req, initial_args={"ContentType": "application/json"})
     10         return self.__decode_response(res, prediction_times)
     11
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/predictor.py in predict(self, data, initial_args, target_model, target_variant)
    123
    124         request_args = self._create_request_args(data, initial_args, target_model, target_variant)
--> 125         response = self.sagemaker_session.sagemaker_runtime_client.invoke_endpoint(**request_args)
    126         return self._handle_response(response)
    127
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    355                     "%s() only accepts keyword arguments." % py_operation_name)
    356             # The "self" in this scope is referring to the BaseClient.
--> 357             return self._make_api_call(operation_name, kwargs)
    358
    359         _api_call.__name__ = str(py_operation_name)
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
    661         else:
    662             http, parsed_response = self._make_request(
--> 663                 operation_model, request_dict, request_context)
    664
    665         self.meta.events.emit(
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py in _make_request(self, operation_model, request_dict, request_context)
    680     def _make_request(self, operation_model, request_dict, request_context):
    681         try:
--> 682             return self._endpoint.make_request(operation_model, request_dict)
    683         except Exception as e:
    684             self.meta.events.emit(
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in make_request(self, operation_model, request_dict)
    100         logger.debug("Making request for %s with params: %s",
    101                      operation_model, request_dict)
--> 102         return self._send_request(request_dict, operation_model)
    103
    104     def create_request(self, params, operation_model=None):
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _send_request(self, request_dict, operation_model)
    135             request, operation_model, context)
    136         while self._needs_retry(attempts, operation_model, request_dict,
--> 137                                 success_response, exception):
    138             attempts += 1
    139             # If there is a stream associated with the request, we need
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _needs_retry(self, attempts, operation_model, request_dict, response, caught_exception)
    254             event_name, response=response, endpoint=self,
    255             operation=operation_model, attempts=attempts,
--> 256             caught_exception=caught_exception, request_dict=request_dict)
    257         handler_response = first_non_none_response(responses)
    258         if handler_response is None:
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
    354     def emit(self, event_name, **kwargs):
    355         aliased_event_name = self._alias_event_name(event_name)
--> 356         return self._emitter.emit(aliased_event_name, **kwargs)
    357
    358     def emit_until_response(self, event_name, **kwargs):
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
    226                  handlers.
    227         """
--> 228         return self._emit(event_name, kwargs)
    229
    230     def emit_until_response(self, event_name, **kwargs):
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/hooks.py in _emit(self, event_name, kwargs, stop_on_response)
    209         for handler in handlers_to_call:
    210             logger.debug('Event %s: calling handler %s', event_name, handler)
--> 211             response = handler(**kwargs)
    212             responses.append((handler, response))
    213             if stop_on_response and response is not None:
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempts, response, caught_exception, **kwargs)
    181
    182         """
--> 183         if self._checker(attempts, response, caught_exception):
    184             result = self._action(attempts=attempts)
    185             logger.debug("Retry needed, action of: %s", result)
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
    249     def __call__(self, attempt_number, response, caught_exception):
    250         should_retry = self._should_retry(attempt_number, response,
--> 251                                           caught_exception)
    252         if should_retry:
    253             if attempt_number >= self._max_attempts:
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in _should_retry(self, attempt_number, response, caught_exception)
    275             # If we've exceeded the max attempts we just let the exception
    276             # propogate if one has occurred.
--> 277             return self._checker(attempt_number, response, caught_exception)
    278
    279
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
    315         for checker in self._checkers:
    316             checker_response = checker(attempt_number, response,
--> 317                                        caught_exception)
    318             if checker_response:
    319                 return checker_response
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
    221         elif caught_exception is not None:
    222             return self._check_caught_exception(
--> 223                 attempt_number, caught_exception)
    224         else:
    225             raise ValueError("Both response and caught_exception are None.")
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in _check_caught_exception(self, attempt_number, caught_exception)
    357         # the MaxAttemptsDecorator is not interested in retrying the exception
    358         # then this exception just propogates out past the retry code.
--> 359         raise caught_exception
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _do_get_response(self, request, operation_model)
    198             http_response = first_non_none_response(responses)
    199             if http_response is None:
--> 200                 http_response = self._send(request)
    201         except HTTPClientError as e:
    202             return (None, e)
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _send(self, request)
    267
    268     def _send(self, request):
--> 269         return self.http_session.send(request)
    270
    271
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/httpsession.py in send(self, request)
    349                 error=e,
    350                 request=request,
--> 351                 endpoint_url=request.url
    352             )
    353         except Exception as e:
ConnectionClosedError: Connection was closed before we received a valid response from endpoint URL

有人知道这是为什么吗?


回答:

我认为 Tarun 可能走在正确的道路上。你遇到的 BrokenPipeError 是在连接突然关闭时抛出的。参见 Python 文档中关于 BrokenPipeError 的说明。SageMaker 端点可能在你超过 5MB 的限制时立即断开连接。我建议你尝试使用一个较小的数据集。另外,你发送的数据可能会因为 sagemaker.tensorflow.model.TensorFlowPredictor 如何编码数据而变大,根据 这个评论 关于类似的问题的说明。

如果这不起作用,我还看到有几个人在他们的网络上遇到了一些问题。特别是防火墙/杀毒软件(例如 这个评论)或网络超时。

希望这能指引你找到正确的方向。

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注