Beim Schreiben von CSV von CF in Bucket: „with open(filepath, „w“) as MY_CSV:“ führt zu „FileNotFoundError: [Errno 2] Keine solche Datei oder kein solches Verzeichnis:“

Beim Schreiben von CSV von CF in Bucket: „with open(filepath, „w“) as MY_CSV:“ führt zu „FileNotFoundError: [Errno 2] Keine solche Datei oder kein solches Verzeichnis:“

Ich erhalte diesen Fehler, FileNotFoundError: [Errno 2] No such file or directorywenn ich versuche, eine CSV-Datei in den Bucket zu schreiben, indem ich einen CSV-Writer verwende, der eine Schleife über Datenstapel durchläuft. Der vollständige Einblick in die Cloud Function-Protokolle zu diesem Fehler:


File "/workspace/main.py", line 299, in write_to_csv_file with
open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such
file or directory: 'gs://MY_BUCKET/MY_CSV.csv'

Function execution took 52655 ms, finished with status: 'crash' 

OpenBLAS WARNING - could not determine the L2 cache size on this
system, assuming 256k  ```

Und dies, obwohl dieser Bucket-Filepath definitiv existiert: Ich kann eine leere Dummy-Datei hochladen und ihre „gsutils-URI“ abrufen (Rechtsklick auf die drei Punkte auf der rechten Seite der Datei) und der Bucket-Filepath wird gleich aussehen: 'gs://MY_BUCKET/MY_CSV.csv'.

Ich habe überprüft, ob stattdessen ein Dummy-Pandas-Dataframe gespeichert werden soll, pd.to_csvund es hat mit demselben Bucket-Filepath (!) funktioniert.

Es muss also einen anderen Grund geben, wahrscheinlich wird der Writer oder derjenige, with statementder die Datei öffnet, nicht akzeptiert.

Der Code, der den Fehler auslöst, ist wie folgt. Es handelt sich um denselben Code, der außerhalb der Google Cloud Function in einem normalen Cron-Job auf einem lokalen Server ausgeführt wird. Ich habe zwei Debug-Ausdrucke um die Zeile hinzugefügt, die den Fehler auslöst, die print("Right after opening the file ...")nicht mehr angezeigt werden. Die Unterfunktion query_execute_batch(), die write_to_csv_file()für jeden Batch aufgerufen wird, wird ebenfalls angezeigt, ist hier aber wahrscheinlich nicht das Problem, da der Fehler bereits ganz am Anfang beim Schreiben/Öffnen der CSV-Datei auftritt.

requirements.txt(die dann als Module importiert werden):

SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1

Und von main.py:

def query_execute_batch(connection):
    """Function for reading data from the query result into batches
    :yield: each result in a loop is a batch of the query result
    """
    results = execute_select_batch(connection, SQL_QUERY)
    print(f"len(results): {len(results)}")
    for result in results:
        yield result

def write_to_csv_file(connection, filepath):
    """Write the data in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    returns: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")    
    with open(filepath, "w") as outcsv:
        print("Right after opening the file ...")        
        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        writer.writeheader()

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

Beachten Sie, dass ich, damit das obige Skript funktioniert, importiere, gcsfswodurch der Bucket lese- und schreibgeschützt wird. Andernfalls bräuchte ich wahrscheinlich ein Google Cloud Storage-Objekt wie zum Beispiel:

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

und dann die Datei in diesem Bucket mit weiteren Funktionen erstellen, aber das ist hier nicht das Ziel.

Im Folgenden der pd.to_csvfunktionierende Code: Er verwendet die Ausgabe einer Dummy-SQL-Abfrage SELECT 1als Eingabe eines Datenrahmens. Diesdürfenim selben Bucket-Dateipfad gespeichert werden, der Grund kann natürlich nicht nur pd.to_csv()darin liegen, sondern auch darin, dass der Datensatz ein Dummy ist und keine komplexen Unicode-Zeichenfolgen aus einem riesigen SELECT query. Oder es gibt einen anderen Grund, ich vermute nur.

if records is not None:
    df = pd.DataFrame(records.fetchall())
    df.columns = records.keys()
    df.to_csv(filepath,
        index=False,
    )
    datetime_now_save = datetime.now()
    countrows = df.shape[0]

Ich möchte den CSV-Writer verwenden, um mit dem Modul „unicodecsv“ in Unicode schreiben und die Batches verwenden zu können.

Ich könnte bereit sein, zu Batches ( loop + appendModus oder chunksize) in Pandas zu ändern, wie inGroße Pandas-Dataframes stückweise in eine CSV-Datei schreibenum dieses Bucket-Dateipfadproblem loszuwerden, aber ich würde lieber den fertigen Code verwenden (niemals ein laufendes System berühren).

Wie kann ich das Speichern dieser CSV-Datei mit dem CSV-Writer durchführen, sodass er im writeModus = eine neue Datei im Bucket öffnen kann with open(filepath, "w") as outcsv:?

Die angegebene Funktion write_to_csv_file()ist nur ein kleiner Teil der Cloud-Funktion, die eine breite Palette von Funktionen und kaskadierten Funktionen verwendet. Ich kann hier nicht den gesamten reproduzierbaren Fall zeigen und hoffe, dass dies durch Erfahrung oder einfachere Beispiele beantwortet werden kann.

Antwort1

Die Lösung ist überraschend. SiemussImportieren und verwenden Sie das gcsfsModul, wenn Sie mit in eine Datei schreiben möchten open().

Wenn Sie verwenden pd.to_csv(), import gcsfsist dies nicht erforderlich, abergcsfsist noch erforderlich, um Arbeit requirements.txtzu machenpd.to_csv(), daher to_csv()scheint Pandas es automatisch zu verwenden.

pd.to_csv()Abgesehen von der Überraschung ist hier der Code, der die Frage beantwortet (getestet):

def write_to_csv_file(connection, filepath):
    """Write the QUERY result in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    return: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")
   

    # A gcsfs object is needed to open a file.
    # https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
    # https://gcsfs.readthedocs.io/en/latest/index.html#examples
    # Side-note (Exception):
    # pd.to_csv() needs neither the gcsfs object, nor its import.
    # It is not used here, but it has been tested with examples.
    fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
    fs.ls(BUCKET_NAME)


    # wb needed, else "builtins.TypeError: must be str, not bytes"
    # https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
    with fs.open(filepath, 'wb') as outcsv:
        print("Right after opening the file ...")

        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        print("before writer.writeheader()")
        writer.writeheader()
        print("after writer.writeheader()")

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

Randnotiz

Verwenden Sie den CSV-Writer nicht auf diese Weise.

Es dauert zu lange. Statt pd.to_csv()mit einem chunksizeParameter von 5000, der nur 62 Sekunden braucht, um die 700.000 Zeilen zu laden und als CSV im Bucket zu speichern, dauert der CF mit dem Batch-Writer mehr als 9 Minuten, was über dem Timeout-Limit liegt. Ich bin daher gezwungen, pd.to_csv()stattdessen zu verwenden und meine Daten dafür in einen Dataframe zu konvertieren.

verwandte Informationen