您的当前位置:首页正文

大数据量的处理

2020-11-09 来源:个人技术集锦

最近做的项目中涉及到大数据量的问题,具体问题是:监测数字电视的信号,对传输的码流进行指标监测,每秒监测到20000个流,每个流对应着20多个指标,每秒存储一次将这20000流存储起来,需要保存24小时的数据。 这个问题研究了好几天: 一、文件写入存储:但

最近做的项目中涉及到大数据量的问题,具体问题是:监测数字电视的信号,对传输的码流进行指标监测,每秒监测到20000个流,每个流对应着20多个指标,每秒存储一次将这20000流存储起来,需要保存24小时的数据。

这个问题研究了好几天:

一、文件写入存储:但是如果将一天的17亿条记录都写入到一个文件里,没试过,相信会很慢,而且查询的时候会更慢。如果写入到多个文件,按照流ID可以将数据拆成20000个分类,同时对20000个文件执行写入操作也不现实。

二、数据库存储:文件存储的方式pass掉了之后开始考虑数据库存储

1、首先我用的Oracle进行性能测试:

将表按照流ID进列表分区,分为20000个区,然后每个分区内存储86400条数据(也就是该流从一天的第1秒到86400秒对应的指标数据),需要有索引,主键是全局索引,其余的列我又建了4个分区索引。

第一步创建6个表空间,保证每个表空间都能拓展到32GB大小(Oracle的表空间最大能拓展到32GB)

第二步要创建这个分区表:

-- Create table
create table AAA
(
 ID number(8),
 StreamID number(8),
 StreamType number(1),
 FAvailability number(5),
 Bandwidth number(4),
 ValidBandwidth number(4),
 MDI_DF number(5),
 MDI_MLR number(5),
 Delay_Time number(5),
 IPInterval number(5),
 IPJitter number(5),
 Time date,
 MLT15 number(5),
 MLT24 number(5),
 MLS number(5),
 SliceNum number(5),
 CachedTime number(5),
 StuckTime number(5),
 GetSliceErr number(5),
 RetransmitRate number(5),
 RepeatRate number(5),
 SecondsFlag number(5)
)
partition by list(SecondsFlag) 
( 
 partition p1 values(1) tablespace tbs_haicheng 
 
); 
第三步再为t_stream表创建19999个分区:
DECLARE
parName varchar2(100);
sql_str varchar2(500);
BEGIN
 FOR I IN 2..20000 LOOP
 parName:='p'||I;
 sql_str:='ALTER TABLE aaa ADD partition'||' p'||I|| ' VALUES('||I||')';
 execute immediate sql_str;
 END LOOP;
 END; 

第四步为t_stream创建4个分区索引:
-- Create/Recreate indexes 
create index LOCAL_INDEX_REPEATRATE on AAA (REPEATRATE);
create index LOCAL_INDEX_SECONDSFLAG on AAA (SECONDSFLAG);
create index LOCAL_INDEX_STREAM on AAA (STREAMID);
create index LOCAL_INDEX_TIME on AAA (TIME);

第五步创建一个表结构与t_stream相似的表:

create table a
(
 ID number(8),
 StreamID number(8),
 StreamType number(1),
 FAvailability number(5),
 Bandwidth number(4),
 ValidBandwidth number(4),
 MDI_DF number(5),
 MDI_MLR number(5),
 Delay_Time number(5),
 IPInterval number(5),
 IPJitter number(5),
 Time date,
 MLT15 number(5),
 MLT24 number(5),
 MLS number(5),
 SliceNum number(5),
 CachedTime number(5),
 StuckTime number(5),
 GetSliceErr number(5),
 RetransmitRate number(5),
 RepeatRate number(5),
 SecondsFlag number(5)
)
partition by list (SECONDSFLAG)
(
 partition P1 values (1)
 tablespace IPVIEW1
 pctfree 10
 initrans 1
 maxtrans 255
 storage
 (
 initial 64K
 minextents 1
 maxextents unlimited
 )
);
alter table AAA
 add constraint ID primary key (ID)
 using index 
 tablespace TBS_HAICHENG
 pctfree 10
 initrans 2
 maxtrans 255
 storage
 (
 initial 64K
 minextents 1
 maxextents unlimited
 );

第六步向表A中插入86400条数据:
declare
begin
 for i in 1..86400 loop
 insert into a
 (id, streamid, streamtype, favailability, bandwidth, validbandwidth, mdi_df, mdi_mlr, delay_time, ipinterval, ipjitter, time, mlt15, mlt24, mls, slicenum, cachedtime, stucktime, getsliceerr, retransmitrate, repeatrate)
