![CF에서 버킷으로 csv를 쓸 때: 'with open(filepath, "w") as MY_CSV:'는 "FileNotFoundError: [Errno 2] 해당 파일 또는 디렉터리가 없습니다:"가 발생합니다.](https://rvso.com/image/774540/CF%EC%97%90%EC%84%9C%20%EB%B2%84%ED%82%B7%EC%9C%BC%EB%A1%9C%20csv%EB%A5%BC%20%EC%93%B8%20%EB%95%8C%3A%20'with%20open(filepath%2C%20%22w%22)%20as%20MY_CSV%3A'%EB%8A%94%20%22FileNotFoundError%3A%20%5BErrno%202%5D%20%ED%95%B4%EB%8B%B9%20%ED%8C%8C%EC%9D%BC%20%EB%98%90%EB%8A%94%20%EB%94%94%EB%A0%89%ED%84%B0%EB%A6%AC%EA%B0%80%20%EC%97%86%EC%8A%B5%EB%8B%88%EB%8B%A4%3A%22%EA%B0%80%20%EB%B0%9C%EC%83%9D%ED%95%A9%EB%8B%88%EB%8B%A4..png)
FileNotFoundError: [Errno 2] No such file or directory
일괄 데이터를 반복하는 csv 기록기를 사용하여 버킷에 csv 파일을 쓰려고 하면 이 오류가 발생합니다 . 해당 오류에 대한 Cloud 함수 로그에 대한 전체 정보는 다음과 같습니다.
File "/workspace/main.py", line 299, in write_to_csv_file with open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such file or directory: 'gs://MY_BUCKET/MY_CSV.csv' Function execution took 52655 ms, finished with status: 'crash' OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k ```
그리고 이 bucket_filepath는 확실히 존재하지만, 빈 더미 파일을 업로드하고 "gsutils URI"(파일 오른쪽에 있는 세 개의 점을 마우스 오른쪽 버튼으로 클릭)를 가져올 수 있으며 bucket_filepath는 동일하게 보입니다 'gs://MY_BUCKET/MY_CSV.csv'
.
대신에 더미 팬더 데이터 프레임을 저장하는 것을 확인했고 pd.to_csv
동일한 bucket_filepath(!)로 작동했습니다.
with statement
따라서 작성자가 승인되지 않거나 파일을 여는 등 의 다른 이유가 있어야 합니다 .
오류를 발생시키는 코드는 다음과 같습니다. 로컬 서버의 일반적인 크론 작업에서 Google Cloud 함수 외부에서 작동하는 것과 동일한 코드를 사용합니다. 오류가 발생하는 줄 주위에 두 개의 디버그 인쇄를 추가했는데 print("Right after opening the file ...")
더 이상 표시되지 않습니다. 각 배치에 대해 호출 query_execute_batch()
하는 하위 기능 write_to_csv_file()
도 표시되지만 csv 파일을 쓰기-열 때 시작 부분에 이미 오류가 발생하므로 여기서는 문제가 아닐 가능성이 높습니다.
requirements.txt
(그런 다음 모듈로 가져옵니다):
SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1
그리고 다음에서 main.py
:
def query_execute_batch(connection):
"""Function for reading data from the query result into batches
:yield: each result in a loop is a batch of the query result
"""
results = execute_select_batch(connection, SQL_QUERY)
print(f"len(results): {len(results)}")
for result in results:
yield result
def write_to_csv_file(connection, filepath):
"""Write the data in a loop over batches into a csv.
This is done in batches since the query from the database is huge.
:param connection: mysqldb connection to DB
:param filepath: path to csv file to write data
returns: metadata on rows and time
"""
countrows = 0
print("Right before opening the file ...")
with open(filepath, "w") as outcsv:
print("Right after opening the file ...")
writer = csv.DictWriter(
outcsv,
fieldnames=FIELDNAMES,
extrasaction="ignore",
delimiter="|",
lineterminator="\n",
)
# write header according to fieldnames
writer.writeheader()
for batch in query_execute_batch(connection):
writer.writerows(batch)
countrows += len(batch)
datetime_now_save = datetime.now()
return countrows, datetime_now_save
위 스크립트가 작동하려면 gcsfs
버킷을 읽기-쓰기 가능하게 만드는 가져오기를 수행해야 합니다. 그렇지 않으면 다음과 같은 Google 클라우드 저장소 객체가 필요할 것입니다.
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)
그런 다음 추가 기능을 사용하여 해당 버킷에 파일을 만듭니다. 그러나 여기서는 이것이 목표가 아닙니다.
다음에서 pd.to_csv
작동하는 코드는 더미 SQL 쿼리의 출력을 SELECT 1
데이터 프레임의 입력으로 사용합니다. 이것~할 수 있다동일한 bucket_filepath에 저장되어야 합니다. 물론 이유는 단지 pd.to_csv()
그런 것이 아니라 데이터 세트가 거대한 SELECT query
. 아니면 다른 이유가 있는 것 같아요.
if records is not None:
df = pd.DataFrame(records.fetchall())
df.columns = records.keys()
df.to_csv(filepath,
index=False,
)
datetime_now_save = datetime.now()
countrows = df.shape[0]
unicodecsv 모듈을 사용하여 유니코드로 작성할 수 있는 기회와 배치를 사용할 수 있는 기회를 얻기 위해 csv 기록기를 사용하고 싶습니다.
다음과 같이 팬더에서 배치( loop + append
모드 또는 ) 로 변경할 의향이 있을 수 있습니다.chunksize
대규모 Pandas Dataframe을 CSV 파일에 청크로 작성이 버킷 파일 경로 문제를 해결하려면 준비된 코드를 사용하고 싶습니다(실행 중인 시스템을 건드리지 마세요).
write
모드 = 에서 버킷에 새 파일을 열 수 있도록 csv 기록기로 해당 csv를 저장하려면 어떻게 해야 합니까 with open(filepath, "w") as outcsv:
?
주어진 함수는 write_to_csv_file()
광범위한 함수와 계단식 함수를 사용하는 Cloud 함수의 아주 작은 부분일 뿐입니다. 여기서 재현 가능한 사례 전체를 보여줄 수는 없으며 경험이나 더 쉬운 예를 통해 답을 얻을 수 있기를 바랍니다.
답변1
해결책은 놀랍습니다. 너~ 해야 하다.gcsfs
open()
을 사용하는 경우에는 pd.to_csv()
필요 import gcsfs
하지 않지만gcsfs
작업을 requirements.txt
수행 하려면 여전히 필요합니다.pd.to_csv()
, 따라서 팬더는 to_csv()
자동으로 사용하는 것 같습니다.
놀랍게도 pd.to_csv()
다음은 질문에 답하는 코드입니다(테스트됨).
def write_to_csv_file(connection, filepath):
"""Write the QUERY result in a loop over batches into a csv.
This is done in batches since the query from the database is huge.
:param connection: mysqldb connection to DB
:param filepath: path to csv file to write data
return: metadata on rows and time
"""
countrows = 0
print("Right before opening the file ...")
# A gcsfs object is needed to open a file.
# https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
# https://gcsfs.readthedocs.io/en/latest/index.html#examples
# Side-note (Exception):
# pd.to_csv() needs neither the gcsfs object, nor its import.
# It is not used here, but it has been tested with examples.
fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
fs.ls(BUCKET_NAME)
# wb needed, else "builtins.TypeError: must be str, not bytes"
# https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
with fs.open(filepath, 'wb') as outcsv:
print("Right after opening the file ...")
writer = csv.DictWriter(
outcsv,
fieldnames=FIELDNAMES,
extrasaction="ignore",
delimiter="|",
lineterminator="\n",
)
# write header according to fieldnames
print("before writer.writeheader()")
writer.writeheader()
print("after writer.writeheader()")
for batch in query_execute_batch(connection):
writer.writerows(batch)
countrows += len(batch)
datetime_now_save = datetime.now()
return countrows, datetime_now_save
사이드 노트
이와 같이 CSV 작성기를 사용하지 마십시오.
pd.to_csv()
700,000개 행을 로드하고 버킷에 CSV로 저장하는 데 62초만 필요한 5000 매개변수를 사용하는 대신 chunksize
배치 작성자가 있는 CF는 9분 이상 소요됩니다. 시간 초과 제한. 따라서 대신 사용 pd.to_csv()
하고 데이터를 데이터 프레임으로 변환해야 합니다.