Skip to content
muyannian edited this page Feb 25, 2014 · 33 revisions

大量的zk报错

  1. 请使用zookeeper-3.4.5.jar以及以上版本
  2. 如下几个配置请根据zk的负载情况配置
    tickTime=2000 
    initLimit=10
    syncLimit=5
    maxClientCnxns=300
    maxSessionTimeout=20000

大分组的计算和count(distinct)的讨论
    根据我们在电信行业的实践,经分系统中真正决定性能的都是一些大分组查询,无法通过数据预分布进行优化。现有的MPP架构对网络带宽要求确实比较高,否则节点的处理能力很难发挥出来。总体来看,开源软件在即席统计方面的性能水平,和商业软件差距极大。数据量一大,查询一复杂,基本上就跑不出来了。
    延年:大分组查询:我能不能这样理解,就是分类汇总后的组数特别多(例如uuid),这种场景会导致大量的网络IO。 针对这种情况我们采用近似计算,来解决这个IO问题,由于没整理文档,可以阅读我的开发日志。https://github.com/muyannian/higo/issues/117 与 https://github.com/muyannian/higo/issues/119
    神通冯柯:回复@延年:嗯,我觉得如果分组足够大,比如求 TOP N,我们可以在每个节点求TOP 10*N,再汇总,从概率的角度上看,对于许多应用都足够了。当然在商业应用中,很多时候我们并没有TOP N这个条件,应用就是要返回所有分组,比较常见的是OLAP建模,用户需要把分组统计结果导入到一个Cube中。
    延年:回复@神通冯柯:恩,mdrill在这个地方确实采用了近似计算,用以满足我们的业务,count(distinct)的计算也是类似,供大家参考,https://github.com/muyannian/higo/wiki/distinct

  神通冯柯:回复@延年:嗯,很不错,尽管我个人觉得,如果group key和dist key之间存在data skew的时候,准确度恐怕很难达到你宣称的99%。不过,能满足业务需求的设计就是好设计。

数据在内存中是如何存储的

1,mdrill是先进行分shard的,每个shard分布在不同的机器的不同的进程中
2, 每个shard是按照时间进行分区的,每个分区是一个索引
3,每个索引是按照列进行存储的,每次缓存的时候加载一个列的值到内存中,多个分区,多个表,多个列之间LRU方式淘汰
4.加载到内存中的列,并非是原始值,而是一个值的整形的代号,比方说用一个数字899代替一个很长的字符串

对内存结构的解释

  1. 将lucene默认的将整个列数据全部都load到内存中的方式修改为load 每个列的值的编码代号,操作的时候也仅仅操作这些代号,真正展现的时候再将这些编号转换成真实的值,编号的数据类型根据某个列的值的重复程度可以为byte,short,int
  2. 将数据进行分区(默认按照时间),用到的数据会加载到分区中,不用的分区会从内存中踢出,采用LRU的方式管理,如果同时需要检索大量的分区,则进行排队处理,一个分区一个分区的处理。
  3. 多个表之间也合并,共享内存,用到的表才会加载到内存中,没用到的则在硬盘中存储。
  4. 原先merger server与shard是在同一个进程中的,每次查询的时候随机使用其中一个shard作为merger server,如果每次查询merger server使用1G的内存,但shard的数量非常多,merger server每次只用一个,但是为每个shard都额外分配1G内存就是浪费,新版mdrill将这两者分开,避免浪费。
  5. 按照内存大小进行LRU,而不是按照field的个数,不同列因为重复读不同对内存的消耗也不一样,按照个数lru不合理,按照总内存使用LRU
  6. 由于每次逆旋都需要消耗时间,当lur被淘汰的时候,将逆旋的结果保留到硬盘中,以备下次使用。

mdrill依赖于一个jstorm的系统,我不是很理解mdrill的系统架构中是怎么使用jstorm这样的流式计算的?
mdrill使用jstorm做任务的监控和管理,如果某个任务挂了 nimbus会通过心跳的方式识别迁移该任务。并没有使用jstorm的流计算

如果我要部署一个mdrill的集群,zmq是必须在每个节点上面都安装部署么?
zeromq每个节点必须安装是因为,jstorm 依赖zeromq

