![Ao gravar csv do CF para o bucket: 'with open(filepath, "w") as MY_CSV:' leva a "FileNotFoundError: [Errno 2] Esse arquivo ou diretório não existe:"](https://rvso.com/image/774540/Ao%20gravar%20csv%20do%20CF%20para%20o%20bucket%3A%20'with%20open(filepath%2C%20%22w%22)%20as%20MY_CSV%3A'%20leva%20a%20%22FileNotFoundError%3A%20%5BErrno%202%5D%20Esse%20arquivo%20ou%20diret%C3%B3rio%20n%C3%A3o%20existe%3A%22.png)
Recebo esse erro FileNotFoundError: [Errno 2] No such file or directory
quando tento gravar um arquivo CSV no bucket, usando um gravador CSV que faz loop em lotes de dados. O insight completo sobre os registros do Cloud Function em torno desse erro:
File "/workspace/main.py", line 299, in write_to_csv_file with open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such file or directory: 'gs://MY_BUCKET/MY_CSV.csv' Function execution took 52655 ms, finished with status: 'crash' OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k ```
E isso, embora esse bucket_filepath definitivamente exista: posso fazer upload de um arquivo fictício vazio e obter seu "URI gsutils" (clique com o botão direito nos três pontos no lado direito do arquivo) e o bucket_filepath terá a mesma aparência: 'gs://MY_BUCKET/MY_CSV.csv'
.
Eu verifiquei salvar um dataframe fictício do pandas usando pd.to_csv
e funcionou com o mesmo bucket_filepath (!).
Portanto, deve haver outro motivo, provavelmente o escritor não foi aceito ou o with statement
que abre o arquivo.
O código que gera o erro é o seguinte. É com o mesmo código funcionando fora do Google Cloud Function em um cron job normal em um servidor local. Adicionei duas impressões de depuração ao redor da linha que gera o erro, mas print("Right after opening the file ...")
não aparece mais. A subfunção query_execute_batch()
que write_to_csv_file()
está chamando cada lote também é mostrada, mas provavelmente não é o problema aqui, pois o erro ocorre logo no início, ao abrir o arquivo csv por gravação.
requirements.txt
(que são então importados como módulos):
SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1
E a partir de main.py
:
def query_execute_batch(connection):
"""Function for reading data from the query result into batches
:yield: each result in a loop is a batch of the query result
"""
results = execute_select_batch(connection, SQL_QUERY)
print(f"len(results): {len(results)}")
for result in results:
yield result
def write_to_csv_file(connection, filepath):
"""Write the data in a loop over batches into a csv.
This is done in batches since the query from the database is huge.
:param connection: mysqldb connection to DB
:param filepath: path to csv file to write data
returns: metadata on rows and time
"""
countrows = 0
print("Right before opening the file ...")
with open(filepath, "w") as outcsv:
print("Right after opening the file ...")
writer = csv.DictWriter(
outcsv,
fieldnames=FIELDNAMES,
extrasaction="ignore",
delimiter="|",
lineterminator="\n",
)
# write header according to fieldnames
writer.writeheader()
for batch in query_execute_batch(connection):
writer.writerows(batch)
countrows += len(batch)
datetime_now_save = datetime.now()
return countrows, datetime_now_save
Lembre-se de que para que o script acima funcione, eu importo, gcsfs
o que torna o bucket disponível para leitura e gravação. Caso contrário, provavelmente precisaria de um objeto de armazenamento em nuvem do Google, como por exemplo:
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)
e, em seguida, crie o arquivo nesse intervalo com outras funções, mas esse não é o objetivo aqui.
A seguir, o pd.to_csv
código que funciona, ele usa a saída de uma consulta SQL fictícia SELECT 1
como entrada de um dataframe. Essepodeser salvo no mesmo bucket_filepath, é claro que o motivo pode não ser apenas pd.to_csv()
esse, mas também que o conjunto de dados é um manequim em vez de strings unicode complexas de um enorme arquivo SELECT query
. Ou há outro motivo, só estou supondo.
if records is not None:
df = pd.DataFrame(records.fetchall())
df.columns = records.keys()
df.to_csv(filepath,
index=False,
)
datetime_now_save = datetime.now()
countrows = df.shape[0]
Eu gostaria de usar o gravador csv para ter a chance de escrever em unicode com o módulo unicodecsv e de usar os lotes.
Talvez eu esteja disposto a mudar para lotes ( loop + append
modo ou chunksize
) em pandas como emGravando grandes dataframes do Pandas em arquivos CSV em pedaçospara me livrar desse problema de caminho de arquivo do bucket, mas eu gostaria de usar o código pronto (nunca toque em um sistema em execução).
Como posso salvar esse csv com o gravador csv para que ele possa abrir um novo arquivo no bucket em write
mode = with open(filepath, "w") as outcsv:
?
A função fornecida write_to_csv_file()
é apenas uma pequena parte da Cloud Function que usa uma ampla gama de funções e funções em cascata. Não posso mostrar aqui todo o caso reproduzível e espero que possa ser respondido pela experiência ou por exemplos mais fáceis.
Responder1
A solução é surpreendente. Vocêdeveimporte e use o gcsfs
módulo se quiser gravar em um arquivo com extensão open()
.
Se você usar pd.to_csv()
, import gcsfs
não é necessário, masgcsfs
ainda é necessário para requirements.txt
fazer pd.to_csv()
o trabalho, portanto, os pandas to_csv()
parecem usá-lo automaticamente.
Deixada a pd.to_csv()
surpresa de lado, aqui está o código que responde à pergunta (testado):
def write_to_csv_file(connection, filepath):
"""Write the QUERY result in a loop over batches into a csv.
This is done in batches since the query from the database is huge.
:param connection: mysqldb connection to DB
:param filepath: path to csv file to write data
return: metadata on rows and time
"""
countrows = 0
print("Right before opening the file ...")
# A gcsfs object is needed to open a file.
# https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
# https://gcsfs.readthedocs.io/en/latest/index.html#examples
# Side-note (Exception):
# pd.to_csv() needs neither the gcsfs object, nor its import.
# It is not used here, but it has been tested with examples.
fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
fs.ls(BUCKET_NAME)
# wb needed, else "builtins.TypeError: must be str, not bytes"
# https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
with fs.open(filepath, 'wb') as outcsv:
print("Right after opening the file ...")
writer = csv.DictWriter(
outcsv,
fieldnames=FIELDNAMES,
extrasaction="ignore",
delimiter="|",
lineterminator="\n",
)
# write header according to fieldnames
print("before writer.writeheader()")
writer.writeheader()
print("after writer.writeheader()")
for batch in query_execute_batch(connection):
writer.writerows(batch)
countrows += len(batch)
datetime_now_save = datetime.now()
return countrows, datetime_now_save
Nota
Não use o gravador csv assim.
Demora muito, em vez do pd.to_csv()
parâmetro chunksize
5000 que precisa de apenas 62s para que as 700 mil linhas sejam carregadas e armazenadas como um csv no bucket, o CF com o gravador de lotes leva mais do que os 9 minutos que ultrapassam o limite de tempo limite. Sou, portanto, forçado a usar pd.to_csv()
e converter meus dados em um dataframe para isso.