我正在使用Mlflow作为工作编排工具。我有一个机器学习流程。在这个流程中,我有实时数据。我使用Apache Kafka来监听这些数据。我还做了以下操作:每当有250条消息到达这个主题时,我会收集它们,并将这些消息添加到我之前的数据中。之后,我的训练函数会被触发。这样,每250条新数据,我就可以进行新的训练。使用Mlflow,我可以展示训练模型的结果、指标和其他任何参数。但是,在训练发生一次之后,第二次训练不会发生,并且抛出了我在标题中显示的错误。这是我的消费者代码:
topic_name = 'twitterdata'
train_every = 250
def consume_tweets():
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=['localhost:9093'],
auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms=5000,
fetch_max_bytes=128,
max_poll_records=100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
tweet_counter = 0
for message in consumer:
tweets = json.loads(json.dumps(message.value))
# print(tweets['text'])
tweet_sentiment = make_prediction(tweets['text'])
if tweet_counter == train_every:
update_df()
data_path = 'data/updated_tweets.csv'
train(data_path)
print("\nTraining with new data is completed!\n")
tweet_counter = 0
else:
tweet_counter += 1
publish_prediction(tweet_sentiment, tweets['text'])
这是我的train.py文件:
train_tweets = pd.read_csv(DATA_PATH)
# train_tweets = train_tweets[:20000]
tweets = train_tweets.tweet.values
labels = train_tweets.label.values
# Log data params
mlflow.log_param('input_rows', train_tweets.shape[0])
# Do preprocessing and return vectorizer with it
vectorizer, processed_features = embedding(tweets)
# Saving vectorizer
save_vectorizer(vectorizer)
# Split data
X_train, X_test, y_train, y_test = train_test_split(processed_features, labels, test_size=0.2, random_state=0)
# Handle imbalanced data by using 'Smote' and log to Mlflow
smote = SMOTE('minority')
mlflow.log_param("over-sampling", smote)
X_train, y_train = smote.fit_sample(X_train, y_train)
# text_classifier = MultinomialNB()
text_classifier = LogisticRegression(max_iter=10000)
text_classifier.fit(X_train, y_train)
predictions = text_classifier.predict(X_test)
# Model metrics
(rmse, mae, r2) = eval_metrics(y_test, predictions)
mlflow.log_param('os-row-Xtrain', X_train.shape[0])
mlflow.log_param('os-row-ytrain', y_train.shape[0])
mlflow.log_param("model_name", text_classifier)
mlflow.log_metric("rmse", rmse)
mlflow.log_metric("r2", r2)
mlflow.log_metric("mae", mae)
mlflow.log_metric('acc_score', accuracy_score(y_test, predictions))
mlflow.sklearn.log_model(text_classifier, "model")
我无法解决这个问题。Mlflow是较新的工具,因此关于Mlflow的问题和示例非常少。
回答:
我认为您需要为每批新数据启动一个新的Mlflow“运行”,这样您的参数就可以为每次新的训练独立记录。
因此,请在您的消费者中尝试以下操作:
if tweet_counter == train_every:
update_df()
data_path = 'data/updated_tweets.csv'
with mlflow.start_run() as mlrun:
train(data_path)
print("\nTraining with new data is completed!\n")
tweet_counter = 0