Unit 1. Preparing Data¶
Before training a machine learning model, you need to prepare the sample data that the model will be trained on.
Before You Start¶
Upload the wind power algorithm files and weather data files accumulated in the wind domain to EnOS HDFS in advance; they are used to create the data tables required for model training. In this tutorial, the HDFS path where the files are stored is hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/.
Preparing Wind Power Algorithm Data¶
Follow the steps below to map the files uploaded to EnOS HDFS into Hive partitioned tables, using the wind farm ID (masterid) as the partition key to simulate a real-world scenario.
Using the powers and speeds files, create a Hive partitioned table (enos_power_speeds), partitioned by wind farm (masterid) and type (type). Records of the powers type hold power values; records of the speeds type hold speed (wind speed) values.

```sql
create external table enos_power_speeds(utcTimeStamp timestamp, value double)
partitioned by (masterid string, type string)
row format delimited fields terminated by ','
lines terminated by '\n'
location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new'
tblproperties ("skip.header.line.count"="1");
```
Add partitions:

```sql
alter table enos_power_speeds add partition(masterid='CGNWF0046', type='powers')
  location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/CGNWF0046/powers';
alter table enos_power_speeds add partition(masterid='CGNWF0046', type='speeds')
  location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/CGNWF0046/speeds';
```
In Hive, specify the batch data processing queue name (applied for through Resource Management):

```sql
set mapreduce.job.queuename=root.test_enos_01;
```
Verify the data:

```sql
select count(1) from enos_power_speeds where masterid='CGNWF0046' and type='powers';
```
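If the count comes back as 0, the partition metadata may not match the HDFS layout. A quick check, assuming the table and partitions were created as shown above, is to list the partitions that Hive has registered:

```sql
-- List the partitions Hive knows about for the table;
-- each entry should correspond to an existing HDFS directory.
show partitions enos_power_speeds;
```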
Using the speed_to_power.csv file, create the speed_to_power table:

```sql
create external table speed_to_power(speed double, power double)
row format delimited fields terminated by ','
lines terminated by '\n'
location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/speed_to_power'
tblproperties ("skip.header.line.count"="1");
```
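To get a feel for the power curve that speed_to_power describes, you can preview a few rows. This query is only a sketch and assumes the CSV file has already been uploaded to the location above:

```sql
-- Preview the wind speed to power mapping, ordered by wind speed.
select speed, power
from speed_to_power
order by speed
limit 10;
```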
Preparing External Weather Data¶
Follow the steps below to create the external weather data tables:
Create the external_weather table, partitioned by weather data source type (EC and GFS):

```sql
create external table external_weather(utcTimeStamp timestamp, pres double, tmp double, wd double, ws double, weatherpublishstarttime timestamp)
partitioned by (type string)
row format delimited fields terminated by ','
lines terminated by '\n'
location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/112.647/21.938'
tblproperties ("skip.header.line.count"="1");
```
Add partitions:

```sql
alter table external_weather add partition(type='EC')
  location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/112.647/21.938/EC';
alter table external_weather add partition(type='GFS')
  location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/112.647/21.938/GFS';
```
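As a sanity check on the raw weather data (a sketch, assuming both partitions were added as above), you can count the records per source before aggregating:

```sql
-- Row counts per weather source; both EC and GFS should return data.
select type, count(*) as records
from external_weather
group by type;
```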
Group the weather data so that each source has at most one record per timestamp, averaging the measurements:

```sql
create table external_weather1 as
select utctimestamp,
       avg(pres) as pres,
       avg(tmp) as tmp,
       avg(wd) as wd,
       avg(ws) as ws,
       max(weatherpublishstarttime) as weatherpublishstarttime,
       type
from external_weather
group by type, utctimestamp;
```
Check that the grouping took effect:

```sql
select * from external_weather1 where utcTimeStamp='2018-07-03';
```
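If you want to confirm that duplicate forecast records were actually collapsed (a sketch, assuming the two tables above exist), compare the row counts of the raw and grouped tables:

```sql
-- The grouped table should have no more rows than the raw table,
-- since records sharing (type, utctimestamp) are averaged into one.
select 'external_weather'  as tbl, count(*) as records from external_weather
union all
select 'external_weather1' as tbl, count(*) as records from external_weather1;
```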
Integrating the Final Data¶
Create the following Hive table to integrate the data:
```sql
create table kmmlds2 as
select
  t1.utctimestamp as X_basic_time,
  hour(t1.utctimestamp) as X_basic_hour,
  current_timestamp as X_basic_forecast_time,
  (row_number() over(partition by 1) - 1) % 49 as X_basic_horizon,
  datediff(t1.utctimestamp, '2018-07-01') as i_set,
  t3.weatherpublishstarttime as EC_nwp_time,
  t3.ws as EC_ws,
  t3.wd as EC_wd,
  t3.tmp as EC_tmp,
  t3.pres as EC_press,
  t3.rho as EC_rho,
  t3.dist as EC_dist,
  t4.weatherpublishstarttime as GFS_nwp_time,
  t4.ws as GFS_ws,
  t4.wd as GFS_wd,
  t4.tmp as GFS_tmp,
  t4.pres as GFS_press,
  t4.rho as GFS_rho,
  t4.dist as GFS_dist,
  t1.value as speed,
  t2.value as power
from
  (select utctimestamp, value from enos_power_speeds where masterid='CGNWF0046' and type='speeds' and value is not null) t1
left join
  (select utctimestamp, value from enos_power_speeds where masterid='CGNWF0046' and type='powers' and value is not null) t2
  on t1.utctimestamp = t2.utctimestamp
left join
  (select utctimestamp, pres, tmp, wd, ws, weatherpublishstarttime, type, pres*sqrt(2)/(287.05 * tmp+273.15) as rho, row_number() over(partition by 1) + 12 as dist from external_weather1 where type='EC') t3
  on t1.utctimestamp = t3.utctimestamp
left join
  (select utctimestamp, pres, tmp, wd, ws, weatherpublishstarttime, type, pres*sqrt(2)/(287.05 * tmp+273.15) as rho, row_number() over(partition by 1) + 12 as dist from external_weather1 where type='GFS') t4
  on t1.utctimestamp = t4.utctimestamp;
```
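Before exporting the result, it can help to confirm that kmmlds2 covers the expected time range. This check is a sketch and not part of the original flow:

```sql
-- Row count and time span of the integrated training data.
select count(*) as records,
       min(X_basic_time) as first_time,
       max(X_basic_time) as last_time
from kmmlds2;
```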
Copy the query output file of kmmlds2 to the partition directory of the kmmlds1 table:

```shell
hdfs dfs -cp /user/hive/warehouse/data_o15632609593521.db/kmmlds2/000000_0 /user/hive/warehouse/data_o15632609593521.db/kmmlds1/CGNWF0046/
```
Create the kmmlds1 external table, partitioned by masterid:

```sql
CREATE external TABLE `kmmlds1`(
  `x_basic_time` timestamp,
  `x_basic_hour` int,
  `x_basic_forecast_time` timestamp,
  `x_basic_horizon` int,
  `i_set` int,
  `ec_nwp_time` timestamp,
  `ec_ws` double,
  `ec_wd` double,
  `ec_tmp` double,
  `ec_press` double,
  `ec_rho` double,
  `ec_dist` int,
  `gfs_nwp_time` timestamp,
  `gfs_ws` double,
  `gfs_wd` double,
  `gfs_tmp` double,
  `gfs_press` double,
  `gfs_rho` double,
  `gfs_dist` int,
  `speed` double,
  `power` double)
partitioned by (masterid string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  '/user/hive/warehouse/data_o15632609593521.db/kmmlds1';
```

Add partitions for the wind farms:

```sql
alter table kmmlds1 add if not exists partition(masterid='ABCDE0001')
  location '/user/hive/warehouse/data_o15632609593521.db/kmmlds1/ABCDE0001';
alter table kmmlds1 add if not exists partition(masterid='CGNWF0046')
  location '/user/hive/warehouse/data_o15632609593521.db/kmmlds1/CGNWF0046';
```
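A quick way to confirm that the copied file is visible through the new partitions (a sketch, assuming the hdfs dfs -cp step above succeeded) is to count records per wind farm:

```sql
-- CGNWF0046 should show the rows copied from kmmlds2;
-- ABCDE0001 may be empty until its data is loaded.
select masterid, count(*) as records
from kmmlds1
group by masterid;
```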
Using External Weather Data¶
Create the kmmlds91x table:

```sql
CREATE external TABLE `kmmlds91x`(
  `sequence` int,
  `i_set` int,
  `x_basic_forecast_time` timestamp,
  `x_basic_horizon` int,
  `x_basic_time` timestamp,
  `x_basic_hour` int,
  `ec_nwp_time` timestamp,
  `ec_dist` int,
  `ec_ws` double,
  `ec_wd` double,
  `ec_rho` double,
  `ec_press` double,
  `ec_tmp` double,
  `gfs_nwp_time` timestamp,
  `gfs_dist` int,
  `gfs_ws` double,
  `gfs_wd` double,
  `gfs_rho` double,
  `gfs_press` double,
  `gfs_tmp` double
)
row format delimited fields terminated by ','
lines terminated by '\n'
location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kmmlds91x'
tblproperties ("skip.header.line.count"="1");
```
Create the kmmlds91y table:

```sql
CREATE external TABLE `kmmlds91y`(
  `sequence` int,
  `speed` double,
  `power` double
)
row format delimited fields terminated by ','
lines terminated by '\n'
location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kmmlds91y'
tblproperties ("skip.header.line.count"="1");
```
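The two tables appear to be linked by the sequence column, so a row of weather features in kmmlds91x can be matched with its measured speed and power in kmmlds91y. The join below is only a sketch of that relationship, assuming data has been loaded into both locations:

```sql
-- Join the weather features to the measured speed and power by sequence number.
select x.x_basic_time,
       x.ec_ws,
       x.gfs_ws,
       y.speed,
       y.power
from kmmlds91x x
join kmmlds91y y
  on x.`sequence` = y.`sequence`
limit 10;
```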
Creating the Status Table¶
Use the following command to create a status table that records the daily data update status:

```sql
create table status_tbl(masterid string, updatetime timestamp, flag int);
```
Insert a status flag:

```sql
insert into status_tbl values('ABCDE0001','2020-06-13',1);
```
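To look up whether a wind farm's data has been refreshed, you could query the status table as sketched below; the query itself is not part of the original steps, and the meaning of flag = 1 as "updated" is an assumption:

```sql
-- Latest update record for a wind farm; flag = 1 is assumed to mean "updated".
select masterid, updatetime, flag
from status_tbl
where masterid = 'ABCDE0001'
order by updatetime desc
limit 1;
```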
Integrated Data Structure Description¶
The data types and descriptions of the fields in the integrated data are shown in the following table:
| Field | Type | Description |
|---|---|---|
| x_basic_time | timestamp | Time field, one record per hour, labeled in UTC |
| x_basic_hour | int | Hour number, extracted from x_basic_time |
| x_basic_forecast_time | timestamp | Generally the value forecast for 00:00 of the next day |
| x_basic_horizon | int | Numeric index, 0 to 48, repeating |
| i_set | int | Numeric index corresponding to the horizon: 0, 1, 2, ... |
| ec_nwp_time | timestamp | Numerical weather prediction (NWP) time of the EC source; generally issued at 12:00 each day, forecasting the next day's weather |
| ec_ws | double | Wind speed |
| ec_wd | double | Wind direction |
| ec_tmp | double | Temperature |
| ec_press | double | Air pressure |
| ec_rho | double | Air density |
| ec_dist | int | A counter with no practical meaning |
| gfs_nwp_time | timestamp | Numerical weather prediction (NWP) time of the GFS source; generally issued at 12:00 each day, forecasting the next day's weather |
| gfs_ws | double | Wind speed |
| gfs_wd | double | Wind direction |
| gfs_tmp | double | Temperature |
| gfs_press | double | Air pressure |
| gfs_rho | double | Air density |
| gfs_dist | int | A counter with no practical meaning |
| speed | double | Measured wind speed |
| power | double | Measured power |