
Sync Data from MySQL to MongoDB

Background

There are many cases in which MySQL data needs to be transferred to MongoDB, for example:

  • 「Database Switching」Transfer a database from a MySQL instance to a MongoDB instance.
  • 「Table Aggregation and Data Development」Merge multiple tables into one, or split one table into many, with a UDF (user-defined function).

Before you start, make sure that your MySQL and MongoDB versions are supported.

Database Switching

Prerequisites

Source

Field Value
Source Instance (MySQL) 192.168.1.1
Source Instance Port 3306
Source Database Car_shop (already created)
Source Tables All tables in Car_shop
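The job created in this section selects its source tables with the regular expression `table_re=".*"`, which matches every table name. The Python sketch below illustrates how a narrower pattern would restrict the selection. It assumes the pattern is matched against the whole table name (Python's `re.fullmatch`); whether the tool uses a full or a partial match is not stated here, so test a pattern before relying on it. The function `select_tables` and every table name other than Orders and Products are hypothetical.

```python
import re


def select_tables(tables: list[str], table_re: str) -> list[str]:
    """Return the tables whose names fully match table_re.

    Assumption: the whole name must match. With a partial (search-style)
    match, a pattern such as "Order" would also select "Order_items".
    """
    pattern = re.compile(table_re)
    return [table for table in tables if pattern.fullmatch(table)]


if __name__ == "__main__":
    # Orders and Products come from this guide; the other names are hypothetical.
    tables = ["Orders", "Order_items", "Products", "Customers"]
    print(select_tables(tables, ".*"))       # ['Orders', 'Order_items', 'Products', 'Customers']
    print(select_tables(tables, "Order.*"))  # ['Orders', 'Order_items']
```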

Target

Field Value
Target Instance (MongoDB) 192.168.1.2
Target Instance Port 27017
Target Database Car_shop
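The target data source is configured with a MongoDB connection string that embeds the user name and password. If either contains reserved characters such as `@`, `:` or `/`, it must be percent-encoded, or the URI will not parse correctly. Below is a minimal Python sketch using the standard library's `urllib.parse.quote_plus`; the helper name `build_mongo_uri` is hypothetical, and `authSource=admin` assumes the user is defined in the `admin` database, as in the URI used in this guide.

```python
from urllib.parse import quote_plus


def build_mongo_uri(user: str, password: str, host: str, port: int,
                    database: str, auth_source: str = "admin") -> str:
    """Build a mongodb:// URI with percent-encoded credentials.

    quote_plus encodes '@', ':' and '/' (its default safe set is empty),
    so they cannot be confused with URI delimiters.
    """
    return (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}?authSource={quote_plus(auth_source)}"
    )


if __name__ == "__main__":
    print(build_mongo_uri("root", "password", "192.168.1.2", 27017, "Car_shop"))
    # mongodb://root:password@192.168.1.2:27017/Car_shop?authSource=admin
    print(build_mongo_uri("root", "p@ss:w/rd", "192.168.1.2", 27017, "Car_shop"))
    # mongodb://root:p%40ss%3Aw%2Frd@192.168.1.2:27017/Car_shop?authSource=admin
```

The resulting string is what would be passed to `.uri(...)` when creating the target data source.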

Action

# Create Source DataSource
> Source_Mysql = DataSource("mysql","Source_Mysql",'source').host("192.168.1.1").port(3306).username('root').password('password').db('Car_shop')
> Source_Mysql.save()

# Create Target DataSource
> Target_MongoDB = DataSource("mongodb","Target_MongoDB",'target').uri("mongodb://root:password@192.168.1.2:27017/Car_shop?authSource=admin")
> Target_MongoDB.save()

# Create a job that replicates all tables in Source_Mysql to Target_MongoDB.
> replication_job = Pipeline("replication_job").readFrom(Source(Source_Mysql,table_re=".*")).writeTo(Target_MongoDB)

> replication_job.start()

# Check the status of the job
> show jobs
> monitor job replication_job

# Check the job logs
> logs job replication_job limit=5 tail=True

After these steps, you can log in to the target MongoDB instance and view the new data.
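A quick way to check the result is to compare the row count of each MySQL table with the document count of the matching MongoDB collection. The Python sketch below assumes each table is written to a collection with the same name and that the initial copy has finished (while incremental sync is running, counts can briefly differ). All helper names are hypothetical; `mysql_counts` needs the third-party `pymysql` package and `mongo_counts` needs `pymongo`, and each is imported only when its function is called.

```python
def compare_counts(source: dict[str, int],
                   target: dict[str, int]) -> dict[str, tuple[int, int]]:
    """Return {name: (mysql_rows, mongo_docs)} for every name whose counts differ.

    A name missing on one side is counted as 0 there.
    """
    mismatches = {}
    for name in sorted(set(source) | set(target)):
        src, dst = source.get(name, 0), target.get(name, 0)
        if src != dst:
            mismatches[name] = (src, dst)
    return mismatches


def mysql_counts(host: str, port: int, user: str, password: str,
                 database: str) -> dict[str, int]:
    """Count rows in every table of the source database (requires pymysql)."""
    import pymysql  # third-party; imported here so compare_counts works without it

    conn = pymysql.connect(host=host, port=port, user=user,
                           password=password, database=database)
    try:
        with conn.cursor() as cur:
            cur.execute("SHOW TABLES")
            tables = [row[0] for row in cur.fetchall()]
            counts = {}
            for table in tables:
                cur.execute(f"SELECT COUNT(*) FROM `{table}`")
                counts[table] = cur.fetchone()[0]
            return counts
    finally:
        conn.close()


def mongo_counts(uri: str, database: str) -> dict[str, int]:
    """Count documents in every collection of the target database (requires pymongo)."""
    from pymongo import MongoClient  # third-party; imported lazily as above

    with MongoClient(uri) as client:
        db = client[database]
        return {name: db[name].count_documents({})
                for name in db.list_collection_names()}
```

With the connection details from the prerequisites, `compare_counts(mysql_counts("192.168.1.1", 3306, "root", "password", "Car_shop"), mongo_counts("mongodb://root:password@192.168.1.2:27017/Car_shop?authSource=admin", "Car_shop"))` returns an empty dictionary when every count matches.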

Table Aggregation and Data Development

Prerequisites

Source

Field Value
Source Instance (MySQL) 192.168.1.1
Source Instance Port 3306
Source Database Car_shop (already created)
Source Tables Orders, Products

Target

Field Value
Target Instance (MongoDB) 192.168.1.2
Target Instance Port 27017
Target Database Car_shop
Target Collection Orders_and_Products

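Requirement

For each record in Orders, keep only the id, detail, created_at and product_id columns, look up the product name in Products by product_id, add an updated_at timestamp, and write the result to the Orders_and_Products collection in MongoDB, updating any existing document with the same id.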

Action

# Write the JavaScript file 「find_product_name.js」

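	// Body of the per-record processing function: `record` is the incoming Orders row,
	// and `source` queries the source MySQL database.
	record["updated_at"] = new Date();
	var rs = source.executeQuery({
		sql: "select name from Products where id = " + record.product_id + " limit 1"
	});
	// An empty result set means no product matches this product_id.
	if (rs && rs.length > 0) {
		record["product_name"] = rs[0].name;
	} else {
		log.warn("Unable to find product_name in Products table for product_id: " + record.product_id);
		record["product_name"] = "Not Found";
	}
	return record;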
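To check the lookup logic without running the pipeline, it can be emulated locally. The Python sketch below is not Tapdata code: it swaps MySQL for an in-memory SQLite database (standard-library `sqlite3`) and replaces `source.executeQuery` with a direct query. It uses a bound parameter (`?`) instead of concatenating `product_id` into the SQL, and it returns a copy rather than modifying the record in place. The function name and the sample products are hypothetical.

```python
import sqlite3
from datetime import datetime, timezone


def find_product_name(record: dict, conn: sqlite3.Connection) -> dict:
    """Emulate find_product_name.js: stamp updated_at and add product_name."""
    enriched = dict(record)  # copy, so the caller's record is left untouched
    enriched["updated_at"] = datetime.now(timezone.utc)
    # Bound parameter instead of string concatenation: product_id cannot alter the SQL.
    row = conn.execute(
        "SELECT name FROM Products WHERE id = ? LIMIT 1", (record["product_id"],)
    ).fetchone()
    # fetchone() returns None when no row matches, like the empty-result branch in the JS.
    enriched["product_name"] = row[0] if row else "Not Found"
    return enriched


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE Products (id INTEGER PRIMARY KEY, name TEXT)")
    # Hypothetical sample rows.
    conn.executemany("INSERT INTO Products VALUES (?, ?)", [(1, "Sedan"), (2, "Coupe")])
    print(find_product_name({"id": 100, "product_id": 2}, conn)["product_name"])  # Coupe
    print(find_product_name({"id": 101, "product_id": 9}, conn)["product_name"])  # Not Found
```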

# Create Source DataSource
> Source_Mysql = DataSource("mysql","Source_Mysql",'source').host("192.168.1.1").port(3306).username('root').password('password').db('Car_shop')
> Source_Mysql.save()

# Create Target DataSource
> Target_MongoDB = DataSource("mongodb","Target_MongoDB",'target').uri("mongodb://root:password@192.168.1.2:27017/Car_shop?authSource=admin")
> Target_MongoDB.save()

# Create a job that reads Orders from Source_Mysql, enriches each record with the script, and writes it to Target_MongoDB.

> replication_job = Pipeline("replication_job").readFrom(Source_Mysql.Orders).filterColumn(["id","detail","created_at","product_id"],FilterType.keep).js("/path/find_product_name.js").writeTo("Target_MongoDB.Orders_and_Products",writeMode=WriteMode.upsert, association=[("id", "id")])


> replication_job.start()

# Check the status of job
> show jobs
> monitor job replication_job

# Check the log of job
> logs job replication_job limit=5 tail=True 

After these steps, you can log in to the target MongoDB instance and view the new data.
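The job in this section keeps four columns with `filterColumn(..., FilterType.keep)` and writes with `WriteMode.upsert`, matching documents on `id` through `association=[("id", "id")]`. The Python sketch below models that behavior on a plain dictionary standing in for the Orders_and_Products collection. It assumes an upsert overwrites the incoming fields of an existing document and inserts a new document when no `id` matches; Tapdata's exact update semantics may differ. The helper names and the `internal_note` column are hypothetical.

```python
KEPT_COLUMNS = ["id", "detail", "created_at", "product_id"]


def keep_columns(row: dict, columns: list[str]) -> dict:
    """Mimic filterColumn(columns, FilterType.keep): drop every column not listed."""
    return {col: row[col] for col in columns if col in row}


def upsert(collection: dict, doc: dict, key: str = "id") -> None:
    """Mimic WriteMode.upsert keyed on `key` (assumed semantics).

    Update the fields of the document with the same key, or insert a copy
    of `doc` when no document matches.
    """
    existing = collection.get(doc[key])
    if existing is None:
        collection[doc[key]] = dict(doc)
    else:
        existing.update(doc)


if __name__ == "__main__":
    orders_and_products = {}  # stands in for the MongoDB collection, keyed by id
    row = {"id": 1, "detail": "2 units", "created_at": "2023-01-01",
           "product_id": 2, "internal_note": "not replicated"}
    upsert(orders_and_products, keep_columns(row, KEPT_COLUMNS))
    row["detail"] = "3 units"  # the same order changes in MySQL
    upsert(orders_and_products, keep_columns(row, KEPT_COLUMNS))
    # One document for id 1, with detail '3 units' and no internal_note.
    print(orders_and_products)
```

In the real job, the script step runs between these two stages and adds `updated_at` and `product_name` to each record before it is written.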