CF에서 버킷으로 csv를 쓸 때: 'with open(filepath, "w") as MY_CSV:'는 "FileNotFoundError: [Errno 2] 해당 파일 또는 디렉터리가 없습니다:"가 발생합니다.

CF에서 버킷으로 csv를 쓸 때: 'with open(filepath, "w") as MY_CSV:'는 "FileNotFoundError: [Errno 2] 해당 파일 또는 디렉터리가 없습니다:"가 발생합니다.

FileNotFoundError: [Errno 2] No such file or directory일괄 데이터를 반복하는 csv 기록기를 사용하여 버킷에 csv 파일을 쓰려고 하면 이 오류가 발생합니다 . 해당 오류에 대한 Cloud 함수 로그에 대한 전체 정보는 다음과 같습니다.


File "/workspace/main.py", line 299, in write_to_csv_file with
open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such
file or directory: 'gs://MY_BUCKET/MY_CSV.csv'

Function execution took 52655 ms, finished with status: 'crash' 

OpenBLAS WARNING - could not determine the L2 cache size on this
system, assuming 256k  ```

그리고 이 bucket_filepath는 확실히 존재하지만, 빈 더미 파일을 업로드하고 "gsutils URI"(파일 오른쪽에 있는 세 개의 점을 마우스 오른쪽 버튼으로 클릭)를 가져올 수 있으며 bucket_filepath는 동일하게 보입니다 'gs://MY_BUCKET/MY_CSV.csv'.

대신에 더미 팬더 데이터 프레임을 저장하는 것을 확인했고 pd.to_csv동일한 bucket_filepath(!)로 작동했습니다.

with statement따라서 작성자가 승인되지 않거나 파일을 여는 등 의 다른 이유가 있어야 합니다 .

오류를 발생시키는 코드는 다음과 같습니다. 로컬 서버의 일반적인 크론 작업에서 Google Cloud 함수 외부에서 작동하는 것과 동일한 코드를 사용합니다. 오류가 발생하는 줄 주위에 두 개의 디버그 인쇄를 추가했는데 print("Right after opening the file ...")더 이상 표시되지 않습니다. 각 배치에 대해 호출 query_execute_batch()하는 하위 기능 write_to_csv_file()도 표시되지만 csv 파일을 쓰기-열 때 시작 부분에 이미 오류가 발생하므로 여기서는 문제가 아닐 가능성이 높습니다.

requirements.txt(그런 다음 모듈로 가져옵니다):

SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1

그리고 다음에서 main.py:

def query_execute_batch(connection):
    """Function for reading data from the query result into batches
    :yield: each result in a loop is a batch of the query result
    """
    results = execute_select_batch(connection, SQL_QUERY)
    print(f"len(results): {len(results)}")
    for result in results:
        yield result

def write_to_csv_file(connection, filepath):
    """Write the data in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    returns: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")    
    with open(filepath, "w") as outcsv:
        print("Right after opening the file ...")        
        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        writer.writeheader()

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

위 스크립트가 작동하려면 gcsfs버킷을 읽기-쓰기 가능하게 만드는 가져오기를 수행해야 합니다. 그렇지 않으면 다음과 같은 Google 클라우드 저장소 객체가 필요할 것입니다.

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

그런 다음 추가 기능을 사용하여 해당 버킷에 파일을 만듭니다. 그러나 여기서는 이것이 목표가 아닙니다.

다음에서 pd.to_csv작동하는 코드는 더미 SQL 쿼리의 출력을 SELECT 1데이터 프레임의 입력으로 사용합니다. 이것~할 수 있다동일한 bucket_filepath에 저장되어야 합니다. 물론 이유는 단지 pd.to_csv()그런 것이 아니라 데이터 세트가 거대한 SELECT query. 아니면 다른 이유가 있는 것 같아요.

if records is not None:
    df = pd.DataFrame(records.fetchall())
    df.columns = records.keys()
    df.to_csv(filepath,
        index=False,
    )
    datetime_now_save = datetime.now()
    countrows = df.shape[0]

unicodecsv 모듈을 사용하여 유니코드로 작성할 수 있는 기회와 배치를 사용할 수 있는 기회를 얻기 위해 csv 기록기를 사용하고 싶습니다.

다음과 같이 팬더에서 배치( loop + append모드 또는 ) 로 변경할 의향이 있을 수 있습니다.chunksize대규모 Pandas Dataframe을 CSV 파일에 청크로 작성이 버킷 파일 경로 문제를 해결하려면 준비된 코드를 사용하고 싶습니다(실행 중인 시스템을 건드리지 마세요).

write모드 = 에서 버킷에 새 파일을 열 수 있도록 csv 기록기로 해당 csv를 저장하려면 어떻게 해야 합니까 with open(filepath, "w") as outcsv:?

주어진 함수는 write_to_csv_file()광범위한 함수와 계단식 함수를 사용하는 Cloud 함수의 아주 작은 부분일 뿐입니다. 여기서 재현 가능한 사례 전체를 보여줄 수는 없으며 경험이나 더 쉬운 예를 통해 답을 얻을 수 있기를 바랍니다.

답변1

해결책은 놀랍습니다. 너~ 해야 하다.gcsfsopen()

을 사용하는 경우에는 pd.to_csv()필요 import gcsfs하지 않지만gcsfs작업을 requirements.txt수행 하려면 여전히 필요합니다.pd.to_csv(), 따라서 팬더는 to_csv()자동으로 사용하는 것 같습니다.

놀랍게도 pd.to_csv()다음은 질문에 답하는 코드입니다(테스트됨).

def write_to_csv_file(connection, filepath):
    """Write the QUERY result in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    return: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")
   

    # A gcsfs object is needed to open a file.
    # https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
    # https://gcsfs.readthedocs.io/en/latest/index.html#examples
    # Side-note (Exception):
    # pd.to_csv() needs neither the gcsfs object, nor its import.
    # It is not used here, but it has been tested with examples.
    fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
    fs.ls(BUCKET_NAME)


    # wb needed, else "builtins.TypeError: must be str, not bytes"
    # https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
    with fs.open(filepath, 'wb') as outcsv:
        print("Right after opening the file ...")

        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        print("before writer.writeheader()")
        writer.writeheader()
        print("after writer.writeheader()")

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

사이드 노트

이와 같이 CSV 작성기를 사용하지 마십시오.

pd.to_csv()700,000개 행을 로드하고 버킷에 CSV로 저장하는 데 62초만 필요한 5000 매개변수를 사용하는 대신 chunksize배치 작성자가 있는 CF는 9분 이상 소요됩니다. 시간 초과 제한. 따라서 대신 사용 pd.to_csv()하고 데이터를 데이터 프레임으로 변환해야 합니다.

관련 정보