values
 (seq_aaa.nextval, 111, 1, 1111, 1111, 1111, 1111, 1111, 1111, 1111, 1111, SYSDATE, 1111, 1111, 1111, 1111, 1111, 1111, 1111, 1111, 1111); 
 end loop;
 end ;

第七步:向t_stream表中copy数据
declare
begin
 FOR I IN 1..20000 LOOP
 insert into aaa
 select seq_aaa.nextval, streamid, streamtype, favailability, bandwidth, validbandwidth, mdi_df, mdi_mlr, delay_time, ipinterval, ipjitter, time, mlt15, mlt24, mls, slicenum, cachedtime, stucktime, getsliceerr, retransmitrate, repeatrate,I from a;
 commit;
 END LOOP;
 end;

注意:实际上,这一部分我是将1-20000分成20份 ,开了20个线程同时执行,每个线程负责向1000个分区中copy数据(向每个分区录入86400条),这时候明白我为什么要创建表A了吧!

然后,就不管他了,玩游戏看电影去了,两天假结束,想起来去看了一眼插入到什么程度了,发现磁盘有的线程还在执行,有的线程由于表空间写满到32Gb无法再拓展而终止了。

看了一下序列已经被调用到6亿多,说明插入进去了6亿多条是数据。

首先是数据占用的空间问题,与估算的相差太多,我开始插入了上百万的数据,通过查看这上百万数据占用的空间估算出17亿数据占用的空间在180G左右,,而我准备出将近200G的磁盘空间以为足够了呢,结果差了这么多,分析下原因,最主要的一点是索引占用的空间:

我原来在预估的时候忘记了为表创建索引,以为没什么大影响,有10G空间足够索引占用了,可是事实大错特错了,通过下面的语句查看了下空间的占用情况:

1、表占用空间(0.008G 这是A表里的86400条数据占用的空间)
select segment_name, sum(bytes)/1024/1024/1024 GB from user_segments where segment_type='TABLE' group by segment_name;
2、索引占用空间(17.24GB)
select segment_name ,sum(bytes)/1024/1024/1024 GB from user_segments where segment_type IN('INDEX PARTITION','INDEX') group by segment_name;
3、分区表TABLE PARTITION占用空间(63.5GB)
select segment_name,sum(bytes)/1024/1024/1024 GB from user_segments where segment_type='TABLE PARTITION' group by segment_name;
结果分别如下:

\

\

\

注:第三个图中的SEGMENT_NAME的值为T_STREAM 是上文创建的那个分区表。

我们看到结果发现,实际上表数据占用的空间是64GB,跟原来估算的几乎一致,多出来的部分是被索引占了,总共占用了将近100GB的空间,吓死哥了尴尬

缘何索引占用了这么多的空间?可能是我创建索引的方式不对?后续研究补充!

我们的程序采用的策略是首先将17亿条记录手动录入到数据库中,然后当监测到流指标时候对响应的数据进行update操作,也就是一般每秒执行20000个update语句,测试下性能:

declare
j number ;
begin
 for i in 2000000..2020000 loop
update t_stream
 set 
 streamid = 2,
 streamtype = 2,
 favailability = 2,
 bandwidth = 2,
 validbandwidth = 2,
 mdi_df = 2,
 mdi_mlr = 2,
 delay_time = 2,
 ipinterval = 2,
 ipjitter = 2,
 time = sysdate,
 mlt15 = 2,
 mlt24 = 2,
 mls = 2,
 slicenum = 2,
 cachedtime = 2,
 stucktime = 2,
 getsliceerr = 2,
 retransmitrate = 2,
 repeatrate = 2
 where id = i ;
 end loop;
 end ;

这种单纯以主键进行修改的时候他要进行全表扫描(所有的分区需要扫描到),效率很低,大约70s执行完,这才只是6亿数据。

所以我们要让他在执行update语句的时候尽量扫描单个分区,也就是说把那个分区字段当参数传递过来,如下语句所示:

declare
j number ;
begin
 j:=1;
 for i in 2000000..2020000 loop
update aaa
 set 
 streamid = 2,
 streamtype = 2,
 favailability = 2,
 bandwidth = 2,
 validbandwidth = 2,
 mdi_df = 2,
 mdi_mlr = 2,
 delay_time = 2,
 ipinterval = 2,
 ipjitter = 2,
 time = sysdate,
 mlt15 = 2,
 mlt24 = 2,
 mls = 2,
 slicenum = 2,
 cachedtime = 2,
 stucktime = 2,
 getsliceerr = 2,
 retransmitrate = 2,
 repeatrate = 2
 where id = i ;
 j:=j+1;
 end loop;
 end ;

