I train a calibrated classifier every day on Google Cloud Scheduler; a run takes roughly 5 minutes. My Python script pulls in the latest data for the day, concatenates it with the original data, trains the model, and saves the pickle files to Cloud Storage. The problem I am running into is that if a run takes longer than 5 minutes (which is bound to happen sooner or later), I get an upstream request timeout error.
I believe this is simply because training time has grown with the data. One solution I can think of is to train the model on the new data only and update the weights of the original model stored in the pickle file. However, I am not sure whether that is feasible.
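For what it's worth: LinearSVC and CalibratedClassifierCV expose no partial_fit, so the weights inside the existing pickle cannot simply be topped up with the new rows. Incremental updates are possible only if the pipeline is switched to estimators that support partial_fit, for example a stateless HashingVectorizer plus SGDClassifier. The sketch below illustrates that idea; the bucket path, project name, and helper signature are hypothetical, and note that this trains a different model rather than updating the one saved by the current script.

    # Sketch only: incremental training with estimators that support partial_fit.
    # This swaps LinearSVC/CountVectorizer for SGDClassifier/HashingVectorizer,
    # i.e. it is a replacement model, not an in-place update of the existing pickle.
    import pickle

    import gcsfs
    from sklearn.feature_extraction.text import HashingVectorizer
    from sklearn.linear_model import SGDClassifier

    fs = gcsfs.GCSFileSystem(project='my-project')   # assumed project name
    MODEL_PATH = 'my-bucket/incremental_svc.sav'     # hypothetical object path

    # HashingVectorizer is stateless, so it never needs refitting on the old data.
    vectorizer = HashingVectorizer(n_features=2 ** 18)

    def update_model(new_texts, new_labels, all_classes):
        """Load yesterday's model (if any) and update it with today's rows only."""
        try:
            with fs.open(MODEL_PATH, 'rb') as f:
                clf = pickle.load(f)
        except FileNotFoundError:
            clf = SGDClassifier(loss='hinge')  # linear SVM trained by SGD

        X_new = vectorizer.transform(new_texts)
        # `classes` must list every possible label, including ones absent today.
        clf.partial_fit(X_new, new_labels, classes=all_classes)

        with fs.open(MODEL_PATH, 'wb') as f:
            pickle.dump(clf, f)
        return clf

The trade-off is that calibrated probabilities would still require a periodic full refit, since CalibratedClassifierCV itself cannot be updated incrementally.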
Here is the function I run on the scheduler:
    import pickle

    import gcsfs
    import pandas as pd
    from google.cloud import storage
    from googletrans import Translator
    from sklearn.calibration import CalibratedClassifierCV
    from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
    from sklearn.preprocessing import LabelEncoder
    from sklearn.svm import LinearSVC

    # connect_postgres, remove_emoji and remove_punct are helper functions defined elsewhere in the script.

    def train_model():
        users, tasks, tags, task_tags, task_user, boards = connect_postgres()  # loading the data from a Postgres function

        storage_client = storage.Client()
        bucket = storage_client.get_bucket('my-bucket')
        blob = bucket.blob('original_data.pkl')
        pickle_in0 = blob.download_as_string()
        data = pickle.loads(pickle_in0)

        tasks = tasks.rename(columns={'id': 'task_id', 'name': 'task_name'})

        # Joining tasks and task_user_assigns tables
        tasks = tasks[tasks.task_name.isnull() == False]
        task_user = task_user[['id', 'task_id', 'user_id']].rename(columns={'id': 'task_user_id'})
        task_data = tasks.merge(task_user, on='task_id', how='left')

        # Joining users with the task_data
        users = users[['id', 'email']].rename(columns={'id': 'user_id'})
        users_tasks = task_data.merge(users, on='user_id', how='left')
        users_tasks = users_tasks[users_tasks.user_id.isnull() == False].reset_index(drop=True)

        # Joining boards table to user_tasks
        boards = boards[['id', 'name']].rename(columns={'id': 'board_id', 'name': 'board_name'})
        users_board = users_tasks.merge(boards, on='board_id', how='left').reset_index(drop=True)

        # Data cleaning
        translator = Translator()  # translate tasks that are not in English
        users_board["task_trans"] = users_board["task_name"].map(lambda x: translator.translate(x, dest="en").text)
        users_board['task_trans'] = users_board['task_trans'].apply(lambda x: remove_emoji(x))  # helper that removes emoticons from text
        users_board['task_trans'] = users_board['task_trans'].apply(lambda x: remove_punct(x))  # helper that removes punctuation from text

        users_board = users_board[['task_id', 'email', 'board_id', 'user_id', 'task_trans']]

        data1 = pd.concat([data, users_board], axis=0)
        df1 = data1.copy()

        X = df1.task_trans  # all the observations
        y = df1.user_id     # all the labels
        print(y.nunique())

        # FROM HERE ON, THE TRAINING SCRIPT BEGINS
        count_vect = CountVectorizer()
        X_train_counts = count_vect.fit_transform(X)
        tf_transformer = TfidfTransformer().fit(X_train_counts)
        X_train_transformed = tf_transformer.transform(X_train_counts)
        print('model 1 done')

        labels = LabelEncoder()
        y_train_labels_fit = labels.fit(y)
        y_train_lables_trf = labels.transform(y)

        linear_svc = LinearSVC()
        clf = linear_svc.fit(X_train_transformed, y_train_lables_trf)
        print('model 2 done')

        calibrated_svc = CalibratedClassifierCV(base_estimator=linear_svc, cv="prefit")
        calibrated_svc.fit(X_train_transformed, y_train_lables_trf)
        print('model 3 done')

        # SAVING THE MODELS ON GOOGLE CLOUD STORAGE
        # storage_client = storage.Client()
        fs = gcsfs.GCSFileSystem(project='my-project')
        filename = '~path/svc.sav'
        pickle.dump(calibrated_svc, fs.open(filename, 'wb'))

        filename = '~path/count_vectorizer.sav'
        pickle.dump(count_vect, fs.open(filename, 'wb'))

        filename = '~path/tfidf_vectorizer.sav'
        pickle.dump(tf_transformer, fs.open(filename, 'wb'))

        blob = bucket.blob('data.pkl')
        pickle_out = pickle.dumps(df1)
        blob.upload_from_string(pickle_out)

        return "success"
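As a side note, the pickled artifacts saved above can be read back from GCS the same way when serving predictions. A minimal loading sketch, assuming the same placeholder '~path/...' filenames used in the function:

    # Sketch: loading the pickled vectorizer, transformer and classifier back from GCS.
    import pickle

    import gcsfs

    fs = gcsfs.GCSFileSystem(project='my-project')

    def load_artifacts():
        with fs.open('~path/count_vectorizer.sav', 'rb') as f:
            count_vect = pickle.load(f)
        with fs.open('~path/tfidf_vectorizer.sav', 'rb') as f:
            tf_transformer = pickle.load(f)
        with fs.open('~path/svc.sav', 'rb') as f:
            calibrated_svc = pickle.load(f)
        return count_vect, tf_transformer, calibrated_svc

    def predict_user(task_text):
        count_vect, tf_transformer, calibrated_svc = load_artifacts()
        X = tf_transformer.transform(count_vect.transform([task_text]))
        return calibrated_svc.predict_proba(X)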
Is there a way to achieve this? Or is there another strategy to get around the problem?
Answer:
I could not find a way to update the weights inside the pickle file, so for now I worked around the problem by increasing Cloud Run's timeout parameter so that it exceeds the training time.
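For reference, assuming the training endpoint is a Cloud Run service named train-model (a hypothetical name), the request timeout can be raised with gcloud; Cloud Run services currently allow up to 60 minutes:

    # Raise the request timeout for the Cloud Run service that runs train_model().
    # "train-model" and the region are placeholders; substitute your own values.
    gcloud run services update train-model --region=us-central1 --timeout=900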