Al escribir csv desde CF en el depósito: 'con open(filepath, "w") as MY_CSV:' conduce a "FileNotFoundError: [Errno 2] No existe tal archivo o directorio:"

Al escribir csv desde CF en el depósito: 'con open(filepath, "w") as MY_CSV:' conduce a "FileNotFoundError: [Errno 2] No existe tal archivo o directorio:"

Recibo este error FileNotFoundError: [Errno 2] No such file or directorycuando intento escribir un archivo csv en el depósito, utilizando un escritor csv que recorre lotes de datos. La información completa sobre la función de nube registra ese error:


File "/workspace/main.py", line 299, in write_to_csv_file with
open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such
file or directory: 'gs://MY_BUCKET/MY_CSV.csv'

Function execution took 52655 ms, finished with status: 'crash' 

OpenBLAS WARNING - could not determine the L2 cache size on this
system, assuming 256k  ```

Y esto, aunque este bucket_filepath definitivamente existe: puedo cargar un archivo ficticio vacío y obtener su "URI de gsutils" (haga clic con el botón derecho en los tres puntos en el lado derecho del archivo) y el bucket_filepath tendrá el mismo aspecto: 'gs://MY_BUCKET/MY_CSV.csv'.

Verifiqué guardar un marco de datos ficticio de pandas en lugar de usarlo pd.to_csvy funcionó con el mismo bucket_filepath (!).

Por lo tanto, debe haber otra razón, probablemente el escritor no sea aceptado o el with statementque abra el archivo.

El código que arroja el error es el siguiente. Es con el mismo código funcionando fuera de la función Google Cloud en un trabajo cron normal en un servidor local. Agregué dos impresiones de depuración alrededor de la línea que arroja el error, pero print("Right after opening the file ...")ya no aparece. También se muestra la subfunción query_execute_batch()que write_to_csv_file()llama a cada lote, pero probablemente no sea el problema aquí, ya que el error ocurre desde el principio al abrir el archivo csv.

requirements.txt(que luego se importan como módulos):

SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1

Y de main.py:

def query_execute_batch(connection):
    """Function for reading data from the query result into batches
    :yield: each result in a loop is a batch of the query result
    """
    results = execute_select_batch(connection, SQL_QUERY)
    print(f"len(results): {len(results)}")
    for result in results:
        yield result

def write_to_csv_file(connection, filepath):
    """Write the data in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    returns: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")    
    with open(filepath, "w") as outcsv:
        print("Right after opening the file ...")        
        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        writer.writeheader()

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

Tenga en cuenta que para que el script anterior funcione, importo, gcsfslo que hace que el depósito esté disponible para lectura y escritura. De lo contrario, probablemente necesitaría un objeto de almacenamiento en la nube de Google como, por ejemplo:

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

y luego crear el archivo en ese depósito con más funciones, pero ese no es el objetivo aquí.

A continuación, el pd.to_csvcódigo que funciona utiliza la salida de una consulta SQL ficticia SELECT 1como entrada de un marco de datos. Estepoderguardarse en el mismo bucket_filepath, por supuesto, la razón podría no ser solo pd.to_csv()tal, sino también que el conjunto de datos es ficticio en lugar de cadenas Unicode complejas de un enorme archivo SELECT query. O hay otra razón, sólo estoy suponiendo.

if records is not None:
    df = pd.DataFrame(records.fetchall())
    df.columns = records.keys()
    df.to_csv(filepath,
        index=False,
    )
    datetime_now_save = datetime.now()
    countrows = df.shape[0]

Me gustaría utilizar el escritor csv para tener la oportunidad de escribir en Unicode con el módulo unicodecsv y la posibilidad de usar los lotes.

Podría estar dispuesto a cambiar a lotes ( loop + appendmodo o chunksize) en pandas como enEscribir grandes marcos de datos de Pandas en archivos CSV en fragmentospara deshacerme de este problema de ruta de archivo del depósito, pero me gustaría usar el código listo (nunca toque un sistema en ejecución).

¿Cómo puedo guardar ese csv con el escritor csv para que pueda abrir un nuevo archivo en el depósito en writemodo = with open(filepath, "w") as outcsv:?

La función proporcionada write_to_csv_file()es solo una pequeña parte de Cloud Function que utiliza una amplia gama de funciones y funciones en cascada. No puedo mostrar aquí el caso reproducible completo y espero que pueda responderse mediante la experiencia o ejemplos más sencillos.

Respuesta1

La solución es sorprendente. Túdebeimporte y use el gcsfsmódulo si desea escribir en un archivo con open().

Si usas pd.to_csv(), import gcsfsno es necesario, perogcsfstodavía es necesario en el requirements.txtpara hacer pd.to_csv()el trabajoPor tanto, los pandas to_csv()parecen usarlo automáticamente.

Dejando la pd.to_csv()sorpresa a un lado, aquí está el código que responde a la pregunta (probado):

def write_to_csv_file(connection, filepath):
    """Write the QUERY result in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    return: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")
   

    # A gcsfs object is needed to open a file.
    # https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
    # https://gcsfs.readthedocs.io/en/latest/index.html#examples
    # Side-note (Exception):
    # pd.to_csv() needs neither the gcsfs object, nor its import.
    # It is not used here, but it has been tested with examples.
    fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
    fs.ls(BUCKET_NAME)


    # wb needed, else "builtins.TypeError: must be str, not bytes"
    # https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
    with fs.open(filepath, 'wb') as outcsv:
        print("Right after opening the file ...")

        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        print("before writer.writeheader()")
        writer.writeheader()
        print("after writer.writeheader()")

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

Nota al margen

No utilice el escritor csv de esta manera.

Lleva demasiado tiempo, en lugar de pd.to_csv()con un chunksizeparámetro de 5000 que necesita solo 62 segundos para que las 700k filas se carguen y almacenen como un csv en el depósito, el CF con el escritor de lotes demora más de los 9 minutos que dura el límite de tiempo de espera. Por lo tanto, me veo obligado a usar pd.to_csv()y convertir mis datos en un marco de datos para eso.

información relacionada