测试这个代码块执行时间为3s,而且虽然现在是6亿数据,但是就是17亿数据执行时间也差不多是3s的,因为它扫描的永远只是20000个分区。而且我的电脑才四核处理器,服务器上24核呢。执行的肯定会比我电脑快多了吧,所以实现预定需求不成问题。

2、后来由于Oracle是收费的,不让用了,汗一个,接下来研究Mysql。

Mysql在建表以及分区的时候遇到两个问题:

问题一:建分区的时候总提示语法错误,无论怎么改都不让我创建分区,Mysql这么火的数据库不可能不支持分区啊。后来一查才知道Mysq5.0版本不支持分区,是从5.1才开始支持表的分区的尴尬,于是把我的数据库版本更换成5.5的,分区成功创建。

问题二:在Mysql上建20000个分区的过程中发现每次执行到中途就报错停止了,查询了解到Mysql的表分区数量是有限制的,每个表最多能有1024个分区。

这对我们影响不太大,大不了我就建1000个分区,每个分区存放86400*20条数据,相信每个分区百万条数据不算什么。

3、首先sqlite数据库不支持分区只好建立20000个表,由于sqlite不支持存储过程,我也没找到sqlite怎样写循环语句。但是建立20000个表 和 录入那么多的数据我们不可能一条一条的去执行写语句执行,所以需要另想办法,我的解决过程:

首先我想到可以用调用批处理文件的方式插入数据和建表:

建一个 批量建表.bat文件,文件内容如下:

@ECHO OFF 
For /L %%i in (1,1,20000) do (sqlite3.exe hc.db

createTable.bat 内容如下:

create table 1%(ID integer primary key autoincrement,
 STREAMID NUMBER(10),
 STREAMTYPE NUMBER(1),
 FAVAILABILITY NUMBER(5),
 BANDWIDTH NUMBER(4),
 VALIDBANDWIDTH NUMBER(4),
 MDIDF NUMBER(5),
 MDIMLR NUMBER(5),
 DELAY_TIME NUMBER(5),
 IPINTERVAL NUMBER(5),
 IPJITTER NUMBER(5),
 TIME DATE,
 MLT15 NUMBER(5),
 MLT24 NUMBER(5),
 MLS NUMBER(5),
 SLICENUM NUMBER(5),
 CACHEDTIME NUMBER(5),
 STUCKTIME NUMBER(5),
 GETSLICEERR NUMBER(5),
 RETRANSMITRATE NUMBER(5),
 REPEATRATE NUMBER(5),
 SECONDSFLAG NUMBER(5),
 PART NUMBER(5)
);

问题出现了,在执行批量建表.bat的时候提示sqlite语法错误。至今也没找到原因:

问题肯定是出现在传递的动态参数上,createTable.bat成功的接到了参数,语句在sqlite中执行不报错,放在bat里就报错。 所以第一次批量建表没成功。

那就用咱们的老本行,写JAVA程序:

需要一个驱动包:sqlitejdbc-v033-nested.jar。

代码如下:

import java.sql.*;
import org.sqlite.JDBC;
/**
 * sqlite创建数据库以及批量建表
 * @time 2014-01-07
 * @author HaiCheng
 *
 */
public class createTable {
	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
	try{
	//1,保证SQLite数据库文件的路径首字符为小写,否则报错
	String thisPath = "e:/haicheng.db";
	String sql = "jdbc:sqlite://"+thisPath;//windows && linux都适用
	 //2,连接SQLite的JDBC
	 Class.forName("org.sqlite.JDBC"); 
	 //建立一个数据库名haicheng.db的连接,如果不存在就在当前目录下自动创建
	 Connection conn = DriverManager.getConnection(sql);
	 //3,创建表
	 Statement stat = conn.createStatement();
	 for(int i=1 ;i<=20000;i++){
	 String sql1=" create table bbb"+i+" " +
	 	 " 	(" +
	 " ID INTEGER primary key autoincrement," +
	 " STREAMID NUMBER(10)," +
	 " STREAMTYPE NUMBER(1)," +
	 " FAVAILABILITY NUMBER(5)," +
	 " BANDWIDTH NUMBER(4)," +
	 " VALIDBANDWIDTH NUMBER(4)," +
	 " MDI_DF NUMBER(5)," +
	 " MDI_MLR NUMBER(5)," +
	 " DELAY_TIME NUMBER(5)," +
	 " IPINTERVAL NUMBER(5)," +
	 " IPJITTER NUMBER(5)," +
	 " TIME DATE," +
	 " MLT15 NUMBER(5)," +
	 " MLT24 NUMBER(5)," +
	 " MLS NUMBER(5)," +
	 " SLICENUM NUMBER(5)," +
	 " CACHEDTIME NUMBER(5)," +
	 " STUCKTIME NUMBER(5)," +
	 " GETSLICEERR NUMBER(5)," +
	 " RETRANSMITRATE NUMBER(5)," +
	 " REPEATRATE NUMBER(5)," +
	 " SECONDSFLAG NUMBER(5)," +
	 " PART NUMBER(5)" +
	 " 	);";
	 System.out.println(sql1);
	 String sql2="CREATE INDEX index_flag"+i+" ON bbb"+i+"(SECONDSFLAG);";
	 String sql3="CREATE INDEX index_part"+i+" ON bbb"+i+"(PART);";
	 stat.executeUpdate( sql1 );
	 stat.executeUpdate( sql2 );
	 stat.executeUpdate( sql3 );
	 }
	 stat.close();
	 conn.close(); //结束数据库的连接 
	 }
	 catch( Exception e )
	 {
	 e.printStackTrace ( );
	 }
	}

}
import java.sql.*;
import org.sqlite.JDBC;
/**
 * 向第一个表中循环录入数据
 * @author HaiCheng
 *
 */
