
For my workflow I need to run a Spark job, which I launch from Airflow. The job kept failing with a network timeout, so I added the 'spark.network.timeout' option to the SparkSubmitOperator configuration, as shown below.
from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

SLACK_CONN_ID = 'slack-search'

# ---- Spark resource size
NUM_EXECUTOR = 56
EXECUTOR_CORES = 5
EXECUTOR_MEMORY = '20g'

JAR_FILE = 'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar'
EXECUTE_CLASS = 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay'
AGG_PERIOD = 1


def task_fail_slack_alert(context):
    # Post a failure notification to Slack, using the stored connection's
    # password as the webhook token
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = "iac_keyword_stat_day failed"
    failed_alert = SlackWebhookOperator(
        task_id='slack_notify',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        channel='#search-alert',
        username='airflow',
        dag=dag)
    return failed_alert.execute(context=context)


def get_from_date():
    # Start of the aggregation window, AGG_PERIOD days back
    return (datetime.now() - timedelta(days=AGG_PERIOD)).strftime('%Y%m%d')


def get_to_date():
    return datetime.now().strftime('%Y%m%d')


default_args = {
    'owner': 'search',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 21),
    'retries': 1,
    'retry_delay': timedelta(hours=1),
    'on_failure_callback': task_fail_slack_alert,
}

default_conf = {
    'spark.network.timeout': '800s',
    'spark.executor.heartbeatInterval': '60s'
}

dag = DAG('iac_keyword_stat_day', catchup=False, default_args=default_args,
          schedule_interval="0 6 * * *")

t1 = SparkSubmitOperator(
    task_id='spark_submit',
    application=JAR_FILE,
    conf=default_conf,
    java_class=EXECUTE_CLASS,
    executor_cores=EXECUTOR_CORES,
    executor_memory=EXECUTOR_MEMORY,
    num_executors=NUM_EXECUTOR,
    application_args=["--from", get_from_date(), "--to", get_to_date()],
    dag=dag)
However, the task failed with the following Airflow error.
{__init__.py:1580} ERROR - Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 176, in execute
self._hook.submit(self._application)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 352, in submit
spark_submit_cmd, returncode
AirflowException: Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
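Since "Error code is: 1" only tells me that spark-submit exited non-zero, I assume the actual cause is buried in spark-submit's own output rather than in Airflow's log. Below is a minimal sketch of how the logged command could be re-run outside Airflow to capture that output; the argument list is copied verbatim from the log above, and it assumes the shell has the same environment and PATH as the Airflow worker (which runs Python 2.7 per the traceback, so the snippet avoids Python-3-only APIs):

import subprocess

# Exact spark-submit command from the Airflow log above
cmd = [
    'spark-submit', '--master', 'yarn',
    '--conf', 'spark.executor.heartbeatInterval=60s',
    '--conf', 'spark.network.timeout=800s',
    '--num-executors', '56',
    '--executor-cores', '5',
    '--executor-memory', '20g',
    '--name', 'airflow-spark',
    '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay',
    '--queue', 'default',
    'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar',
    '--from', '20190624',
    '--to', '20190625',
]

# spark-submit reports launch failures on stderr; Airflow only surfaces the exit code
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        universal_newlines=True)
out, err = proc.communicate()
print(err)
print(proc.returncode)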
How do I fix this?