
For my workflow I need to run a Spark job. I tried running the Spark job from Airflow, but it failed with a network timeout, so I added the 'spark.network.timeout' option to the SparkSubmitOperator conf as shown below.
from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
SLACK_CONN_ID = 'slack-search'
#---- Spark resource size
NUM_EXECUTOR = 56
EXECUTOR_CORES = 5
EXECUTOR_MEMORY = '20g'
JAR_FILE = 'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar'
EXECUTE_CLASS = 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay'
AGG_PERIOD = 1
def task_fail_slack_alert(context):
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = "iac_keyword_stat_day failed"
    failed_alert = SlackWebhookOperator(
        task_id='slack_notify',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        channel='#search-alert',
        username='airflow',
        dag=dag)
    return failed_alert.execute(context=context)

def get_from_date():
    return (datetime.now() - timedelta(days=AGG_PERIOD)).strftime('%Y%m%d')

def get_to_date():
    return datetime.now().strftime('%Y%m%d')
default_args = {
    'owner': 'search',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 21),
    'retries': 1,
    'retry_delay': timedelta(hours=1),
    'on_failure_callback': task_fail_slack_alert,
}

default_conf = {
    'spark.network.timeout': '800s',
    'spark.executor.heartbeatInterval': '60s'
}
dag = DAG('iac_keyword_stat_day', catchup=False, default_args=default_args, schedule_interval="0 6 * * *")
t1 = SparkSubmitOperator(
    task_id='spark_submit',
    application=JAR_FILE,
    conf=default_conf,
    java_class=EXECUTE_CLASS,
    executor_cores=EXECUTOR_CORES,
    executor_memory=EXECUTOR_MEMORY,
    num_executors=NUM_EXECUTOR,
    application_args=["--from", get_from_date(), "--to", get_to_date()],
    dag=dag)
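For reference, SparkSubmitOperator also takes a conn_id (default 'spark_default', which supplies the master and queue) and a verbose flag. A minimal sketch of the same task with verbose spark-submit logging turned on, assuming the default connection is the one actually in use, which can help surface the real error behind the exit code:

# Hypothetical variant of t1: same job, with the connection made explicit and
# verbose spark-submit output written to the task log.
t1 = SparkSubmitOperator(
    task_id='spark_submit',
    conn_id='spark_default',   # assumed connection; adjust to your environment
    verbose=True,              # log the full spark-submit output
    application=JAR_FILE,
    conf=default_conf,
    java_class=EXECUTE_CLASS,
    executor_cores=EXECUTOR_CORES,
    executor_memory=EXECUTOR_MEMORY,
    num_executors=NUM_EXECUTOR,
    application_args=["--from", get_from_date(), "--to", get_to_date()],
    dag=dag)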
However, it caused the following Airflow error.
{__init__.py:1580} ERROR - Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 176, in execute
self._hook.submit(self._application)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 352, in submit
spark_submit_cmd, returncode
AirflowException: Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
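The traceback only reports that spark-submit exited with code 1; the underlying Spark/YARN error is not shown. A minimal sketch, assuming spark-submit is on the PATH of the machine running the Airflow worker, for re-running the generated command outside Airflow and capturing its full output:

import subprocess

# Hypothetical reproduction of the command Airflow built, run by hand so the
# full spark-submit stderr is visible instead of just "Error code is: 1".
cmd = [
    'spark-submit', '--master', 'yarn',
    '--conf', 'spark.executor.heartbeatInterval=60s',
    '--conf', 'spark.network.timeout=800s',
    '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g',
    '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay',
    'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar',
    '--from', '20190624', '--to', '20190625',
]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out, _ = proc.communicate()
print(out)
print('exit code:', proc.returncode)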
How do I fix this?