public class insertData {
	public static void main(String[] args) throws Exception {
	try{
	//1,保证SQLite数据库文件的路径首字符为小写,并且路径为unix路径
	String thisPath = "e:/haicheng.db";
	String sql = "jdbc:sqlite://"+thisPath;//windows && linux都适用
	//2,连接SQLite的JDBC
	Class.forName("org.sqlite.JDBC"); 
	//建立一个数据库名haicheng.db的连接,如果不存在就在当前目录下自动创建
	Connection conn = DriverManager.getConnection(sql);

	//4,插入一条数据
	for(int i=1;i<=86400;i++){
	 	PreparedStatement prep = conn.prepareStatement("insert into bbb1(STREAMID) values (?);");
	 prep.setInt(1, 0);
	 prep.addBatch();
	 conn.setAutoCommit(false);
	 prep.executeBatch();
	 }
	 conn.setAutoCommit(true);
	 stat.close();
	 conn.close(); //结束数据库的连接 
	 System.out.println("数据插入成功");
	 }
	 catch( Exception e )
	 {
	 System.out.println("数据插入异常");
	 e.printStackTrace ( );
	 }
	}

}
import java.sql.*;
import org.sqlite.JDBC;
/**
 * 向其余19999个表中批量拷贝数据
 * @author HaiCheng
 *
 */
public class copyData {
	public static void main(String[] args) throws Exception {
	try{
	 //1,保证SQLite数据库文件的路径首字符为小写,并且路径为unix路径
	 String thisPath = "e:/haicheng.db";
	 String sql = "jdbc:sqlite://"+thisPath;//windows && linux都适用
	 //2,连接SQLite的JDBC
	 Class.forName("org.sqlite.JDBC"); 
	 //建立一个数据库名haicheng.db的连接,如果不存在就在当前目录下自动创建
	 Connection conn = DriverManager.getConnection(sql);
	 //3,创建表
	 Statement stat = conn.createStatement();
	 for(int i=2;i<=20000;i++){
	 String sql1="insert into bbb"+i+" select * from bbb1";
	 System.out.println(sql1);
	 stat.execute(sql1);
	 }
	 stat.close();
	 conn.close(); //结束数据库的连接 
	 System.out.println("数据插入成功");
	 }
	 catch( Exception e )
	 {
	 System.out.println("数据插入异常");
	 e.printStackTrace ( );
	 }
	}

}
依次执行这三个类,当执行第三个类的时候也就是批量向数据库中录入数据的时候,当数据文件大小达到2G的临界点的时候(不同方式测试多遍都是这种情况),再继续写入数据,那么数据文件就会损坏(文件大小都变了,从2GB变成1MB了)。

分析各种原因:

(1)、正在写入数据的时候断电(排除,没有断电)

(2)、磁盘有坏道(排除,在磁盘中放些其他的文件,换一段空间存储这个数据同样到2GB崩溃)

(3)、数据文件所在磁盘空间不足(排除,硬盘空间足够、sqlite也不像Oracle那样有着表空间的概念)

最终我也没找到什么原因,发帖求助。

-------------------------------------------------------------------------------------------------------------------------

尴尬上面那些还是年前写的东西,也没有写完。最终是sqlite的问题没有解决。目前还是用着Mysql

Top