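Unit 1. Prepare Data

Before you train a machine learning model, prepare the sample data that the model will be trained on.

Before You Start

Upload the wind power algorithm files and the weather data files accumulated in the wind power domain to EnOS HDFS in advance. These files are used to create the data tables required for model training. In this tutorial, the files are stored under the following HDFS path: hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/

Prepare the Wind Power Algorithm Data

Follow the steps below to map the files uploaded to EnOS HDFS to Hive partitioned tables. The wind farm site ID (masterid) is used as the partition key to simulate a real-world scenario.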

  1. Use the powers and speeds files to create the Hive partitioned table enos_power_speeds, partitioned by site (masterid) and type. For the powers type, value holds the power; for the speeds type, value holds the wind speed.

    create external table enos_power_speeds(utcTimeStamp timestamp, value double)
    partitioned by (masterid string,type string)
    row format delimited fields terminated by ','
    lines terminated by '\n'
    location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new'
    tblproperties ("skip.header.line.count"="1");
    
  2. Add the partitions:

    alter table enos_power_speeds add partition(masterid='CGNWF0046',type='powers')
    location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/CGNWF0046/powers';
    
    alter table enos_power_speeds add partition(masterid='CGNWF0046',type='speeds')
    location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/CGNWF0046/speeds';
    
  3. Specify the batch processing queue name in Hive (the queue is obtained by applying through resource management):

    set mapreduce.job.queuename=root.test_enos_01;
    
  4. Verify the data:

    select count(1) from enos_power_speeds where masterid='CGNWF0046' and type='powers';
    
  5. Use the speed_to_power.csv file to create the speed_to_power table:

    create external table speed_to_power(speed double,power double)
    row format delimited fields terminated by ','
    lines terminated by '\n'
    location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/speed_to_power'
    tblproperties ("skip.header.line.count"="1");
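The partitions added in step 2 follow a fixed layout: each (masterid, type) pair maps to the subdirectory <base>/<masterid>/<type>. When more sites are added, the matching statements can be generated rather than typed by hand. The Python sketch below does this; the helper name partition_ddl is hypothetical, and it assumes every site uses the same directory layout as CGNWF0046. It emits the add if not exists form, which this tutorial also uses for kmmlds1, so the statements can be rerun safely.

```python
# Base directory from this tutorial; each partition sits in <BASE>/<masterid>/<type>.
BASE = ("hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db"
        "/kongmingmldemo/new")
TYPES = ("powers", "speeds")


def partition_ddl(masterids, base=BASE, types=TYPES):
    """Return one ALTER TABLE ... ADD PARTITION statement per (site, type) pair.

    Hypothetical helper: assumes every site follows the
    <base>/<masterid>/<type> directory layout used for CGNWF0046.
    """
    statements = []
    for masterid in masterids:
        for data_type in types:
            location = f"{base}/{masterid}/{data_type}"
            statements.append(
                "alter table enos_power_speeds add if not exists "
                f"partition(masterid='{masterid}',type='{data_type}')\n"
                f"location '{location}';"
            )
    return statements


statements = partition_ddl(["CGNWF0046"])
for stmt in statements:
    print(stmt)
```

Paste the printed statements into the Hive client.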
    

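Prepare the External Weather Data

Follow the steps below to create the external weather data tables:

  1. Create the external_weather table, partitioned by weather data source type (EC and GFS):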

    create external table external_weather(utcTimeStamp timestamp, pres double,tmp double,wd double,ws double, weatherpublishstarttime timestamp)
    partitioned by (type string)
    row format delimited fields terminated by ','
    lines terminated by '\n'
    location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/112.647/21.938'
    tblproperties ("skip.header.line.count"="1");
    
  2. Add the partitions:

    alter table external_weather add partition(type='EC')
    location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/112.647/21.938/EC';
    
    alter table external_weather add partition(type='GFS')
    location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kongmingmldemo/new/112.647/21.938/GFS';
    
  3. Aggregate the records by source type and timestamp into a new table:

    create table external_weather1 as
    select
      utctimestamp,
      avg(pres) as pres,
      avg(tmp) as tmp,
      avg(wd) as wd,
      avg(ws) as ws,
      max(weatherpublishstarttime) as weatherpublishstarttime,
      type
    from external_weather
    group by type, utctimestamp;
    
  4. Check whether the aggregation took effect:

    select * from external_weather1 where utcTimeStamp='2018-07-03';
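To make the aggregation in step 3 concrete, the Python sketch below reproduces it on in-memory records: rows are grouped by (type, utctimestamp), pres, tmp, wd, and ws are averaged, and the latest weatherpublishstarttime is kept. Like Hive's avg() and max(), the helpers skip missing (None) values. The function names and the sample records are illustrative only.

```python
from collections import defaultdict
from statistics import mean

NUMERIC_COLUMNS = ("pres", "tmp", "wd", "ws")


def _avg(values):
    """Average that skips None, like Hive's avg() skips NULL."""
    present = [v for v in values if v is not None]
    return mean(present) if present else None


def _max(values):
    """Maximum that skips None, like Hive's max() skips NULL."""
    present = [v for v in values if v is not None]
    return max(present) if present else None


def aggregate_weather(records):
    """Group weather records by (type, utctimestamp), like external_weather1."""
    groups = defaultdict(list)
    for rec in records:
        groups[(rec["type"], rec["utctimestamp"])].append(rec)

    result = []
    for (source_type, ts), rows in groups.items():
        row = {"utctimestamp": ts, "type": source_type}
        for col in NUMERIC_COLUMNS:
            row[col] = _avg(r[col] for r in rows)
        row["weatherpublishstarttime"] = _max(
            r["weatherpublishstarttime"] for r in rows)
        result.append(row)
    return result


# Usage with illustrative records: two EC rows for the same hour, one GFS row.
sample = [
    {"type": "EC", "utctimestamp": "2018-07-03 00:00:00", "pres": 1000.0,
     "tmp": 25.0, "wd": 180.0, "ws": 5.0,
     "weatherpublishstarttime": "2018-07-02 12:00:00"},
    {"type": "EC", "utctimestamp": "2018-07-03 00:00:00", "pres": 1002.0,
     "tmp": 27.0, "wd": None, "ws": 7.0,
     "weatherpublishstarttime": "2018-07-02 18:00:00"},
    {"type": "GFS", "utctimestamp": "2018-07-03 00:00:00", "pres": 999.0,
     "tmp": 24.0, "wd": 170.0, "ws": 4.0,
     "weatherpublishstarttime": "2018-07-02 12:00:00"},
]
aggregated = aggregate_weather(sample)
ec_row = [r for r in aggregated if r["type"] == "EC"][0]
print(ec_row["pres"], ec_row["ws"], ec_row["weatherpublishstarttime"])
```

Like the SQL, the sketch takes a plain arithmetic mean of wd. If wd is an angle in degrees, note that an arithmetic mean is misleading near the 0°/360° boundary (for example, 350° and 10° average to 180°).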
    

Integrate the Final Data

Create the following Hive table to integrate the data:

    create table kmmlds2 as
    select
      t1.utctimestamp as X_basic_time,
      hour(t1.utctimestamp) as X_basic_hour,
      current_timestamp as X_basic_forecast_time,
      (row_number() over(partition by 1) - 1) % 49 as X_basic_horizon,
      datediff(t1.utctimestamp, '2018-07-01') as i_set,
      t3.weatherpublishstarttime as EC_nwp_time,
      t3.ws as EC_ws,
      t3.wd as EC_wd,
      t3.tmp as EC_tmp,
      t3.pres as EC_press,
      t3.rho as EC_rho,
      t3.dist as EC_dist,
      t4.weatherpublishstarttime as GFS_nwp_time,
      t4.ws as GFS_ws,
      t4.wd as GFS_wd,
      t4.tmp as GFS_tmp,
      t4.pres as GFS_press,
      t4.rho as GFS_rho,
      t4.dist as GFS_dist,
      t1.value as speed,
      t2.value as power
    from
      (select utctimestamp, value from enos_power_speeds
       where masterid='CGNWF0046' and type='speeds' and value is not null) t1
    left join
      (select utctimestamp, value from enos_power_speeds
       where masterid='CGNWF0046' and type='powers' and value is not null) t2
      on t1.utctimestamp = t2.utctimestamp
    left join
      (select utctimestamp, pres, tmp, wd, ws, weatherpublishstarttime, type,
              pres*sqrt(2)/(287.05 * (tmp + 273.15)) as rho,
              row_number() over(partition by 1) + 12 as dist
       from external_weather1 where type='EC') t3
      on t1.utctimestamp = t3.utctimestamp
    left join
      (select utctimestamp, pres, tmp, wd, ws, weatherpublishstarttime, type,
              pres*sqrt(2)/(287.05 * (tmp + 273.15)) as rho,
              row_number() over(partition by 1) + 12 as dist
       from external_weather1 where type='GFS') t4
      on t1.utctimestamp = t4.utctimestamp;
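The derived columns in this query can be reproduced outside Hive for spot checks. The Python sketch below computes x_basic_hour, x_basic_horizon ((row number - 1) mod 49), and i_set (days since 2018-07-01). Because row_number() over(partition by 1) has no ORDER BY, Hive does not guarantee the row order, so x_basic_horizon can differ between runs; the sketch simply assumes the rows are already sorted by time. For air density it uses the standard ideal-gas formula for dry air, rho = p / (287.05 * T) with T in kelvin, assuming pressure in Pa and temperature in °C. The tutorial's SQL expression additionally multiplies by sqrt(2), which the tutorial does not explain, so the sketch leaves that factor out. The function names are hypothetical.

```python
from datetime import date, datetime, timedelta

HORIZON_CYCLE = 49               # mirrors "% 49" in the query
SET_ORIGIN = date(2018, 7, 1)    # mirrors datediff(..., '2018-07-01')
R_DRY_AIR = 287.05               # specific gas constant of dry air, J/(kg*K)


def derive_basic_columns(timestamps):
    """Compute x_basic_hour, x_basic_horizon and i_set for each timestamp.

    Assumes `timestamps` is already sorted by time; the row number is the
    1-based position in that order, like row_number() in the query.
    """
    rows = []
    for row_number, ts in enumerate(timestamps, start=1):
        rows.append({
            "x_basic_time": ts,
            "x_basic_hour": ts.hour,
            "x_basic_horizon": (row_number - 1) % HORIZON_CYCLE,
            "i_set": (ts.date() - SET_ORIGIN).days,
        })
    return rows


def air_density(pres_pa, tmp_celsius):
    """Ideal-gas density of dry air in kg/m^3: rho = p / (R * T), T in kelvin."""
    return pres_pa / (R_DRY_AIR * (tmp_celsius + 273.15))


# Usage: 50 hourly timestamps starting at 2018-07-01 00:00 (UTC).
hourly = [datetime(2018, 7, 1) + timedelta(hours=h) for h in range(50)]
rows = derive_basic_columns(hourly)
print(rows[49]["x_basic_horizon"], rows[49]["i_set"])  # 0 2
print(round(air_density(101325, 15), 3))               # 1.225
```

The 50th hourly row wraps x_basic_horizon back to 0, and at standard sea-level conditions (101325 Pa, 15 °C) the formula gives about 1.225 kg/m³.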


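Copy the data file generated for kmmlds2 into the CGNWF0046 partition directory of kmmlds1 (create the directory first if it does not exist):

    hdfs dfs -mkdir -p /user/hive/warehouse/data_o15632609593521.db/kmmlds1/CGNWF0046
    hdfs dfs -cp /user/hive/warehouse/data_o15632609593521.db/kmmlds2/000000_0 /user/hive/warehouse/data_o15632609593521.db/kmmlds1/CGNWF0046/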


Create the external table kmmlds1 on that directory, partitioned by masterid:

    CREATE external TABLE `kmmlds1`(
      `x_basic_time` timestamp,
      `x_basic_hour` int,
      `x_basic_forecast_time` timestamp,
      `x_basic_horizon` int,
      `i_set` int,
      `ec_nwp_time` timestamp,
      `ec_ws` double,
      `ec_wd` double,
      `ec_tmp` double,
      `ec_press` double,
      `ec_rho` double,
      `ec_dist` int,
      `gfs_nwp_time` timestamp,
      `gfs_ws` double,
      `gfs_wd` double,
      `gfs_tmp` double,
      `gfs_press` double,
      `gfs_rho` double,
      `gfs_dist` int,
      `speed` double,
      `power` double)
    partitioned by(masterid string)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      '/user/hive/warehouse/data_o15632609593521.db/kmmlds1';

Register a partition for each site:

    alter table kmmlds1 add if not exists partition(masterid='ABCDE0001')
    location '/user/hive/warehouse/data_o15632609593521.db/kmmlds1/ABCDE0001';

    alter table kmmlds1 add if not exists partition(masterid='CGNWF0046')
    location '/user/hive/warehouse/data_o15632609593521.db/kmmlds1/CGNWF0046';
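The copied file can be read by kmmlds1 without conversion because the two tables agree on format and column order. kmmlds2 is created with CREATE TABLE ... AS SELECT and no ROW FORMAT clause, so, assuming the cluster keeps Hive's default TextFile format, its data file uses Hive's default field delimiter \001 (Ctrl-A) and writes NULL as \N. kmmlds1 declares LazySimpleSerDe without a field delimiter, which falls back to the same default, and its columns follow the order of the kmmlds2 SELECT list. The Python sketch below parses one such line into the kmmlds1 schema, which can help when spot-checking the copied file. The function name parse_kmmlds1_line and the sample values are hypothetical.

```python
# Hive's defaults for text tables created without a ROW FORMAT clause.
FIELD_DELIM = "\x01"   # Ctrl-A field separator
HIVE_NULL = "\\N"      # how NULL is written in text files

# kmmlds1 columns in declaration order (same order as the kmmlds2 SELECT list).
KMMLDS1_COLUMNS = [
    ("x_basic_time", "timestamp"), ("x_basic_hour", "int"),
    ("x_basic_forecast_time", "timestamp"), ("x_basic_horizon", "int"),
    ("i_set", "int"),
    ("ec_nwp_time", "timestamp"), ("ec_ws", "double"), ("ec_wd", "double"),
    ("ec_tmp", "double"), ("ec_press", "double"), ("ec_rho", "double"),
    ("ec_dist", "int"),
    ("gfs_nwp_time", "timestamp"), ("gfs_ws", "double"), ("gfs_wd", "double"),
    ("gfs_tmp", "double"), ("gfs_press", "double"), ("gfs_rho", "double"),
    ("gfs_dist", "int"),
    ("speed", "double"), ("power", "double"),
]

# Timestamps are kept as the raw strings Hive wrote.
_CASTS = {"int": int, "double": float, "timestamp": str}


def parse_kmmlds1_line(line):
    """Split one line of the copied file into a dict keyed by kmmlds1 column."""
    fields = line.rstrip("\n").split(FIELD_DELIM)
    if len(fields) != len(KMMLDS1_COLUMNS):
        raise ValueError(
            f"expected {len(KMMLDS1_COLUMNS)} fields, got {len(fields)}")
    return {
        name: None if raw == HIVE_NULL else _CASTS[hive_type](raw)
        for (name, hive_type), raw in zip(KMMLDS1_COLUMNS, fields)
    }


# Usage: one hour with EC data but no GFS forecast and no power reading.
sample_fields = (
    ["2018-07-01 01:00:00", "1", "2020-06-13 10:00:00", "1", "0"]
    + ["2018-06-30 12:00:00", "5.5", "180.0", "25.0", "1000.0", "1.1", "13"]
    + [HIVE_NULL] * 7
    + ["6.2", HIVE_NULL]
)
row = parse_kmmlds1_line(FIELD_DELIM.join(sample_fields) + "\n")
print(row["ec_ws"], row["gfs_ws"], row["power"])  # 5.5 None None
```

NULLs are expected here: the query uses left joins, so hours without a power reading or without a GFS forecast leave those columns empty.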

Use External Weather Data

Create the following two external tables. kmmlds91x holds the time and weather forecast features, and kmmlds91y holds the measured wind speed and power; both tables include a sequence column.

    CREATE external TABLE `kmmlds91x`(
      `sequence` int,
      `i_set` int,
      `x_basic_forecast_time` timestamp,
      `x_basic_horizon` int,
      `x_basic_time` timestamp,
      `x_basic_hour` int,
      `ec_nwp_time` timestamp,
      `ec_dist` int,
      `ec_ws` double,
      `ec_wd` double,
      `ec_rho` double,
      `ec_press` double,
      `ec_tmp` double,
      `gfs_nwp_time` timestamp,
      `gfs_dist` int,
      `gfs_ws` double,
      `gfs_wd` double,
      `gfs_rho` double,
      `gfs_press` double,
      `gfs_tmp` double
    )
    row format delimited fields terminated by ','
    lines terminated by '\n'
    location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kmmlds91x'
    tblproperties ("skip.header.line.count"="1");

    CREATE external TABLE `kmmlds91y`(
      `sequence` int,
      `speed` double,
      `power` double
    )
    row format delimited fields terminated by ','
    lines terminated by '\n'
    location 'hdfs://azbeta/user/hive/warehouse/data_o15632609593521.db/kmmlds91y'
    tblproperties ("skip.header.line.count"="1");
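The tutorial does not describe how kmmlds91x and kmmlds91y are consumed. Because both tables carry a sequence column, one plausible reading is that a feature row and a label row with the same sequence belong together. The Python sketch below pairs them under that assumption. It mirrors the table definitions: lines end with a newline, fields are split on commas, and the first line is skipped as with skip.header.line.count=1. The names read_delimited and pair_by_sequence, and the sample rows, are hypothetical.

```python
def read_delimited(text):
    """Split text into rows the way the table definitions do.

    Lines end with a newline, fields are separated by commas, and the first
    line is skipped (skip.header.line.count=1). Blank lines are ignored here.
    """
    lines = text.split("\n")[1:]
    return [line.split(",") for line in lines if line]


def pair_by_sequence(x_text, y_text):
    """Join feature rows (kmmlds91x) with label rows (kmmlds91y) on sequence."""
    labels = {}
    for seq, speed, power in read_delimited(y_text):
        labels[int(seq)] = (float(speed), float(power))

    pairs = []
    for row in read_delimited(x_text):
        seq = int(row[0])
        if seq in labels:  # inner-join semantics: unmatched rows are dropped
            pairs.append((seq, row[1:], labels[seq]))
    return pairs


# Usage with illustrative rows (feature columns abbreviated to i_set only).
x_text = "sequence,i_set\n1,0\n2,0\n3,1\n"
y_text = "sequence,speed,power\n1,5.0,800.0\n3,7.5,1500.0\n"
pairs = pair_by_sequence(x_text, y_text)
print([seq for seq, _, _ in pairs])  # [1, 3]
```

Splitting on a bare comma matches Hive's delimited text format, which, unlike a full CSV parser, does not interpret quotes.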

Create the Status Table

  1. Run the following command to create a status table that records the daily data updates:

    create table status_tbl(masterid string, updatetime timestamp, flag int);
    
  2. Insert a status flag:

    insert into status_tbl values('ABCDE0001','2020-06-13',1);
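A downstream job would typically read status_tbl to find the most recent update for each site. The tutorial does not define the meaning of flag, so the Python sketch below assumes flag = 1 marks a day whose data is ready and returns the latest such updatetime per masterid, the same result as a Hive query of the form shown in the docstring. The function name latest_ready and the sample rows are hypothetical.

```python
from datetime import datetime


def latest_ready(status_rows, ready_flag=1):
    """Latest updatetime per masterid among rows whose flag equals ready_flag.

    Equivalent in spirit to:
      select masterid, max(updatetime) from status_tbl
      where flag = 1 group by masterid;
    Treating flag = 1 as "data ready" is an assumption.
    """
    latest = {}
    for masterid, updatetime, flag in status_rows:
        if flag != ready_flag:
            continue
        if masterid not in latest or updatetime > latest[masterid]:
            latest[masterid] = updatetime
    return latest


# Usage with illustrative rows shaped like status_tbl(masterid, updatetime, flag).
status_rows = [
    ("ABCDE0001", datetime(2020, 6, 12), 1),
    ("ABCDE0001", datetime(2020, 6, 13), 1),
    ("ABCDE0001", datetime(2020, 6, 14), 0),
    ("CGNWF0046", datetime(2020, 6, 13), 1),
]
ready = latest_ready(status_rows)
print(ready["ABCDE0001"])  # 2020-06-13 00:00:00
```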
    

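Schema of the Integrated Data

The data type and description of each field in the integrated data are listed below:

Field                   Type       Description
x_basic_time            timestamp  Time label in UTC; one record per hour
x_basic_hour            int        Hour of the day, taken from x_basic_time
x_basic_forecast_time   timestamp  Forecast time; typically the forecast targets 00:00 of the next day
x_basic_horizon         int        Numeric index that repeats from 0 to 48
i_set                   int        Numeric index corresponding to the horizon: 0, 1, 2, ... (in the kmmlds2 query, the number of days since 2018-07-01)
ec_nwp_time             timestamp  EC numerical weather prediction (NWP) time; EC usually issues the forecast for the next day at 12:00 each day
ec_ws                   double     Wind speed (EC)
ec_wd                   double     Wind direction (EC)
ec_tmp                  double     Temperature (EC)
ec_press                double     Air pressure (EC)
ec_rho                  double     Air density (EC)
ec_dist                 int        A counter with no physical meaning
gfs_nwp_time            timestamp  GFS numerical weather prediction (NWP) time; GFS usually issues the forecast for the next day at 12:00 each day
gfs_ws                  double     Wind speed (GFS)
gfs_wd                  double     Wind direction (GFS)
gfs_tmp                 double     Temperature (GFS)
gfs_press               double     Air pressure (GFS)
gfs_rho                 double     Air density (GFS)
gfs_dist                int        A counter with no physical meaning
speed                   double     Measured wind speed
power                   double     Measured power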