./bluewhale mdrill index 创建索引的时候,{hdfs源数据地址}有没有什么格式上的要求,另外,{清洗多少天的数据}这个参数的含义具体是什么样子的?
1.hdfs源数据地址 必须有啊 安装文档有说明 ,数据要按照日期分区 ,并且要有 thedate字段 。
2.清洗多少天数据是因为这样,有很多系统只要求保存最近1年或一个月的数据,超过1年或一个月希望能踢出之前旧的数据
就是过期的数据希望能清理掉
3.数据是按照一定分隔符分割开的 ,默认文件格式为sequencefile格式(云梯标准格式),当然也可以设定为文本的格式

另外就是mdrill什么时候会将hdfs上面的index文件download到本地?我看文档里面没有涉及到这个步骤,是不是在“启动表”的时候?
启动的时候会download,然后 没间隔5分钟,会检查一下 校验文件vertify,如果那个分区的vertify不一致 ,那个索引重新下载
创建完的索引 目录有一个vertify文件
所以 这边索引创建完成后 最坏的情况是5分钟后才开始下载

请问下hadoop版本问题,我昨天试了hadoop 2.05.-alpha这个版本,但是在createtable的时候总是出现目录无法创建的问题,我想请问下用哪个版本的hadoop比较稳定,现在hadoop0.20.x的在官网上都没有了,选用hadoop1.1.X的行不?
亲 如果要测试 建议用hadoop 0.20.2版本,因为我用的也是这个版本,哈哈。

如果生产环境用的是其他版本,请更改pom.xml,将hadoop 版本做响应更改,更改成兼容版本

另外hadoop 2.05-alpha版本 我不建议使用,目前还不能作为生产环境使用,一般都是供hadoop爱好者 研究源码用的


如何编译源码
采用maven编译,编译命令为
mvn clean && mvn package assembly:assembly

创建索引时候我的分隔符为\001或\t如何设定啊
目前可以通过 设置为default和tab来表示\001和\t
示例如下
./bluewhale higo index fact_seller_all_d /data/fact_seller_all_d 1000 20010101 seq default
./bluewhale higo index fact_seller_all_d /data/fact_seller_all_d 1000 20010101 seq tab

执行创建索引的命令后无任何反应
./bluewhale higo index fact_seller_all_d /data/fact_seller_all_d 10 20010101 seq default
1、先确定原始数据目录是否存在,是否是按照日期分区
2.这里的10这个参数,表示只对最近10天的索引进行创建,确保存在最近10天的分区目录,以及最近10天的分区目录要有数据

higo.shards.count+higo.mergeServer.count怎么理解
higo.shards.count+higo.mergeServer.count =启动的总的进程数 higo.shards.count:表示启动的shard数量,一个shard就是一个存储索引的solr higo.mergeServer.count:表示启动的merger server的数量,一个merger server也是一个solr,但是他没有索引,仅仅用于合并shard的数据
报错:出现-data-hs_err_pid13033.log文件或者日志中提升下述信息 VirtualPortDispatch [INFO] Received invalid message directed at port 516. Dropping...
1.清理 storm.local.dir
2.确保 storm.local.dir有读写权限,硬盘可用
3.确保安装正确的zeroMq和jzmq版本,如果是64位系统,可以使用我传的这个https://github.com/alibaba/mdrill/tree/master/software
配置文件中的 worker.childopts与 higo.merge.ports有什么区别
worker.childopts用来标记 这台机器可以启动那些端口
higo.merge.ports用来标记 这些端口中那些是merger server的端口

那我这样配置行么?

 supervisor.slots.ports:
    - 6701
    - 6702
    - 6703
    - 6704
    - 6705
    - 6706
    - 6707
    - 6708
    - 6601
    - 6602
    - 6603
    - 6604
 higo.merge.ports: "6601,6602,6603,6604"

这样是不是指定了8个shard端口和4个mage端口?
答,一般一台机器启动一个merger server就够了 ,没必要启动那么多
就一个6601 吧 merger server 不需要那么多的
merger server只管理合并数据,shard才是资源使用大户

继续问

 supervisor.slots.ports:
    - 6701
    - 6702
    - 6601
 higo.merge.ports: "6601"

那这样就是每个节点2个shard,1个mege?
继续答
这个要 跟higo.shards.count+higo.mergeServer.count 一同工作 ,有可能你分配了 很多端口 但是 higo.shards.count+higo.mergeServer.count你就设置了3个,那么就只使用3个
比如说 你有100台机器 ,但你只配置了3个 ,就会在这100台机器中选3个来启动,余下的都空闲

继续问
higo.shards.count 2
higo.mergeServer.count 1
这样是控制每个节点的吧


不是的
这个是控制整体的

