Spark submit with Apache Airflow

My workflow needs to run a job with Spark. I tried running the Spark job through Airflow, but I ran into network timeout problems, so I added the 'spark.network.timeout' option to the SparkSubmitOperator conf as shown below.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

SLACK_CONN_ID = 'slack-search'

#---- Spark resource size
NUM_EXECUTOR = 56
EXECUTOR_CORES = 5
EXECUTOR_MEMORY = '20g'

JAR_FILE = 'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar'
EXECUTE_CLASS = 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay'
AGG_PERIOD = 1

# Post a failure notice to Slack; used as the DAG's on_failure_callback.
def task_fail_slack_alert(context):
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = "iac_keyword_stat_day failed"
    failed_alert = SlackWebhookOperator(
        task_id='slack_notify',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        channel='#search-alert',
        username='airflow',
        dag=dag)
    return failed_alert.execute(context=context)

# NOTE: these are called when the DAG file is parsed, so the dates are
# fixed at parse time, not at task execution time.
def get_from_date():
    return (datetime.now() - timedelta(days=AGG_PERIOD)).strftime('%Y%m%d')

def get_to_date():
    return datetime.now().strftime('%Y%m%d')

default_args = {
    'owner': 'search',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 21),
    'retries': 1,
    'retry_delay': timedelta(hours=1),
    'on_failure_callback': task_fail_slack_alert,
}

default_conf = {
    'spark.network.timeout': '800s',
    'spark.executor.heartbeatInterval': '60s'
}

dag = DAG('iac_keyword_stat_day', catchup=False, default_args=default_args, schedule_interval="0 6 * * *")

t1 = SparkSubmitOperator(
    task_id='spark_submit',
    application=JAR_FILE,
    conf=default_conf,
    java_class=EXECUTE_CLASS,
    executor_cores=EXECUTOR_CORES,
    executor_memory=EXECUTOR_MEMORY,
    num_executors=NUM_EXECUTOR,
    application_args=["--from", get_from_date(), "--to", get_to_date()],
    dag=dag)

However, I got the following Airflow error:

{__init__.py:1580} ERROR - Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 176, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 352, in submit
    spark_submit_cmd, returncode
AirflowException: Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
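
Since Airflow only reports "Error code is: 1" and hides the underlying spark-submit output, I have been re-running the exact command from the log above by hand to capture stderr. Below is a rough sketch of that check; the command list is copied verbatim from the traceback, and it assumes spark-submit is on the PATH of the Airflow worker host:

import subprocess

# The exact command Airflow built, copied from the error log above.
cmd = [
    'spark-submit', '--master', 'yarn',
    '--conf', 'spark.executor.heartbeatInterval=60s',
    '--conf', 'spark.network.timeout=800s',
    '--num-executors', '56',
    '--executor-cores', '5',
    '--executor-memory', '20g',
    '--name', 'airflow-spark',
    '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay',
    '--queue', 'default',
    'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar',
    '--from', '20190624', '--to', '20190625',
]

# Popen works on both Python 2.7 (which this Airflow runs on) and Python 3.
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
print(proc.returncode)
print(err)  # the real spark-submit failure should show up here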

How do I fix this?
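
One more thing I noticed while debugging: get_from_date() and get_to_date() run when the scheduler parses the DAG file, so the --from/--to dates are frozen at parse time rather than computed per run. A templated variant might look like the sketch below; I'm assuming application_args is a templated field of SparkSubmitOperator in this Airflow version, so the Jinja macros would render at execution time:

# Sketch only: same operator, but with the dates rendered by Airflow's
# built-in Jinja macros at run time instead of at DAG parse time.
t1 = SparkSubmitOperator(
    task_id='spark_submit',
    application=JAR_FILE,
    conf=default_conf,
    java_class=EXECUTE_CLASS,
    executor_cores=EXECUTOR_CORES,
    executor_memory=EXECUTOR_MEMORY,
    num_executors=NUM_EXECUTOR,
    application_args=[
        # ds is the execution date as YYYY-MM-DD; ds_nodash is YYYYMMDD.
        # The -1 mirrors AGG_PERIOD = 1 above.
        '--from', "{{ macros.ds_format(macros.ds_add(ds, -1), '%Y-%m-%d', '%Y%m%d') }}",
        '--to', '{{ ds_nodash }}',
    ],
    dag=dag)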
