Google Data Fusion with Cloud Composer as a data lake solution

Data Fusion is a Google Cloud solution for building data pipelines without writing any code. Although the solution has some limitations (so far), when used together with Cloud Composer it becomes a really powerful tool for building data lakes.

For me, the biggest limitation I found with Data Fusion is that you cannot pass dynamic parameters. The only dynamic parameters it accepts are dates, which work with the formula below:

${logicalStartTime(yyyy-MM-dd)}

This returns the current date. If you want something like yesterday, you can get it like this:

${logicalStartTime(dd/MM/yyyy,1d)}
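
For example (the bucket and file names here are just illustrative), a GCS source path in a pipeline could point at yesterday's file like this:

gs://my-bucket/raw/${logicalStartTime(yyyy-MM-dd,1d)}.csv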

But apart from that, you cannot pass dynamic parameters, so if you want Data Fusion to pick up, say, the most recent file in a GCS bucket, you cannot do it.

The second problem I had with Data Fusion is that you must always define the output schema for each node, and when you start to build really big pipelines this gets really annoying.

And the last big problem: if you have a lot of pipelines, each pipeline will create a Dataproc cluster, run, and then kill that Dataproc cluster. This can add quite some time to the operation because, as of today, each cluster takes around 5 minutes to be created. So if you have 30 pipelines, that is 30 × ~5 minutes ≈ 2.5 hours lost just repeating this step (plus teardown), on top of the time it takes to actually run the pipelines.

Two of the problems above can be easily fixed by using Cloud Composer together with Cloud Data Fusion. For me, the final solution was having Cloud Composer pass the parameters I wanted to the Data Fusion pipeline dynamically.

Cloud Composer can create, delete, run, stop, update and list all the Data Fusion pipelines automatically with the operators below (see the import sketch after the list):

CloudDataFusionCreateInstanceOperator,
CloudDataFusionCreatePipelineOperator,
CloudDataFusionDeleteInstanceOperator,
CloudDataFusionDeletePipelineOperator,
CloudDataFusionGetInstanceOperator,
CloudDataFusionListPipelinesOperator,
CloudDataFusionRestartInstanceOperator,
CloudDataFusionStartPipelineOperator,
CloudDataFusionStopPipelineOperator,
CloudDataFusionUpdateInstanceOperator,
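
All of these operators ship with the Google provider package for Airflow. In recent versions of apache-airflow-providers-google they live in the datafusion operators module, so a minimal import sketch (double-check the path against the Airflow/provider version your Composer environment runs) looks like this:

from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
    CloudDataFusionStopPipelineOperator,
    CloudDataFusionListPipelinesOperator,
)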

How do you do that? First, when creating a Data Fusion pipeline, you need to create "open" variables with this:

${NAME OF THE VARIABLE}

When you deploy, all the "open" variables will be listed as runtime arguments, which you will need to fill in before each run.
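
For example (purely illustrative names): if a GCS source path in the pipeline is set to gs://my-bucket/${folder}, then folder shows up as a runtime argument, and whatever value you pass for it at run time is substituted into the path. This is the same folder argument used in the Composer snippets below.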

Now, with this created, we can use Cloud Composer to pass dynamic arguments to your pipeline. In this case, for example, you would create a DAG like this:

task1 = CloudDataFusionStartPipelineOperator(
    task_id="task1",
    location="southamerica-east1",
    pipeline_name="test",
    instance_name="my-datafusion-instance",  # placeholder: the name of your Data Fusion instance
    namespace="laketest",
    runtime_args={"folder": "my_data.csv"},  # fills the ${folder} runtime argument
    dag=dag)
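
Since the whole point is passing dynamic values, you can go one step further and compute the argument per run with Airflow's Jinja macros. This is just a sketch, and it assumes runtime_args is a templated field of the operator (it is in recent provider versions, so check yours):

task1 = CloudDataFusionStartPipelineOperator(
    task_id="task1",
    location="southamerica-east1",
    pipeline_name="test",
    instance_name="my-datafusion-instance",  # placeholder: your Data Fusion instance name
    namespace="laketest",
    # {{ ds_nodash }} is rendered by Airflow as the execution date, e.g. 20200801
    runtime_args={"folder": "my_data_{{ ds_nodash }}.csv"},
    dag=dag)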

As for creating and deleting a Dataproc cluster for each pipeline, Cloud Composer can also help you. You can configure the Data Fusion pipelines to use an existing Dataproc cluster instead of creating one:

In Data Fusion, you will need to go to the System Admin button at the top left of your screen.

Then go to Configuration >> System Compute Profiles >> Create New Profile >> Existing Dataproc.

And fill in the required fields with the info for the Dataproc cluster you have.

With this done, go back to your pipeline and change the system compute profile to the one you created.

But this solution may be too expensive, since keeping a Dataproc cluster up all the time is costly. That is when Cloud Composer comes in handy again: after you do the steps above, you can tell Composer to create and delete the cluster, so it is up only while the pipeline is running. You can do that by adding this to the DAG above:

# Legacy (Airflow 1.x contrib) Dataproc operators, matching the style used here;
# newer Google provider versions expose DataprocCreateClusterOperator / DataprocDeleteClusterOperator instead.
from airflow.contrib.operators import dataproc_operator

create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
    task_id="create_dataproc_cluster",
    project_id="teste",
    cluster_name="airflow-cluster",  # must match the cluster name in the Data Fusion compute profile
    num_workers=4,
    subnetwork_uri="teste-nonprod-southamerica-east1-subnet-private-1",
    region="southamerica-east1",
    master_machine_type="n1-standard-4",
    worker_machine_type="n1-standard-4",
    dag=dag)

task1 = CloudDataFusionStartPipelineOperator(
    task_id="task1",
    location="southamerica-east1",
    pipeline_name="test",
    instance_name="my-datafusion-instance",  # placeholder: your Data Fusion instance name
    namespace="teste",
    runtime_args={"folder": "my_data_01082020.csv"},
    dag=dag)

delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
    task_id="delete_dataproc_cluster",
    project_id="teste",
    cluster_name="airflow-cluster",
    region="southamerica-east1",
    dag=dag)

create_dataproc_cluster >> task1 >> delete_dataproc_cluster
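
One extra safeguard worth adding (my suggestion, not part of the original setup): if the pipeline task fails, the delete task is normally skipped and the cluster stays up, costing money. You can tell Airflow to run the delete step no matter what happened upstream:

from airflow.utils.trigger_rule import TriggerRule

delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
    task_id="delete_dataproc_cluster",
    project_id="teste",
    cluster_name="airflow-cluster",
    region="southamerica-east1",
    trigger_rule=TriggerRule.ALL_DONE,  # delete the cluster even if the pipeline fails
    dag=dag)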

And for the output schema I don't have a clever fix, but what I learned is that every time a Java null pointer exception shows up, export the data, check the schema, and repeat this for every new node you add. It might make development a little slower, but that is how I got through it.

With these hacks you should have more than enough tools to build your own data lake on GCP, so good luck :)

