elasticsearch入门|Elasticsearch与MongoDB 数据同步及分布式集群搭建

更新时间:2020-02-27    来源:MongoDB    手机版     字体:

【www.bbyears.com--MongoDB】

过River可以与多种数据源Wikipedia, MongoDB, CouchDB, RabbitMQ, RSS, Sofa, JDBC, FileSystem,Dropbox等同步,公司的业务是用 MongoDB,今天测试环境虚拟机上配置了一下Elasticsearch 与 MongoDB的同步,作个大概的过程记录,主要利用richardwilly98 / elasticsearch-river-mongodb。
River通过读取mongodb的oplog来同步数据,oplog这个表来使集群中的不同机器数据同步的,可以保证es里面的数据和mongodb里面的是一样的,所以Mongdb必须是个集群才能拥有oplog.注意:该插件只支持集群环境下的mongodb,因为集群环境下的mongodb才有oplog这个
Elasticsearch 和 MongoDB需要安装对应的版本才能实现同步,我这里用了最新的Elasticsearch 1.4.2 和 MongoDB 3.0.0,相应的版本要求参考下表

Elasticsearch与MongoDB 数据同步及分布式集群搭建

 
MongDB是一个副本集的集群,具体副本集集群的搭建不详细写了,Elasticsearch的安装配置也省略。
1.安装elasticsearch-river-mongodb
# ./elasticsearch-1.4.4/bin/plugin -install elasticsearch/elasticsearch-mapper-attachments/2.4.1

# ./elasticsearch-1.4.4/bin/plugin -i com.github.richardwilly98.elasticsearch/elasticsearch-river-mongodb/2.0.5
 
2.建立river
curl -XPUT "http://10.253.1.70:9200/_river/threads_mongo_river/_meta" -d'

{
  "type": "mongodb",
  "mongodb": {
    "servers":
    [
      { "host": "10.253.1.71", "port": 27017}
    ],
    "db": "threads",
    "collection": "threads",
    "gridfs": false
  },
  "index": {
    "name": "test",
    "type": "threads"
  }
}'
这里只是简单的配置了建立连接的MongoDB以及相应的db,collection对于的elasticsearch index和type,还有详细的配置没有使用到,比如options等,具体根据业务可以配置,下面是一份详细的配置样例:
$ curl -XPUT "localhost:9200/_river/${es.river.name}/_meta" -d '
{
  "type": "mongodb",
  "mongodb": {
    "servers":
    [
      { "host": ${mongo.instance1.host}, "port": ${mongo.instance1.port} },
      { "host": ${mongo.instance2.host}, "port": ${mongo.instance2.port} }
    ],
    "options": {
      "secondary_read_preference" : true,
      "drop_collection": ${mongo.drop.collection},
      "exclude_fields": ${mongo.exclude.fields},
      "include_fields": ${mongo.include.fields},
      "include_collection": ${mongo.include.collection},
      "import_all_collections": ${mongo.import.all.collections},
      "initial_timestamp": {
        "script_type": ${mongo.initial.timestamp.script.type},
        "script": ${mongo.initial.timestamp.script}
      },
      "skip_initial_import" : ${mongo.skip.initial.import},
      "store_statistics" : ${mongo.store.statistics},
    },
    "credentials":
    [
      { "db": "local", "user": ${mongo.local.user}, "password": ${mongo.local.password} },
      { "db": "admin", "user": ${mongo.db.user}, "password": ${mongo.db.password} }
    ],
    "db": ${mongo.db.name},
    "collection": ${mongo.collection.name},
    "gridfs": ${mongo.is.gridfs.collection},
    "filter": ${mongo.filter}
  },
  "index": {
    "name": ${es.index.name},
    "throttle_size": ${es.throttle.size},
    "bulk_size": ${es.bulk.size},
    "type": ${es.type.name}
    "bulk": {
      "actions": ${es.bulk.actions},
      "size": ${es.bulk.size},
      "concurrent_requests": ${es.bulk.concurrent.requests},
      "flush_interval": ${es.bulk.flush.interval}
    }
  }
}'
一些配置项的解释如下,具体可以查看github的wiki:
db为同步的数据库名,
host mongodb的ip地址(默认为localhost)
port mongodb的端口
collection 要同步的表名
fields 要同步的字段名(用逗号隔开,默认全部)
gridfs 是否是gridfs文件(如果collection是gridfs的话就设置成true)
local_db_user local数据库的用户名(没有的话不用写)
local_db_password local数据库的密码(没有的话不用写)
db_user 要同步的数据库的密码(没有的话不用写)
db_password 要同步的数据库的密码(没有的话不用写)
name 索引名(不能之前存在)
type 类型
bulk_size 批量添加的最大数
bulk_timeout 批量添加的超时时间
3.测试是否成功
我测试的库中数据较少,所以就直接都查出来看看是否能查出来吧
$ curl -XGET "http://10.253.1.70:9200/test/threads/_search" -d '

