Automatically Load Data From MySQL(Cloud SQL) To BigQuery Using Embulk[ETL] (+ AirFlow Scheduling)

JooYoung Lim
5 min read · Mar 17, 2020

Welcome! 😄

Embulk, an open-source bulk data loader, has several things going for it.

usage of embulk (source : https://www.embulk.org/docs/)

Embulk supports parallel execution!

It also provides many plug-ins, so we can move data from sources such as MySQL, Amazon S3, and HDFS to destinations such as Hive, Cassandra, Redis, and BigQuery.

In this post, we’re going to move data from MySQL(Cloud SQL) to BigQuery.

Data Pipeline and Reporting Services (source : https://whitechoi.tistory.com/50)

We are working on Google Cloud Platform, so we will install Embulk on a Google Compute Engine VM and schedule it with Apache Airflow.

Once the data is in BigQuery, it can be used from various tools such as Data Studio, Datalab, Tableau, and Redash.

Our goal is to load MySQL (Cloud SQL) data into BigQuery so that we can visualize and analyze it easily.

1. Load Data From MySQL(Cloud SQL) To BigQuery Using Embulk

First of all, we need the connection information for MySQL (Cloud SQL).

Embulk needs the instance’s IP address, account name, and password to load data.
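Before going further, it can help to confirm that the database port is reachable at all from the machine that will run Embulk. The following is a minimal sketch, assuming the default MySQL port 3306; the helper name `can_reach` is made up for this example, and it only tests TCP reachability, not the account name or password.

```python
import socket

def can_reach(host, port=3306, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection raises OSError (refused, timed out, unreachable) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling `can_reach("<Cloud SQL IP address>")` on the Embulk VM returns `False` when, for example, the VM's address is not allowed to connect to the instance.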

Second, create a VM instance to install Embulk on.

create VM instance

When the VM is set up, we need to install Embulk on it.

Embulk runs on the JVM, so we should install a JVM on the VM first.

$ sudo apt-get update
$ sudo apt-get install default-jre

These commands install Embulk.

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

After installation, check that Embulk was installed successfully.

$ embulk -version
embulk 0.9.22

Next, we need to install the plug-ins.

input : MySQL

output : BigQuery

$ embulk gem install embulk-input-mysql
$ embulk gem install embulk-output-bigquery

We can also check the installed plug-ins with this command.

$ embulk gem list

*** LOCAL GEMS ***

addressable (2.7.0)
bundler (1.16.0)
concurrent-ruby (1.1.5)
declarative (0.0.10)
declarative-option (0.1.0)
did_you_mean (default: 1.0.1)
embulk (0.9.22 java)
embulk-input-mysql (0.10.1)
embulk-output-bigquery (0.6.4)
embulk-output-command (0.1.4)
faraday (0.17.3)
google-api-client (0.32.1)
googleauth (0.9.0)
httpclient (2.8.3)
jar-dependencies (default: 0.3.10)
jruby-openssl (0.9.21 java)
jruby-readline (1.2.0 java)
json (1.8.3 java)
jwt (2.2.1)
liquid (4.0.0)
memoist (0.16.2)
mini_mime (1.0.2)
minitest (default: 5.4.1)
msgpack (1.1.0 java)
multi_json (1.14.1)
multipart-post (2.1.1)
net-telnet (default: 0.1.1)
os (1.0.1)
power_assert (default: 0.2.3)
psych (2.2.4 java)
public_suffix (4.0.3)
rake (default: 10.4.2)
rdoc (default: 4.2.0)
representable (3.0.4)
retriable (3.1.2)
signet (0.11.0)
test-unit (default: 3.1.1)
time_with_zone (0.3.1)
tzinfo (2.0.1)
uber (0.1.0)

When installation is done, we need to write the .yml configuration file.

For reference, Embulk loads the data in these steps.

Google Cloud SQL -> GCS -> Google BigQuery (source : https://www.kangtaeho.com/61)
  1. Export Google Cloud SQL data to a Google Cloud Storage bucket.
  2. Load the data from Google Cloud Storage into BigQuery.

Therefore, we need a Google Cloud Storage bucket to hold the intermediate .csv files.

Let’s write the .yml file! (Other plug-in references : https://github.com/embulk)

(mysql-bigquery.yml)

in:
  type: mysql
  host: (MySQL host name)
  port: (port number)
  user: (user name)
  password: (password)
  database: (database name)
  table: (table name)
  select: "*"                       # select all columns
out:
  type: bigquery
  mode: replace                     # load mode; here the table is replaced
  auth_method: compute_engine       # authentication method
  service_account_email: (GCP account email)
  project: (project name)
  dataset: (dataset name)
  table: (table name)
  auto_create_table: true           # automatically create the table
  gcs_bucket: (GCS bucket name)
  auto_create_gcs_bucket: true      # automatically create the bucket
  ignore_unknown_values: true
  allow_quoted_newlines: true
  auto_create_dataset: true         # automatically create the dataset
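Typing the password directly into the file is easy to get wrong and easy to leak. As a rough sketch, the same kind of configuration could be generated from Python, with the password read from an environment variable. The helper name `render_embulk_config`, the variable name `MYSQL_PASSWORD`, and every example value below are assumptions for illustration; only the option names come from the configuration above.

```python
import os

def render_embulk_config(config):
    """Render a two-level dict ({'in': {...}, 'out': {...}}) as simple YAML text."""
    lines = []
    for section, options in config.items():
        lines.append(f"{section}:")
        for key, value in options.items():
            if isinstance(value, bool):  # check bool first: bool is a subclass of int
                rendered = "true" if value else "false"
            elif isinstance(value, (int, float)):
                rendered = str(value)
            else:
                # Double-quote strings so values such as "*" stay plain strings.
                escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
                rendered = f'"{escaped}"'
            lines.append(f"  {key}: {rendered}")
    return "\n".join(lines) + "\n"

example = {
    "in": {
        "type": "mysql",
        "host": "203.0.113.10",   # placeholder address
        "port": 3306,
        "user": "embulk",         # placeholder account name
        "password": os.environ.get("MYSQL_PASSWORD", ""),
        "database": "mydb",       # placeholder
        "table": "mytable",       # placeholder
        "select": "*",
    },
    "out": {
        "type": "bigquery",
        "mode": "replace",
        "auth_method": "compute_engine",
        "project": "my-project",  # placeholder
        "dataset": "my_dataset",  # placeholder
        "table": "mytable",       # placeholder
        "auto_create_table": True,
        "gcs_bucket": "my-bucket",  # placeholder
    },
}
print(render_embulk_config(example))
```

The printed text can be saved as the .yml file on the VM, so the password never has to be stored alongside the rest of the configuration.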

To export only specific rows, add where and order_by to the input section, like this.

in:
  type: mysql
  host: (MySQL host name)
  port: (port number)
  user: (user name)
  password: (password)
  database: (database name)
  table: (table name)
  select: "*"                       # select all columns
  where: (condition)
  order_by: (condition)
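For a job that runs every few hours, the filter condition is often a time window. Here is a small sketch of building such a condition in Python, assuming a hypothetical `updated_at` DATETIME column (the tables in this post may not have one); `recent_rows_condition` is a name invented for this example.

```python
from datetime import datetime, timedelta

def recent_rows_condition(column, now, hours):
    """Build a condition that keeps rows changed in the last `hours` hours."""
    since = now - timedelta(hours=hours)
    return f"{column} >= '{since:%Y-%m-%d %H:%M:%S}'"

# Rows touched in the 3 hours before 2020-02-04 12:00:00.
print(recent_rows_condition("updated_at", datetime(2020, 2, 4, 12, 0, 0), 3))
# updated_at >= '2020-02-04 09:00:00'
```

Keep in mind that with mode: replace the BigQuery table is replaced on each run, so it would then contain only the filtered rows.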

You can run the .yml file with this command.

$ embulk run youryml.yml
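When the run is started from a script rather than by hand, the exit code is worth checking so that a failed load does not pass silently. Below is a minimal sketch, assuming `embulk` is on the PATH and that it signals failure with a non-zero exit code, as command-line tools usually do. `run_embulk` and its injectable `runner` parameter are names made up for this example.

```python
import subprocess

def run_embulk(config_path, embulk_bin="embulk", runner=subprocess.run):
    """Run `embulk run <config_path>` and return True when it exits with code 0."""
    # Passing a list (not a string) avoids shell quoting problems in the path.
    result = runner([embulk_bin, "run", config_path])
    return result.returncode == 0
```

For example, `run_embulk("youryml.yml")` returns `False` if the load fails, which a wrapper script can turn into an alert or a retry.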

2. Scheduling with Apache Airflow

I installed Airflow with Google Cloud Composer (https://cloud.google.com/composer?hl=ko).

Once Airflow is installed, we need to configure it.

Go to the Airflow web server and click Admin - Connections.

Airflow connection settings 1

Click the Create button to create a new connection.

Airflow connection settings 2

Conn Id : connection ID

Conn Type : SSH

Host : Embulk VM’s IP address

Username : root

Password : root’s password (set it on the Embulk VM with the command below)

Port : 22

$ sudo passwd root   # set or change the root password

Save configuration.
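As an aside, Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` whose value is a connection URI. This post uses the web form, so the sketch below is only an illustration of how the same fields map onto such a URI. The helper names and the example values are assumptions, and whether environment variables are convenient depends on how your Airflow environment is managed.

```python
from urllib.parse import quote

def ssh_connection_uri(host, user, password, port=22):
    """Build a connection URI for an SSH connection from the form fields."""
    # Percent-encode user and password so characters like '@' or '/' are safe.
    return f"ssh://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}"

def connection_env_name(conn_id):
    """Name of the environment variable Airflow checks for this Conn Id."""
    return "AIRFLOW_CONN_" + conn_id.upper()

# Hypothetical values: Conn Id "embulk_vm", VM at 10.0.0.5.
print(connection_env_name("embulk_vm"))
print(ssh_connection_uri("10.0.0.5", "root", "p@ss/word"))
```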

We also need to install two PyPI packages to use the SSH connection.

Airflow PyPI packages

Let’s create Airflow DAG file!

(embulk_airflow_DAG.py)

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'JooYoung',
}

dag = DAG(
    'mysql-embulk-BigQuery',
    description='MySQL -> Embulk -> BigQuery Dataflow',
    schedule_interval='0 */3 * * *',  # cron: every 3 hours, on the hour
    default_args=default_args,
    start_date=datetime(2020, 2, 4),
    catchup=False,
)

# Run Embulk on the VM over SSH
embulk_run_account_user = SSHOperator(
    ssh_conn_id='YOUR CONNECTION ID',  # must match the Conn Id in Airflow
    task_id='task ID',
    command='yml file command ',
    dag=dag,
)
# ...
embulk_run_account_user  # RUN DAG!
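The schedule_interval value in the DAG is a cron expression: minute 0 of every third hour (00:00, 03:00, and so on up to 21:00). To make that concrete, here is a small stand-alone sketch that computes the next matching clock time by hand. It does not use Airflow, it is hard-coded for this one expression, and `next_three_hourly_run` is a name invented for this example.

```python
from datetime import datetime, timedelta

def next_three_hourly_run(after):
    """Return the first time strictly after `after` that matches '0 */3 * * *'."""
    # Round down to the hour, then step forward one hour at a time
    # until the hour is a multiple of 3.
    candidate = after.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    while candidate.hour % 3 != 0:
        candidate += timedelta(hours=1)
    return candidate

print(next_three_hourly_run(datetime(2020, 2, 4, 10, 15)))  # 2020-02-04 12:00:00
```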

There are some important things to keep in mind when creating DAG files.

First, we are using SSHOperator to connect to the Embulk VM. The ssh_conn_id must be the same as the Conn Id set in Airflow.

Second, the command should be the one that runs the .yml file on the VM instance, written with full paths as in this example:

'/home/user/.embulk/bin/embulk run /home/user/yml/youryml.yml',

Save the DAG file and upload it to the Airflow DAG folder.

Afterwards, when we open the Airflow web server…

Airflow DAG is successfully added!

Click the Trigger DAG button to run the DAG manually.

Data are saved to BigQuery successfully!

This is the end of “Load Data From MySQL(Cloud SQL) To BigQuery Using Embulk (+ AirFlow Scheduling)” post.

Thank you for reading this long and imperfect article.
