Monday, February 20, 2012

Scaling using MongoDB: Map-Reduce on a sharded collection (part 3)

The idea here is to create a sharded database and then run map-reduce on it to process the data. This is a very basic example. I created a sharded collection "posts" with the structure shown below. The goal is to count how often each tag occurs across the whole sharded collection. I am using two machines named "241" and "243" as shards for MongoDB. The mongos service is running on a third machine, "249".

Input collection structure :

mongos> db.posts.find()
{ "_id" : ObjectId("4f4221149a6777895a000000"), "id" : "0", "content" : "data for content 0", "tags" : [ "tag1", "tag2", "tag3", "tag4" ] }
{ "_id" : ObjectId("4f4221149a6777895a000001"), "id" : "1", "content" : "data for content 1", "tags" : [ "tag2", "tag4", "tag5", "tag7", "tag9" ] }

Output collection structure :

mongos> db.tags.find()
{ "_id" : "tag1", "value" : 14643 }
{ "_id" : "tag3", "value" : 14418 }

Let's go step by step through creating the collection, populating test data, and running map-reduce.

Let's create the posts collection by inserting a few records. If you print the collection stats, you will see that it is not sharded.

mongos> db.printCollectionStats()
---
posts
{
        "sharded" : false,
        "primary" : "shard241",
        "ns" : "test.posts",
        "count" : 2,
        "size" : 256,
        "avgObjSize" : 128,
        "storageSize" : 8192,
        "numExtents" : 1,
        "nindexes" : 1,
        "lastExtentSize" : 8192,
        "paddingFactor" : 1,
        "flags" : 1,
        "totalIndexSize" : 8176,
        "indexSizes" : {
                "_id_" : 8176
        },
        "ok" : 1
}
---

To shard the collection, you first need to create an index on "id" and then shard the collection using "id" as the key.

mongos> db.posts.ensureIndex({id:1})
mongos> db.posts.getIndexes()
[
        {
                "v" : 1,
                "key" : {
                        "_id" : 1
                },
                "ns" : "test.posts",
                "name" : "_id_"
        },
        {
                "v" : 1,
                "key" : {
                        "id" : 1
                },
                "ns" : "test.posts",
                "name" : "id_1"
        }
]

mongos> use admin
switched to db admin
mongos> db.runCommand( { shardcollection : "test.posts" , key : { id : 1 } } )
{ "collectionsharded" : "test.posts", "ok" : 1 }





The collection "posts" is now sharded. Let's populate some test data into the collection. Here is the PHP script that I used to populate the data.

<?php
$m = new Mongo( "mongodb://192.168.1.249:10003", array("persist" => "x") );
$db = $m->test;
$table = $db->posts;
$start = 0;
$end = 200000;
for($i=$start; $i<$end; $i++)
{
        $tags = getTag();
        $obj = array("id"=>"$i", "content"=>"data for content $i", "tags"=>$tags);
        $table->insert($obj);
        echo "$i:".implode(',',$tags)."\n";
}
$found = $table->count();
echo "Found : $found\n";

function getTag()
{
        $tagArray = array('tag1','tag2','tag3','tag4','tag5','tag6','tag7','tag8','tag9','tag10','tag11','tag12','tag13','tag14','tag15','tag16','tag17','tag18','tag19','tag20','tag21','tag22','tag23','tag24','tag25','tag26','tag27','tag28','tag29','tag30','tag31','tag32','tag33','tag34','tag35','tag36','tag37','tag38','tag39','tag40','tag41','tag43');

        $tags = array();
        $tagcount = rand(2,5);

        $count = sizeof($tagArray);
        for($x=0; $x<$tagcount; $x++)
        {
                // rand() is inclusive at both ends; rand(0,$count) can index one past the end of $tagArray
                $tid = rand(0,$count-1);

                $tags[] = $tagArray[$tid];
        }
        return $tags;
}
?>
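The tag picker is easy to get subtly wrong because PHP's rand(min, max) includes both ends of the range, so the highest valid index is the array length minus one. Below is a minimal JavaScript sketch of the same picker. The names randInt and getTags, the injectable rng parameter, and the shortened tag list are assumptions made for illustration; they are not part of the original script.

```javascript
// Inclusive random integer in [min, max], mirroring PHP's rand(min, max).
// `rng` is a hypothetical hook so the function can be tested deterministically.
function randInt(min, max, rng = Math.random) {
  return min + Math.floor(rng() * (max - min + 1));
}

// Pick between 2 and 5 tags; duplicates are allowed, as in the PHP script.
function getTags(tagList, rng = Math.random) {
  const tagCount = randInt(2, 5, rng);
  const tags = [];
  for (let x = 0; x < tagCount; x++) {
    // The last valid index is length - 1, so that is the upper bound.
    tags.push(tagList[randInt(0, tagList.length - 1, rng)]);
  }
  return tags;
}

// Shortened tag list, for illustration only.
const tagList = ["tag1", "tag2", "tag3", "tag4", "tag5"];
console.log(getTags(tagList));
```

With the upper bound set to the array length instead, the picker would occasionally read one element past the end and store an empty tag.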



I pushed 200,000 records into the collection. Here is how the data was distributed between "241" and "243":

mongos> db.printCollectionStats()
---
posts
{
        "sharded" : true,
        "flags" : 1,
        "ns" : "test.posts",
        "count" : 200000,
        "numExtents" : 10,
        "size" : 24430872,
        "storageSize" : 32743424,
        "totalIndexSize" : 15534400,
        "indexSizes" : {
                "_id_" : 6508096,
                "id_1" : 9026304
        },
        "avgObjSize" : 122.15436,
        "nindexes" : 2,
        "nchunks" : 4,
        "shards" : {
                "shard241" : {
                        "ns" : "test.posts",
                        "count" : 109889,
                        "size" : 13423484,
                        "avgObjSize" : 122.15493598947415,
                        "storageSize" : 17978183,
                        "numExtents" : 8,
                        "nindexes" : 2,
                        "lastExtentSize" : 12083200,
                        "paddingFactor" : 1,
                        "flags" : 1,
                        "totalIndexSize" : 8531049,
                        "indexSizes" : {
                                "_id_" : 3573332,
                                "id_1" : 4957718
                        },
                        "ok" : 1
                },
                "shard243" : {
                        "ns" : "test.posts",
                        "count" : 90111,
                        "size" : 10913985,
                        "avgObjSize" : 121.11711711711712,
                        "storageSize" : 33251771,
                        "numExtents" : 8,
                        "nindexes" : 2,
                        "lastExtentSize" : 12083200,
                        "paddingFactor" : 1,
                        "flags" : 1,
                        "totalIndexSize" : 13274730,
                        "indexSizes" : {
                                "_id_" : 6617370,
                                "id_1" : 6657360
                        },
                        "ok" : 1
                }
        },
        "ok" : 1
}
---




Now we will create the map and reduce functions. The map function checks the tags array of each record in the posts collection and, for each element of the array, emits the tag and a count of 1. The reduce function then adds up the counts for each tag and returns the total. The map function can call emit(key, value) any number of times to feed data to the reducer. The reduce function receives an array of values emitted for a key and reduces them to a single value. The object returned by reduce must have the same structure as the value emitted by map, because the result of one reduce call can be passed back into a later one.

mongos> map = function() {
... if(!this.tags) {
... return;
... }
... for ( index in this.tags) {
... emit(this.tags[index],1);
... }
... }
function () {
    if (!this.tags) {
        return;
    }
    for (index in this.tags) {
        emit(this.tags[index], 1);
    }
}
mongos> reduce = function(key, values) {
... var count = 0;
... for(index in values) {
... count += values[index];
... }
... return count;
... }
function (key, values) {
    var count = 0;
    for (index in values) {
        count += values[index];
    }
    return count;
}
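
To see what these two functions produce without a running cluster, here is a rough plain-JavaScript emulation applied to the two sample documents from the top of this post. It is only a sketch: runMapReduce is a hypothetical helper, not a MongoDB API, and emit is passed in as an argument here, whereas MongoDB provides it as a global inside the map function.

```javascript
// Plain-JavaScript emulation of map-reduce; no MongoDB required.
// The two documents are the samples shown at the top of this post.
const posts = [
  { id: "0", tags: ["tag1", "tag2", "tag3", "tag4"] },
  { id: "1", tags: ["tag2", "tag4", "tag5", "tag7", "tag9"] },
];

// Same logic as the shell version, with emit injected (an assumption of this sketch).
function mapTags(emit) {
  if (!this.tags) return;
  for (const tag of this.tags) emit(tag, 1);
}

function reduceCounts(key, values) {
  let count = 0;
  for (const v of values) count += v;
  return count;
}

// Hypothetical stand-in for the server: groups emitted values by key, then reduces.
function runMapReduce(docs, map, reduce) {
  const emitted = new Map(); // key -> array of emitted values
  const emit = (key, value) => {
    if (!emitted.has(key)) emitted.set(key, []);
    emitted.get(key).push(value);
  };
  for (const doc of docs) {
    map.call(doc, emit); // `this` is the current document, as in MongoDB
  }
  const out = [];
  for (const [key, values] of emitted) {
    // MongoDB does not call reduce for a key that has only a single value.
    const value = values.length === 1 ? values[0] : reduce(key, values);
    out.push({ _id: key, value: value });
  }
  return out;
}

const tagCounts = runMapReduce(posts, mapTags, reduceCounts);
console.log(tagCounts);
```

For the two sample documents, "tag2" and "tag4" end up with a count of 2 and every other tag with a count of 1.
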
To understand how it works, let's say that after some iterations map emits the pair { "tag1", 1 }. Suppose that at that point "tag1" already has a count of 50. That is, the intermediate result can be represented as:

{ "tag1", 50 }

If map again emits { "tag1", 1 }, reduce will be called as follows:

reduce( "tag1", [50,1] )

The result is simply the combined count for tag1:

{ "tag1", 51 }
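
The step above is why the reducer has to cope with its own earlier output appearing in the values array. The following sketch checks that property in plain JavaScript. The names sumCounts and countValues are made up for illustration, and countValues is a deliberately wrong reducer shown only for contrast.

```javascript
// Summing reducer, equivalent to the reduce function in this post.
function sumCounts(key, values) {
  let count = 0;
  for (const v of values) count += v;
  return count;
}

// A reducer that looks plausible but ignores earlier partial results.
function countValues(key, values) {
  return values.length;
}

// The step described above: a partial count of 50 plus one new emit.
const step = sumCounts("tag1", [50, 1]);
const brokenStep = countValues("tag1", [50, 1]);
console.log(step);       // 51
console.log(brokenStep); // 2, the earlier count of 50 is lost

// Reducing in two steps must give the same answer as reducing everything at once.
const ones = [1, 1, 1, 1, 1];
const allAtOnce = sumCounts("tag1", ones);
const inTwoSteps = sumCounts("tag1", [sumCounts("tag1", ones.slice(0, 3)), ...ones.slice(3)]);
console.log(allAtOnce, inTwoSteps); // 5 5
```

Counting the length of the array only works if reduce is called exactly once per key, which MongoDB does not guarantee.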

To invoke map-reduce, run the following command. It runs mapreduce on the "posts" collection with "map" as the map function and "reduce" as the reduce function, and writes the output to a collection named "tags".

mongos> result =  db.runCommand( {
... "mapreduce" : "posts",
... "map" : map, //name of map function
... "reduce" : reduce,  //name of reduce function
... "out" : "tags" } )
{
        "result" : "tags",
        "shardCounts" : {
                "192.168.1.241:10000" : {
                        "input" : 109889,
                        "emit" : 499098,
                        "reduce" : 6400,
                        "output" : 43
                },
                "192.168.1.243:10000" : {
                        "input" : 90111,
                        "emit" : 200395,
                        "reduce" : 3094,
                        "output" : 43
                }
        },
        "counts" : {
                "emit" : NumberLong(699493),
                "input" : NumberLong(200000),
                "output" : NumberLong(43),
                "reduce" : NumberLong(9494)
        },
        "ok" : 1,
        "timeMillis" : 9199,
        "timing" : {
                "shards" : 9171,
                "final" : 28
        }
}
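
In the result above, each shard reports 43 output documents and the combined result also has 43, so both shards produced partial counts for the same set of keys and those partial counts had to be combined in the "final" step. The sketch below shows one way such a merge could work. It is an assumption about the general shape of that step, not a description of MongoDB's internals; finalReduce is a hypothetical helper and the per-shard numbers are made up.

```javascript
// Hypothetical per-shard partial results (made-up numbers, not from the run above).
const partials = {
  shard241: [{ _id: "tag1", value: 3 }, { _id: "tag2", value: 5 }],
  shard243: [{ _id: "tag1", value: 4 }, { _id: "tag3", value: 2 }],
};

// Same summing reducer as used on the shards.
function reduceCounts(key, values) {
  let count = 0;
  for (const v of values) count += v;
  return count;
}

// Group the partial values by key across shards, then reduce each group once more.
function finalReduce(shardOutputs, reduce) {
  const grouped = new Map(); // key -> partial values from every shard
  for (const docs of Object.values(shardOutputs)) {
    for (const { _id, value } of docs) {
      if (!grouped.has(_id)) grouped.set(_id, []);
      grouped.get(_id).push(value);
    }
  }
  return [...grouped].map(([key, values]) => ({
    _id: key,
    value: values.length === 1 ? values[0] : reduce(key, values),
  }));
}

const merged = finalReduce(partials, reduceCounts);
console.log(merged);
```

Here "tag1" is present on both shards, so its partial counts 3 and 4 are reduced again to 7, while "tag2" and "tag3" pass through unchanged.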

Now look at the output documents. The output collection contains one document per distinct tag value emitted by map, in our case 43.

mongos> db.tags.find()
{ "_id" : "tag1", "value" : 14643 }
{ "_id" : "tag2", "value" : 14705 }
{ "_id" : "tag3", "value" : 14418 }
{ "_id" : "tag4", "value" : 14577 }
{ "_id" : "tag5", "value" : 14642 }
{ "_id" : "tag6", "value" : 14505 }
{ "_id" : "tag7", "value" : 14623 }
{ "_id" : "tag8", "value" : 14529 }
{ "_id" : "tag9", "value" : 14767 }
{ "_id" : "tag10", "value" : 14489 }
has more
 

mongos> db.tags.count()
43


References

http://cookbook.mongodb.org/patterns/count_tags/
http://www.mongodb.org/display/DOCS/MapReduce/

1 comment:

Unknown said...

Good overview of map-reduce using mongodb, Jayant!