{
    "took": 20,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "failed": 0
    },
    "hits": {
        "total": 4,
        "max_score": 1,
        "hits": [
            {
                "_index": "test",
                "_type": "threads",
                "_id": "54fa32b22c44cf67cb6a9d1b",
                "_score": 1,
                "_source": {
                    "_id": "54fa32b22c44cf67cb6a9d1b",
                    "title": "where is my car",
                    "content": "ask yourself"
                }
            },
            {
                "_index": "test",
                "_type": "threads",
                "_id": "54fa2f5c2c44cf67cb6a9d19",
                "_score": 1,
                "_source": {
                    "_id": "54fa2f5c2c44cf67cb6a9d19",
                    "title": "this is title",
                    "content": "what is the fuck"
                }
            },
            {
                "_index": "test",
                "_type": "threads",
                "_id": "54fa2f892c44cf67cb6a9d1a",
                "_score": 1,
                "_source": {
                    "_id": "54fa2f892c44cf67cb6a9d1a",
                    "title": "are you ok",
                    "content": "yes,i am ok"
                }
            },
            {
                "_index": "test",
                "_type": "threads",
                "_id": "54fa49ccc104e2264e02deea",
                "_score": 1,
                "_source": {
                    "_id": "54fa49ccc104e2264e02deea",
                    "title": "hello word",
                    "content": "hello hello haha"
                }
            }
        ]
    }
}

看来数据已经同步过来了,然后在MongoDB 添加一条记录,执行同样的操作查找对于的记录或者 total已经+1那么同步完成了。

我们已经配置了一个Elasticsearch与MongoDB 数据同步高可用,可扩展以及分布式是ES的一个优势和特色,扩展垂直扩展或者向上扩展,Vertical Scale/Scaling Up,或是水平扩展或者向外扩展,Horizontal Scale/Scaling Out。
一个节点会运行一个ES的实例,一个集群则会包含拥有相同cluster.name的一个或者多个节点,这些节点共同工作来完成数据共享和负载分担。随着节点被添加到集群,或者从集群中被删除,集群会通过自身调节来将数据均匀分布。集群中的一个节点会被选为主节点(Master Node),它负责管理整个集群的变化,如创建或者删除一个索引(Index),向集群中添加或者删除节点。任何节点都可以成为主节点。在我们的例子中只有一个节点,所以它就承担了主节点的功能。ES通过分片将数据分布在集群中。可以将分片想象成数据的容器。文档会被存储在分片中,而分片则会被分配到集群中的节点中。随着集群的扩大和虽小,ES会自动地将分片在节点之间进行迁移,以保证集群能够保持一种平衡。一个分片可以是主分片(Primary Shard)或者副本分片(Replica Shard)。索引中的每份文档都属于一个主分片,所以主分片的数量就决定了你的索引能够存储的最大数据量。一个副本分片则只是一个主分片的拷贝。副本用来提供数据冗余,用来保护数据在发生硬件故障是不会丢失,同时也能够处理像搜索和获取文档这样的读请求。主分片的数量在索引建立之初就会被确定下来,而副本分片的数量则可以在任何时候被更改。
 
具体原理可以参考官方文档:《life inside a cluster》

演示水平扩展,这里新添加一个ES实例的虚拟机,这样我们之前的ES实例为:10.253.1.70,现在添加一个新的节点:10.253.1.71.,需要保证这两个节点之间是可以互相通信的.

配置config/elasticsearch.yml
10.253.1.70相关配置为:
cluster.name: elasticsearch_ryan
node.name: "cluster-node-1"
10.253.1.71相关配置为:
cluster.name: elasticsearch_ryan
node.name: "cluster-node-1"
其实就是要保证有共同的cluster.name
启动10.253.1.71的ES服务,然后可以查看下节点集群的状态:

curl -XPOST "http://10.253.1.70:9200/_cluster/health"
{
    "cluster_name": "elasticsearch_ryan",
    "status": "green",
    "timed_out": false,
    "number_of_nodes": 2,
    "number_of_data_nodes": 2,
    "active_primary_shards": 9,
    "active_shards": 18,
    "relocating_shards": 0,
    "initializing_shards": 0,
    "unassigned_shards": 0
}

可以看到现在有2个节点,status 表集群的状态,具体状态含义:
green:所有的主分片(Primary Shard)和副本分片(Replica Shard)都处于活动状态
yellow:所有的主分片都处于活动状态,但是并不是所有的副本分片都处于活跃状态
red:不是所有的主分片都处于活动状态
这里顺便推荐一个ES分布式集群管理工具 elasticsearch-head,插件方式安装就可以了
sudo elasticsearch/bin/plugin -install mobz/elasticsearch-head
安装后打开管理界面 http://10.253.1.70:9200/_plugin/head/

 

8.jpg

 
可以看到分布式集群中节点的详细信息,还可以执行索引的信息和查询的功能,很方便,集群的状态也很直观。可以往mongo里面继续添加一些数据来测试下

本文来源:http://www.bbyears.com/shujuku/85519.html

热门标签

更多>>

本类排行