supervisor.slots.ports:
    - 6701
    - 6702
    - 6601
 higo.merge.ports: "6601"  

这个配置表上 你一台机器上 可以分配3个进程,如果6台的话 你可以分配18个进程

higo.shards.count 2
higo.mergeServer.count 1 

表示你会在这18个进程中抽取3个进程


storm.local.dir与nimbus.host这俩配置是干啥的
storm 有一个总的调度叫nimbus类似hadoop的namenode与jobtracker 在那台机器上启动nimbus这个host就设置为那台
storm.local.dir 因为storm本身工作需要一个工作空间 ,需要分配一个目录

higo execute [create, ./creat.sql]报错,建表失败啊,可能啥问题?
典型的报错信息如下
Exception in thread "main" java.io.IOException: Call to h252020/172.16.252.20:9000 failed on local 
exception: java.io.EOFException
        at org.apache.hadoop.ipc.Client.wrapException(Client.java:775)
        at org.apache.hadoop.ipc.Client.call(Client.java:743)
        at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
        at $Proxy0.getProtocolVersion(Unknown Source)
        at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:359)
        at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:106)
        at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:207)
        at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:170)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1378)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1390)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:196)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:95)
        at com.alimama.mdrill.topology.MdrillMain.createtable(MdrillMain.java:73)
        at com.alimama.mdrill.topology.MdrillMain.main(MdrillMain.java:44)
        at com.alimama.bluewhale.core.drpc.Mdrill.main(Mdrill.java:11)
Caused by: java.io.EOFException
        at java.io.DataInputStream.readInt(DataInputStream.java:375)
        at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:501)
        at org.apache.hadoop.ipc.Client$Connection.run(Client.java:446)
答:
1.请检查 hadoop 版本是否一致,如果不一致 请改下pom.xml中的hadoop版本或者直接替换编译好的lib下的跟hadoop相关的jar包
2.请检查hadoop是否是可用的状态
3.注意下,storm.yaml里的hadoop.conf.dir参数是否配置正确

1、 不知道海狗是否有使用solrCloud?如果有的话,是否需要先起一个zk的服务器?在集群发布的时候,如何保证zk的服务器机器可以先启?如果没有使用solrCloud,那请问是如何解决solr的集群问题的?
目前还没有使用,集群是通过storm来实现的,相当于每个storm的worker进程,启动一个solr实例,storm会管理 solr的心跳,档机后的任务迁移问题

2、 不知道海狗在原来solr的基础上有哪些改进?
参考这个地址吧 https://github.com/alibaba/mdrill/blob/master/doc/improve.docx?raw=true

3、 海狗是如何使用solr的?是使用solr原生自带的jetty服务器?还是将solr嵌在应用中,对solr做了一定程度的包装?是如何进行包装的?
采用嵌入式的方式启动的
4、 海狗是否支持多core?如果支持的话,请问是如何进行管理的?
每台机器启动多个进程,每个进程都是通过storm来管理
5、 海狗是如何建立数据的?如果数据格式有所改变,请问是如何进行更新的?数据索引是如何重建的?
目前确实不能更新,数据都是T+1的,通过每天执行一遍mapreduce建立

全文检索模式如果配置

首先 配置storm.yaml
higo.mode.s_ods_log_p4pclick: "@fdt@nonzipout@" 如果数据不是特别大,一天也就几个亿 可以存储在本地,全文检索
higo.mode.ods_allpv_ad_d: "@hdfs@fdt@sigment:5@blocksize:1073741824@iosortmb:256@igDataChange@" 如果数据特别大(百亿级别) 数据存储在hdfs上进行全文检索

然后 创建表的时候 记得设置存储原始数据
./bluewhale mdrill create ./c.sql true

全文检索模式 支持如下几种数据类型 string,tlong ,tdobule,text 其中text为分词的数据类型
全文检索模式 不能进行统计 ,只能进行TOPN的匹配

文档里面你说针对超过1w组后可能出现不准确的情况,可以进行第二次查询获取准确结果


这里每太明白
能举个具体例子么
开心延年 16:29:23
以userid为例,我想查询这一年,购买商品次数最多的用户.

这些用户的交易,在mdrill里存储是分布在不同的机器里上的。

由于排序是在每个shard里进行的排序,
第一shard里排序结果是1,2,3,4,5,6,7 取三个用户为1,2,3,
第二个shard里 可能是 3,4,5,6,7,8,9,2,3
取top3为3,4,5. 汇总后结果为2,3,4
.那么对于2和4这个用户来说 信息是不完整的,仅仅提现了近似的排序关系。

