Unsupported Features (O2A)
The following features are not supported.
Oozie Workflow EL Functions
Oozie workflow EL functions are not currently supported. Instead of EL functions, use Airflow's XCom feature to pass values between tasks within a workflow.
Support for more workflow functions is planned as a future enhancement to O2A.
For example, consider the following workflow:
<workflow-app name="WorkFlowForShellActionWithCaptureOutput" xmlns="uri:oozie:workflow:1.0">
    <start to="shellAction"/>
    <action name="shellAction">
        <shell xmlns="uri:oozie:shell-action:1.0">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <exec>${lineCountShellScript}</exec>
            <argument>${inputDir}</argument>
            <file>${lineCountShScriptPath}#${lineCountShellScript}</file>
            <capture-output/>
        </shell>
        <ok to="sendEmail"/>
        <error to="killAction"/>
    </action>
    <action name="sendEmail">
        <email xmlns="uri:oozie:email-action:0.2">
            <to>${emailToAddress}</to>
            <subject>Output of workflow ${wf:id()}</subject>
            <body>Results from line count: ${wf:actionData('shellAction')['NumberOfLines']}</body>
        </email>
        <ok to="end"/>
        <error to="end"/>
    </action>
    <kill name="killAction">
        <message>"Killed job due to error"</message>
    </kill>
    <end name="end"/>
</workflow-app>
The output DAG appears as shown below:
import logging

from airflow import models
from airflow.operators import bash, email
from airflow.operators.python import PythonOperator
from airflow.utils import dates

# PropertySet, CONFIG, JOB_PROPS and TEMPLATE_ENV are assumed to be defined
# earlier in the O2A-generated DAG file; that preamble is omitted here.

logger = logging.getLogger(__name__)


# Define a function to log the shell action data
def log_shell_action_data(**context):
    ti = context["ti"]
    # With do_xcom_push=True, BashOperator pushes the last line of stdout to XCom
    shell_output = ti.xcom_pull(task_ids="shellAction")
    logger.info("Output of shellAction: %s", shell_output)
    # Process the shell_output as needed
    return "Logging completed"


with models.DAG(
    "sample_shell_oozie_2",
    schedule_interval=None,
    start_date=dates.days_ago(0),
    user_defined_macros=TEMPLATE_ENV,
) as dag:
    shellAction = bash.BashOperator(
        task_id="shellAction",
        trigger_rule="one_success",
        bash_command=(
            "hdfs dfs -cat hdfs://testcluster.acceldata.ce/user/oozie/workflowShellAction/lineCount.sh"
            " | bash -s hdfs://testcluster.acceldata.ce/user/oozie/text.txt"
        ),
        do_xcom_push=True,
        params=PropertySet(
            config=CONFIG,
            job_properties=JOB_PROPS,
            action_node_properties={"mapred.job.queue.name": "default"},
        ).merged,
    )

    # Log the action data. Airflow 2 passes the task context to the callable
    # automatically, so provide_context is not needed.
    log_action_data = PythonOperator(
        task_id="log_action_data",
        python_callable=log_shell_action_data,
    )

    sendEmail = email.EmailOperator(
        task_id="sendEmail",
        trigger_rule="one_success",
        to="{{emailToAddress}}",
        cc=None,
        bcc=None,
        subject="Output of workflow {{run_id}}",
        # The XCom lookup must sit inside Jinja braces, otherwise it is sent as literal text
        html_content="Results from line count: {{ ti.xcom_pull(task_ids='shellAction') }}",
        params=PropertySet(config=CONFIG, job_properties=JOB_PROPS).merged,
    )

    # Set the task dependencies
    shellAction >> log_action_data >> sendEmail
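In Oozie, capture-output expects the script to print key=value lines in Java properties format, which is why the original workflow can look up the NumberOfLines key via wf:actionData. An XCom pushed by BashOperator is just the last line of stdout, so the key lookup has to be reproduced by hand. The following is a minimal sketch of one way to do that, assuming the script prints a line such as NumberOfLines=42 as its final output. The helper names parse_action_data and line_count_from_xcom are hypothetical and are not part of O2A or Airflow.

```python
def parse_action_data(raw_output):
    """Parse key=value lines into a dict, loosely mimicking wf:actionData().

    This is a simplified reader, not a full Java properties parser: it skips
    blank lines, comment lines and lines without '='.
    """
    data = {}
    for line in (raw_output or "").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        data[key.strip()] = value.strip()
    return data


def line_count_from_xcom(**context):
    """Hypothetical PythonOperator callable: return NumberOfLines from shellAction."""
    raw = context["ti"].xcom_pull(task_ids="shellAction")
    return parse_action_data(raw).get("NumberOfLines")
```

A callable like line_count_from_xcom could be wired into a PythonOperator between shellAction and sendEmail, so that the email body pulls the parsed value from that task instead of the raw shell output.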
Coordinator Job Conversion
The Oozie to Airflow migration tool currently focuses on workflow conversion. Parsing and converting Oozie coordinator jobs is not supported at this stage, but it is being considered as a future enhancement.