From the Polls to the Trolls

Seeing what the world thinks at YouGov

  • YouGov conducts highly dynamic surveys that rely on MongoDB's rich support for semi-structured data.
  • YouGov uses tag-aware sharding to provide MongoDB-as-a-Service (MongoDBaaS) for a global enterprise.
  • MongoDB is a joy to use and an effective multipurpose persistence store for large-scale applications.

About Us

About YouGov

  • community
  • opinions & habits
  • smart analysis
  • insights and prediction
  • today.yougov.com

About YouGov Dev

  • 38 developers in 9 countries
  • primarily leverage Python and open-source
  • high interactivity and integration
  • streamlined deployment

About Jason

  • <3 Python
  • studied web data (semi-structured)
  • motivated to unfetter data

Two Challenges

Challenge: Survey Collection

  • highly dynamic surveys
  • programmed by users
  • semi-structured data
  • high throughput
  • increasing throughput

Challenge: Global Operation

Flexible Data Model

  • store survey responses as documents
  • (automatic) schema migration

Migration Manager

  • sits between driver and application
  • schema version: an incrementing integer
  • migrates on load
  • untouched on save

Migration Manager

Implementation (1)

class Manager:
	"""
	A manager for facilitating the registration of migration functions
	and applying those migrations to documents.
	"""

	version_attribute_name = 'version'
	_upgrade_funcs = set()

	def __init__(self, target_version):
		self.target_version = target_version

	@classmethod
	def register(cls, func):
		"""
		Decorate a migration function with this method
		to make it available for migrating cases.
		"""
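		# _add_version_info (not shown here) records the migration's
		# source and target versions on func (migrate_doc reads func.target)
		cls._add_version_info(func)
		cls._upgrade_funcs.add(func)
		return func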

Implementation (2)

import itertools

from more_itertools import recipes


class Manager:
	def migrate_doc(self, doc):
		"""
		Migrate the doc from its current version to the target version
		and return it.
		"""
		orig_ver = doc.get(self.version_attribute_name, 0)
		funcs = self._get_migrate_funcs(orig_ver, self.target_version)
		for func in funcs:
			func(self, doc)
			doc[self.version_attribute_name] = func.target
		return doc

	@classmethod
	def _get_migrate_funcs(cls, orig_version, target_version):
		direction = 1 if target_version > orig_version else -1
		versions = range(orig_version, target_version + direction, direction)
		transitions = recipes.pairwise(versions)
		# _get_func (not shown here) returns the registered function
		# for each (source, target) step
		return itertools.starmap(cls._get_func, transitions)

Implementation (3)

@Manager.register
def v6_to_7(manager, case):
	"""
	V7 adds the survey_id and user_id fields
	"""
	ident = CaseIdent(case.get('ident', ''))
	case.setdefault('user_id', ident.user_id)
	case.setdefault('survey_id', ident.survey_id)

@Manager.register
def v7_to_6(manager, case):
	"""
	Reverse of v6_to_7: fold survey_id and user_id back into ident
	"""
	ident = CaseIdent.from_parts(
		case.pop('user_id', None),
		case.pop('survey_id', None),
	)
	case['ident'] = str(ident)
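The three implementation slides show the manager in fragments. The sketch below stitches them into one runnable module so the load-time migration path can be followed end to end. Two helpers are assumptions not shown in the source: `_add_version_info` is assumed to parse the source and target versions from a function name of the form `vN_to_M`, and `_get_func` is assumed to look up the unique registered function for each one-step transition. A local `pairwise` stands in for `more_itertools.recipes.pairwise`, and the migrations `v0_to_1`/`v1_to_0` and their `status` field are hypothetical.

```python
import itertools
import re


def pairwise(iterable):
    """Yield overlapping pairs: (a, b), (b, c), ..."""
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)


class Manager:
    """Register migration functions and apply them to documents."""

    version_attribute_name = 'version'
    _upgrade_funcs = set()

    def __init__(self, target_version):
        self.target_version = target_version

    @classmethod
    def register(cls, func):
        cls._add_version_info(func)
        cls._upgrade_funcs.add(func)
        return func

    @staticmethod
    def _add_version_info(func):
        # Assumption: versions are encoded in the name, e.g. v6_to_7.
        match = re.fullmatch(r'v(\d+)_to_(\d+)', func.__name__)
        if not match:
            raise ValueError(f"bad migration name: {func.__name__}")
        func.source, func.target = map(int, match.groups())

    @classmethod
    def _get_func(cls, source, target):
        # Assumption: exactly one registered function per transition.
        matches = [
            f for f in cls._upgrade_funcs
            if (f.source, f.target) == (source, target)
        ]
        if len(matches) != 1:
            raise ValueError(f"no unique migration from {source} to {target}")
        return matches[0]

    @classmethod
    def _get_migrate_funcs(cls, orig_version, target_version):
        # Walk one version at a time, up or down.
        direction = 1 if target_version > orig_version else -1
        versions = range(orig_version, target_version + direction, direction)
        return itertools.starmap(cls._get_func, pairwise(versions))

    def migrate_doc(self, doc):
        # Documents without a version attribute are treated as version 0.
        orig_ver = doc.get(self.version_attribute_name, 0)
        for func in self._get_migrate_funcs(orig_ver, self.target_version):
            func(self, doc)
            doc[self.version_attribute_name] = func.target
        return doc


# Hypothetical migrations: v1 adds a "status" field, v0 removes it.
@Manager.register
def v0_to_1(manager, doc):
    doc.setdefault('status', 'new')


@Manager.register
def v1_to_0(manager, doc):
    doc.pop('status', None)


upgraded = Manager(target_version=1).migrate_doc({'name': 'q1'})
print(upgraded)    # {'name': 'q1', 'status': 'new', 'version': 1}
downgraded = Manager(target_version=0).migrate_doc(upgraded)
print(downgraded)  # {'name': 'q1', 'version': 0}
```

Note that `migrate_doc` mutates the document it is given, which matches the slides' "migrates on load" approach: the application migrates each document as it is read and persists whatever version it ends up with on save.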

Caution: Field Rename

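  • queries (documents not yet migrated still store the old field name)
  • indexes (an index on the old field does not cover the new one)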

Mitigation

  • support both fields
  • maintain dual indexes
  • backfill migration
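A minimal sketch of these mitigations, assuming a hypothetical rename of `survey_name` to `survey_title`. `either_field` builds a MongoDB filter that matches a value under either name, so queries keep working while both shapes coexist. `backfill` pushes every stored document through a migration callback and persists only the ones that changed. The collection is abstracted as an iterable plus a `save` callback so the sketch runs without a server. With pymongo, `save` could wrap `Collection.replace_one({'_id': doc['_id']}, doc)`, and the dual indexes would be two `Collection.create_index` calls, one per field name. All function names here are hypothetical.

```python
def either_field(old, new, value):
    """Filter matching documents that store `value` under either field name."""
    return {'$or': [{old: value}, {new: value}]}


def rename_field(doc, old, new):
    """Move doc[old] to doc[new]; return True if the doc changed.

    If both fields exist, the new field wins and the old one is dropped.
    """
    if old not in doc:
        return False
    doc.setdefault(new, doc.pop(old))
    return True


def backfill(docs, migrate, save):
    """Apply `migrate` to each doc and save only the ones it changed."""
    count = 0
    for doc in docs:
        if migrate(doc):
            save(doc)
            count += 1
    return count


# In-memory stand-in for a collection partway through the rename.
stored = [
    {'_id': 1, 'survey_name': 'rfc1274'},   # not yet migrated
    {'_id': 2, 'survey_title': 'rfc1274'},  # already migrated
]
print(either_field('survey_name', 'survey_title', 'rfc1274'))
saved = []
n = backfill(
    stored,
    lambda d: rename_field(d, 'survey_name', 'survey_title'),
    saved.append,
)
print(n, stored)
```

Once the backfill has covered every document, the dual-field queries and the index on the old name can be retired.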

Other considerations

  • single-document design
  • forward-compatibility
  • reversible operations
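One way to keep migrations reversible is to check each up/down pair on sample documents. The sketch below assumes migrations share the slides' `(manager, case)` signature and mutate the case in place. The helper `check_reversible`, the `"<survey_id>:<user_id>"` ident format, and the example functions are all hypothetical. `split_ident`/`join_ident` mimic the v6_to_7/v7_to_6 pair, while `drop_legacy` mimics v10_to_11, which discards data and so cannot be undone.

```python
import copy


def check_reversible(forward, backward, doc):
    """Return True if backward(forward(doc)) restores an equal document.

    Works on a deep copy so the caller's document is left untouched.
    """
    trial = copy.deepcopy(doc)
    forward(None, trial)   # the manager argument is unused by these examples
    backward(None, trial)
    return trial == doc


# Hypothetical ident format "<survey_id>:<user_id>", for illustration only.
def split_ident(manager, case):
    survey_id, _, user_id = case.get('ident', '').partition(':')
    case.setdefault('survey_id', survey_id)
    case.setdefault('user_id', user_id)


def join_ident(manager, case):
    survey_id = case.pop('survey_id', '')
    user_id = case.pop('user_id', '')
    case['ident'] = f"{survey_id}:{user_id}"


def drop_legacy(manager, case):
    case.pop('survey_name', None)


def no_op(manager, case):
    pass


doc = {'ident': 'rfc1274:216074', 'survey_name': 'Drinks'}
print(check_reversible(split_ident, join_ident, doc))  # True
print(check_reversible(drop_legacy, no_op, doc))       # False
```

Running a check like this against a sample of real documents before deploying a migration pair can catch a lossy step early.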

Implementation (4)

@Manager.register
def v10_to_11(manager, case):
	"""
	Remove legacy attributes.
	"""
	case.pop('survey_name', None)
	case.pop('survey_version', None)
	case.pop('ident', None)

https://github.com/jaraco/jaraco.mongodb

Global Sharded Cluster

Sharding Strategy

  • 2 datacenters separated by 5,400 miles
  • partitions survey data via tag-aware sharding
  • provides Database-as-a-Service (DBaaS)

Interview Data Model

Interview Life Cycle

Interview Data Schema

{
	"region": "EMEA",
	"survey_id": "rfc1274",
	"user_id": 216074,
	"responses": {
		"favourite_drink": "chocolate milk",
		// ...
	}
}

Shard Key Selection

Oops!

Let's Try That Again...

Much Better!

Caution: Broadcast Operations

Targeted Operations
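The difference between broadcast and targeted operations depends on whether mongos can use the shard key to choose shards. As a rough rule, a filter that constrains the leading field(s) of the shard key can be routed to the shards owning the matching chunks. A filter that omits the leading field is sent to every shard (scatter-gather). The sketch below encodes a simplified version of that rule for the `{region: 1, survey_id: 1}` key used here. It is an approximation, not the router's real logic: it inspects only top-level fields and ignores operators, `$or`, and chunk boundaries. The names `shard_key_prefix` and `routing` are hypothetical.

```python
SHARD_KEY = ('region', 'survey_id')


def shard_key_prefix(query, shard_key=SHARD_KEY):
    """Return the leading shard-key fields that the filter constrains."""
    prefix = []
    for field in shard_key:
        if field not in query:
            break
        prefix.append(field)
    return tuple(prefix)


def routing(query, shard_key=SHARD_KEY):
    """Approximate how mongos would route a query with this filter."""
    return 'targeted' if shard_key_prefix(query, shard_key) else 'broadcast'


print(routing({'region': 'EMEA', 'survey_id': 'rfc1274'}))  # targeted
print(routing({'survey_id': 'rfc1274'}))                    # broadcast
print(routing({'user_id': 216074}))                         # broadcast
```

A filter on `survey_id` alone is broadcast because `survey_id` is not a prefix of the compound shard key. Including `region` lets the query stay within one datacenter's shards.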

Two Datacenters

  • London (a.k.a. "EMEA")
  • Palo Alto, CA (a.k.a. "US")

Deploying Shards

Global Sharded Cluster

Tag-Aware Sharding (1)

// connect to mongos

// switch to admin database
use admin

// Step 1: Add shards
db.runCommand({addShard: "ldn-rs1/ldn-host1", name: "LDN1"});
db.runCommand({addShard: "ldn-rs2/ldn-host4", name: "LDN2"});
db.runCommand({addShard: "ldn-rs3/ldn-host7", name: "LDN3"});
db.runCommand({addShard: "pa-rs1/pa-host1", name: "PA1"});
db.runCommand({addShard: "pa-rs2/pa-host4", name: "PA2"});

Tag-Aware Sharding (2)

// Step 2: Add shard tags
sh.addShardTag("LDN1", "EMEA");
sh.addShardTag("LDN2", "EMEA");
sh.addShardTag("LDN3", "EMEA");
sh.addShardTag("PA1", "US");
sh.addShardTag("PA2", "US");

Tag-Aware Sharding (3)

// Step 3: Enable sharding
sh.enableSharding("yougov");
sh.shardCollection(
	"yougov.interviews",                     // collection namespace
	{"region": 1, "survey_id": 1}            // shard key pattern
);

Tag-Aware Sharding (4)

// Step 4: Tag each shard key range
sh.addTagRange(
	"yougov.interviews",                     // collection namespace
	{"region": "EMEA", "survey_id": MinKey}, // min value
	{"region": "EMEA", "survey_id": MaxKey}, // max value
	"EMEA"                                   // tag
);
sh.addTagRange(
	"yougov.interviews",                     // collection namespace
	{"region": "US", "survey_id": MinKey},   // min value
	{"region": "US", "survey_id": MaxKey},   // max value
	"US"                                     // tag
);
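To make Steps 1, 2 and 4 concrete, here is a toy Python model of which shards may hold a given interview once the tag ranges are in place. It is not how mongos or the balancer is implemented. Because each tag range spans every `survey_id` from `MinKey` to `MaxKey`, the `region` value alone decides the tag in this model. Shard key ranges not covered by any tag may be placed on any shard. The names `SHARD_TAGS`, `REGION_TAGS` and `eligible_shards` are hypothetical.

```python
# Step 1 + Step 2 as data: shard name -> tag.
SHARD_TAGS = {
    'LDN1': 'EMEA', 'LDN2': 'EMEA', 'LDN3': 'EMEA',
    'PA1': 'US', 'PA2': 'US',
}

# Step 4 as data: each range covers one region (all survey_ids) -> tag.
REGION_TAGS = {'EMEA': 'EMEA', 'US': 'US'}


def eligible_shards(doc):
    """Shards that may hold the chunk containing this document."""
    tag = REGION_TAGS.get(doc['region'])
    if tag is None:
        # Untagged key ranges can be balanced onto any shard.
        return sorted(SHARD_TAGS)
    return sorted(shard for shard, t in SHARD_TAGS.items() if t == tag)


print(eligible_shards({'region': 'EMEA', 'survey_id': 'rfc1274'}))
print(eligible_shards({'region': 'US', 'survey_id': 'rfc1274'}))
```

The model also shows why every application write must carry a valid `region`: a document whose region matches no tag range loses its locality guarantee.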

(Mongo) Database-as-a-Service

  • mongos instance running on each app server
  • applications connect to mongodb://localhost/

Read Preference "nearest"

  • provides low-latency reads when using geographically distributed replica sets

from pymongo import MongoClient, ReadPreference

# set read preference on the client
mongo = MongoClient('localhost',
	read_preference=ReadPreference.NEAREST)

# ...or on an individual database
mongo = MongoClient('localhost')
db = mongo.get_database('dragoman',
	read_preference=ReadPreference.NEAREST)

Database Creation

import pymongo


def get_ids(collection):
	"""
	Return the set of _id values in a config collection
	(helper assumed by the original slide).
	"""
	return {doc['_id'] for doc in collection.find({}, {'_id': 1})}


def create_db_in_shard(db_name, shard, client=None):
	"""
	In a sharded cluster, create a database in a particular shard.
	"""
	client = client or pymongo.MongoClient()
	# flush the router config to ensure it's not stale
	res = client.admin.command('flushRouterConfig')
	if not res.get('ok'):
		raise RuntimeError("unable to flush router config")
	if shard not in get_ids(client.config.shards):
		raise ValueError(f"Unknown shard {shard}")
	if db_name in get_ids(client.config.databases):
		raise ValueError("database already exists")
	# MongoDB doesn't have a 'create database' command, so insert an
	# item into a collection and then drop the collection.
	client[db_name].foo.insert_one({'foo': 1})
	client[db_name].foo.drop()
	if client[db_name].list_collection_names():
		raise ValueError("database has collections")
	res = client.admin.command('movePrimary', value=db_name, to=shard)
	if not res.get('ok'):
		raise RuntimeError(str(res))
	# client.address is the (host, port) of the mongos that handled this
	return f"Successfully created {db_name} in {shard} via {client.address}"

Multi-Dimensional Partitioning

  • still partition for archival
  • managed programmatically
  • seek additional layers of partitioning

Partitioning Motives

  • locality
  • distribution (size)
  • distribution (performance)
  • storage

Predictions

  • migrate with whimsy
  • all apps in global cluster

Wishes

  • Supply a shard hint independently of the query (SERVER-11991)
  • Generalized custom shard logic, independent of the data in records
  • Oplog-only replica for creating new replicas (SERVER-14539)
  • Embedded solution (sqlite)

Conclusions

  • If you have dynamically changing data, choose a schemaless database. Take advantage of the schemaless aspects and migrate with whimsy.
  • When your company is successful and international, use tag-aware sharding to achieve partitioning of your data without losing control, but let MongoDB manage the abstraction.

Questions?

https://bit.ly/polls2trolls