故需要进行二次查询,重新计算一次2,3,4的真实汇总结果,第二次查询,屏蔽了2,3,4以外的组,所以数据迁移成本很小

其他关于近似计算的讨论,参考这里

随性不改(360352639) 16:35:20 
@开心延年 :这种分布式合并结果的准确性很依赖于数据的分布啊
随性不改(360352639) 16:35:38 
会丢弃很多的结果
小倬(1695251331) 16:35:49 
能确保2,3,4是这两个shard里的top3么
开心延年 16:46:41 
不能确保  首先就是假设   每个shard里的数据 分布规律 跟整体是一致的 ,但是确实存在排序一行的情况 (可以举例的)
开心延年 16:46:49 
不能确保  首先就是假设   每个shard里的数据 分布规律 跟整体是一致的 ,但是确实存在排序异常的情况 (可以举例的)
开心延年 16:47:08 
小倬(1695251331)  16:35:49
能确保2,3,4是这两个shard里的top3么

所以是近似计算  
开心延年 16:50:54 
在海量数据下  数据量越多  每个shard的分布规律与整体越像  
随性不改(360352639) 16:58:23 
group by的key分布越均匀结果越准确吧
QQ2013(835552872) 17:03:20 
如果超过1W组像select avg(score) as a from test group by class order by a desc这种查询岂不是不能用了啊?
开心延年 17:06:56 
为什么不能用了?  avg中计算的数据是准确的,有可能是排序是不准确的 
我用了很多hive表做了测试 现实还没发现排序不准确的情况,虽然理论上是有的
小倬(1695251331) 17:08:54 
现在各自的shard计算avg,再汇总合并?
开心延年 17:10:16 
对于avg来说 计算的是 count+sum 
开心延年 17:10:47 
avg是无法累加的 ,但是count和sum是可以分别累加的 
随性不改(360352639) 17:13:03 
@开心延年 :每个shard返回的avg时只计算了该shard的key对应的record,汇总合并的时候每个shard中也有该key对应的record,有可能会某个shard该key对应的record并没有被返回合并,是否可以认为count和sum等都是不正确的
开心延年 17:14:02 
不是说了么? 一共分为两次查询
第一次查询的结果仅仅是为了排序,真正的结果是第二次查询出来的 
随性不改(360352639) 17:14:34 
第二次会去各个shard再取所有的进行聚合?
开心延年 17:15:22 
是啊 必须的 第二次因为已经知道了结果 所以就不需要将所有的KEY 都查出来,仅仅查出对应的结果就行了 
随性不改(360352639) 17:15:39 
牛
随性不改(360352639) 17:16:16 
第一次保证rank,第二次保证right
开心延年 17:16:24 
solr 本身也是这样干的 
开心延年 17:16:41 
solr的facet 其实也是近似的结果

分隔符使用-会报错是什么原因

hadoop的ToolRunner.run(conf, index, jobValues);会吞到-这个符号,所以要转义一下
将分隔符换成  16@2d ,代替  -

全文检索模式,数据存储在hdfs上,那几个参数的解释

需要这样配置 higo.mode.ods_allpv_ad_d: "@hdfs@fdt@sigment:15@blocksize:1073741824@iosortmb:256@igDataChange@" higo.index.parallel.ods_allpv_ad_d: 40 不过强烈不建议 这样搞
mdrill(313945135) 13:20:32 呵呵好 开心延年[email protected] 13:20:48 这个表 之所以放到hdfs上 是因为每天的数据量太大了 我本地硬盘不够 迫于无奈 才 放上去的 每天100多个亿 还都是长文本 创建索引后 20多个T 我每台机器 才20个T 10天硬盘就存满了 开心延年[email protected] 13:24:39 higo.mode.ods_allpv_ad_d: "@hdfs@fdt@sigment:15@blocksize:1073741824@iosortmb:256@igDataChange@" @hdfs@ 表示使用HDFS进行检索 (数据不会COPY到本地) @fdt@ 表示只会进行全文检索 而不会进行 统计 ,全文检索不会进行排序 @sigment:15@ 如果不配置 一个shard生成1个索引,但是当数据量特别大的时候,可以生成多个索引 @blocksize:1073741824@ 使用hdfs模式 ,块的大小 要给大一些 会提升查询速度 @igDataChange@ 这个大家可以不加,表示 忽略目录时间错和文件大小的变化,意味着 之前创建过索引的日期,如果有改动了,不会重新生成索引