Ao gravar csv do CF para o bucket: 'with open(filepath, "w") as MY_CSV:' leva a "FileNotFoundError: [Errno 2] Esse arquivo ou diretório não existe:"

Ao gravar csv do CF para o bucket: 'with open(filepath, "w") as MY_CSV:' leva a "FileNotFoundError: [Errno 2] Esse arquivo ou diretório não existe:"

Recebo esse erro FileNotFoundError: [Errno 2] No such file or directoryquando tento gravar um arquivo CSV no bucket, usando um gravador CSV que faz loop em lotes de dados. O insight completo sobre os registros do Cloud Function em torno desse erro:


File "/workspace/main.py", line 299, in write_to_csv_file with
open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such
file or directory: 'gs://MY_BUCKET/MY_CSV.csv'

Function execution took 52655 ms, finished with status: 'crash' 

OpenBLAS WARNING - could not determine the L2 cache size on this
system, assuming 256k  ```

E isso, embora esse bucket_filepath definitivamente exista: posso fazer upload de um arquivo fictício vazio e obter seu "URI gsutils" (clique com o botão direito nos três pontos no lado direito do arquivo) e o bucket_filepath terá a mesma aparência: 'gs://MY_BUCKET/MY_CSV.csv'.

Eu verifiquei salvar um dataframe fictício do pandas usando pd.to_csve funcionou com o mesmo bucket_filepath (!).

Portanto, deve haver outro motivo, provavelmente o escritor não foi aceito ou o with statementque abre o arquivo.

O código que gera o erro é o seguinte. É com o mesmo código funcionando fora do Google Cloud Function em um cron job normal em um servidor local. Adicionei duas impressões de depuração ao redor da linha que gera o erro, mas print("Right after opening the file ...")não aparece mais. A subfunção query_execute_batch()que write_to_csv_file()está chamando cada lote também é mostrada, mas provavelmente não é o problema aqui, pois o erro ocorre logo no início, ao abrir o arquivo csv por gravação.

requirements.txt(que são então importados como módulos):

SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1

E a partir de main.py:

def query_execute_batch(connection):
    """Function for reading data from the query result into batches
    :yield: each result in a loop is a batch of the query result
    """
    results = execute_select_batch(connection, SQL_QUERY)
    print(f"len(results): {len(results)}")
    for result in results:
        yield result

def write_to_csv_file(connection, filepath):
    """Write the data in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    returns: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")    
    with open(filepath, "w") as outcsv:
        print("Right after opening the file ...")        
        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        writer.writeheader()

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

Lembre-se de que para que o script acima funcione, eu importo, gcsfso que torna o bucket disponível para leitura e gravação. Caso contrário, provavelmente precisaria de um objeto de armazenamento em nuvem do Google, como por exemplo:

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

e, em seguida, crie o arquivo nesse intervalo com outras funções, mas esse não é o objetivo aqui.

A seguir, o pd.to_csvcódigo que funciona, ele usa a saída de uma consulta SQL fictícia SELECT 1como entrada de um dataframe. Essepodeser salvo no mesmo bucket_filepath, é claro que o motivo pode não ser apenas pd.to_csv()esse, mas também que o conjunto de dados é um manequim em vez de strings unicode complexas de um enorme arquivo SELECT query. Ou há outro motivo, só estou supondo.

if records is not None:
    df = pd.DataFrame(records.fetchall())
    df.columns = records.keys()
    df.to_csv(filepath,
        index=False,
    )
    datetime_now_save = datetime.now()
    countrows = df.shape[0]

Eu gostaria de usar o gravador csv para ter a chance de escrever em unicode com o módulo unicodecsv e de usar os lotes.

Talvez eu esteja disposto a mudar para lotes ( loop + appendmodo ou chunksize) em pandas como emGravando grandes dataframes do Pandas em arquivos CSV em pedaçospara me livrar desse problema de caminho de arquivo do bucket, mas eu gostaria de usar o código pronto (nunca toque em um sistema em execução).

Como posso salvar esse csv com o gravador csv para que ele possa abrir um novo arquivo no bucket em writemode = with open(filepath, "w") as outcsv:?

A função fornecida write_to_csv_file()é apenas uma pequena parte da Cloud Function que usa uma ampla gama de funções e funções em cascata. Não posso mostrar aqui todo o caso reproduzível e espero que possa ser respondido pela experiência ou por exemplos mais fáceis.

Responder1

A solução é surpreendente. Vocêdeveimporte e use o gcsfsmódulo se quiser gravar em um arquivo com extensão open().

Se você usar pd.to_csv(), import gcsfsnão é necessário, masgcsfsainda é necessário para requirements.txtfazer pd.to_csv()o trabalho, portanto, os pandas to_csv()parecem usá-lo automaticamente.

Deixada a pd.to_csv()surpresa de lado, aqui está o código que responde à pergunta (testado):

def write_to_csv_file(connection, filepath):
    """Write the QUERY result in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    return: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")
   

    # A gcsfs object is needed to open a file.
    # https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
    # https://gcsfs.readthedocs.io/en/latest/index.html#examples
    # Side-note (Exception):
    # pd.to_csv() needs neither the gcsfs object, nor its import.
    # It is not used here, but it has been tested with examples.
    fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
    fs.ls(BUCKET_NAME)


    # wb needed, else "builtins.TypeError: must be str, not bytes"
    # https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
    with fs.open(filepath, 'wb') as outcsv:
        print("Right after opening the file ...")

        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        print("before writer.writeheader()")
        writer.writeheader()
        print("after writer.writeheader()")

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

Nota

Não use o gravador csv assim.

Demora muito, em vez do pd.to_csv()parâmetro chunksize5000 que precisa de apenas 62s para que as 700 mil linhas sejam carregadas e armazenadas como um csv no bucket, o CF com o gravador de lotes leva mais do que os 9 minutos que ultrapassam o limite de tempo limite. Sou, portanto, forçado a usar pd.to_csv()e converter meus dados em um dataframe para isso.

informação relacionada