Writing a csv from a Cloud Function (CF) to a bucket: 'with open(filepath, "w") as MY_CSV:' causes "FileNotFoundError: [Errno 2] No such file or directory"
FileNotFoundError: [Errno 2] No such file or directory
I get this error when I try to write a csv file to a bucket using a csv writer that loops over batches of data. The full Cloud Function log around the error:
```
File "/workspace/main.py", line 299, in write_to_csv_file
  with open(filepath, "w") as outcsv:
FileNotFoundError: [Errno 2] No such file or directory: 'gs://MY_BUCKET/MY_CSV.csv'
Function execution took 52655 ms, finished with status: 'crash'
OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k
```
And this although the bucket_filepath definitely exists: I can upload an empty dummy file and get its "gsutils URI" (right-click on the three dots at the right of the file), and the bucket_filepath looks exactly the same: `'gs://MY_BUCKET/MY_CSV.csv'`.

I also checked saving a dummy pandas dataframe instead, using `pd.to_csv`, and that works with the same bucket_filepath (!).

Therefore there must be another reason, probably that the writer is not accepted, or the `with statement` that opens the file.
The code that throws the error is below. It is the same code that works outside the Google Cloud Function, in a normal cron job on a local server. I added two debug prints around the line that raises the error, but `print("Right after opening the file ...")` does not show up any more. The subfunction `query_execute_batch()`, which `write_to_csv_file()` calls for each batch, is also shown, but it is probably not the problem here, since the error already occurs right at the start, when opening the csv file for writing.
`requirements.txt` (the packages are then imported as modules):
```
SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1
```
And from `main.py`:
```python
def query_execute_batch(connection):
    """Function for reading data from the query result into batches
    :yield: each result in a loop is a batch of the query result
    """
    results = execute_select_batch(connection, SQL_QUERY)
    print(f"len(results): {len(results)}")
    for result in results:
        yield result
```
```python
def write_to_csv_file(connection, filepath):
    """Write the data in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    :return: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")
    with open(filepath, "w") as outcsv:
        print("Right after opening the file ...")
        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        writer.writeheader()
        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save
```
Note that for the script above to work, I import `gcsfs`, which makes the bucket readable and writable. Otherwise I would probably need a google cloud storage object, for example:
```python
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)
```
and then create the file in that bucket with further functions, but that is not the aim here.
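For context only, a minimal sketch of what that storage-object route might look like (not used here; the blob name and the `csv_data` string are hypothetical):

```python
from google.cloud import storage

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

# Hypothetical: upload an already-built csv string as a single object
blob = bucket.blob("MY_CSV.csv")
blob.upload_from_string(csv_data, content_type="text/csv")
```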
In the working `pd.to_csv` code below, the output of a dummy SQL query `SELECT 1` is used as the input of the dataframe. This *can* be saved to the same bucket_filepath. Of course the reason might not just be `pd.to_csv()` as such, but also that the dataset is a dummy one instead of huge unicode strings from a huge `SELECT query`. Or there is some other reason, I am only guessing.
```python
if records is not None:
    df = pd.DataFrame(records.fetchall())
    df.columns = records.keys()
    df.to_csv(filepath,
              index=False,
              )
    datetime_now_save = datetime.now()
    countrows = df.shape[0]
```
I would like to use the csv writer so that I have the chance to write in unicode with the unicodecsv module, and the chance to use batches.
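For clarity, a small sketch of what writing with unicodecsv looks like (its documented usage, unrelated to the bucket problem): the module mirrors the stdlib csv API but encodes the rows itself, so the file has to be opened in binary mode.

```python
import unicodecsv

# unicodecsv encodes the rows itself, hence the binary "wb" mode
with open("/tmp/example.csv", "wb") as f:
    writer = unicodecsv.writer(f, encoding="utf-8", delimiter="|")
    writer.writerow([u"é", u"ñ"])
```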
I might be willing to switch to batches in pandas (`loop + append` mode or `chunksize`), for example as in "Writing a large Pandas Dataframe to CSV file in chunks", to get rid of this bucket filepath problem, but I would rather use the ready-made code (never touch a running system).
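For reference, a minimal sketch of that `loop + append` pattern (not the author's code), shown against a local path; `df` is assumed to be an existing large DataFrame, and whether append mode behaves the same against a `gs://` path is exactly the open question here.

```python
import numpy as np

# Hypothetical illustration: split an existing DataFrame `df` into ~100 row
# chunks and append them to one csv, writing the header only once.
chunks = np.array_split(df.index, 100)
for i, subset in enumerate(chunks):
    df.loc[subset].to_csv(
        "/tmp/MY_CSV.csv",             # local path for the sketch
        mode="w" if i == 0 else "a",   # overwrite first, append afterwards
        header=(i == 0),               # header only for the first chunk
        index=False,
    )
```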
How can I save that csv with the csv writer, so that it can open a new file in the bucket in `write` mode = `with open(filepath, "w") as outcsv:`?
The given function `write_to_csv_file()` is just a tiny part of the Cloud Function, which uses a wide range of functions and cascading functions. I cannot present the whole reproducible case here and hope that it can be answered by experience or by easier examples.
Answer 1
The solution is surprising. You must import and use `gcsfs` if you want to write to a bucket file with `open()` (in the code below, `fs.open()`).
If you use `pd.to_csv()`, `import gcsfs` is not needed, but `gcsfs` is still needed in `requirements.txt` for `pd.to_csv()` to work; thus pandas' `to_csv()` seems to use it automatically.
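As a minimal illustration of that behaviour (a sketch, not part of the Cloud Function): no `import gcsfs` appears in the module, yet the write only succeeds when gcsfs is installed, because pandas resolves the `gs://` URL through fsspec and its registered gcsfs backend.

```python
import pandas as pd

# gcsfs is not imported here, but must be installed (requirements.txt);
# pandas hands the gs:// URL to fsspec, which loads gcsfs behind the scenes.
pd.DataFrame({"dummy": [1]}).to_csv("gs://MY_BUCKET/MY_CSV.csv", index=False)
```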
Leaving the `pd.to_csv()` surprise aside, here is the code that answers the question (tested):
```python
def write_to_csv_file(connection, filepath):
    """Write the QUERY result in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    :return: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")

    # A gcsfs object is needed to open a file.
    # https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
    # https://gcsfs.readthedocs.io/en/latest/index.html#examples
    # Side-note (Exception):
    # pd.to_csv() needs neither the gcsfs object, nor its import.
    # It is not used here, but it has been tested with examples.
    fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
    fs.ls(BUCKET_NAME)

    # wb needed, else "builtins.TypeError: must be str, not bytes"
    # https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
    with fs.open(filepath, 'wb') as outcsv:
        print("Right after opening the file ...")
        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        print("before writer.writeheader()")
        writer.writeheader()
        print("after writer.writeheader()")
        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save
```
Side-note
Do not use a csv writer like this. It takes far too long: instead of `pd.to_csv()` with a `chunksize` parameter of 5000, which needs just 62 seconds to load the 700k rows and store them as a csv in the bucket, the CF with the batch writer takes more than 9 minutes, which is over the timeout limit. I was therefore forced to use `pd.to_csv()` instead and to convert my data into a dataframe for that.
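A minimal sketch of that final approach, assuming (as above) that gcsfs is listed in `requirements.txt` so pandas can write straight to the bucket path; `records` is the full query result as in the earlier `pd.to_csv` snippet:

```python
import pandas as pd

df = pd.DataFrame(records.fetchall())
df.columns = records.keys()
df.to_csv(
    filepath,        # 'gs://MY_BUCKET/MY_CSV.csv'
    index=False,
    sep="|",         # same delimiter as the csv writer above
    chunksize=5000,  # number of rows written per batch
)
```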