?
【转载】用 IBM WebSphere DataStage 进行数据整合:第 2 部分
原文地址:http://www.ibm.com/developerworks/cn/data/library/techarticles/dm-0604zhoudp/
?
在本文中,您将看到一个同时处理多个数据源和目标的 ETL Job 的开发过程,并了解 DataStage 中 Container 和 Job Sequence 的用法。
?
?
引言
系列的第一部分介绍了 DataStage 的基本功能。本文将从以下几个方面深入介绍 IBM WebSphere DataStage 在数据整合方面的强大功能。
1. Job Sequence 的用法
2. DataStage Container 的用法
3. 开发一个同时处理多个数据源和目标的 ETL Job
?
Job Sequence 的用法
在用 IBM WebSphere DataStage 进行数据整合的过程中,我们一般会开发很多个单独的 ETL Job 去完成特定的逻辑功能,这些 ETL Job 之间的运行顺序往往是有限制的,那么我们如何处理 ETL Job 之间的这种依赖关系呢?IBM WebSphere DataStage 提供了处理这种问题的方法,那就是使用 Job Sequence。使用 Job Sequence 可以方便的处理 ETL Job 之间的依赖以及运行顺序问题。下面我们通过开发一个简单的 Job Sequence 来演示 Job Sequence 的用法。
开发一个 Job Sequence 和开发一个 ETL Job 的方法是类似的,都是用 DataStage Designer 来开发。我们将要开发的这个 Job Sequence 的功能是实现两个 ETL Job 的顺序执行,并且在第一个 ETL Job 运行成功的情况下第二个 ETL Job 才开始执行。因此我们必须先准备两个 ETL Job,这两个 ETL Job 的名字分别为 Job1 和 Job2。
开发步骤
1. 打开 DataStage Designer。如下图所示,从下拉列表中选择 Job Sequence。这样就会新建一个 Job Sequence;
图 1:新建 Job Sequence
2. 新建的 Job Sequence 如下图所示。这时候这个 Job Sequence 还没有被保存。单击 DataStage Designer 的保存按钮来保存 Job Sequence;
图 2:新建的 Job Sequence
3. 弹出的保存对话框如下图所示,在 Job name 一栏中填入 SampleJobSequence。在 Category 一栏中填入 sequence,单击 OK 按钮保存 Job Sequence;
图 3:保存 Job Sequence
4. 保存好 Job Sequence 后,从左侧面板 Sequence 一栏中的下拉列表中添加两个 Job Activity 到右侧的面板中,并把这两个 Job Activity 连接起来;
图 4:添加 Job Activity
5. 双击左边的 Job Activity Stage,会弹出如下图所示的属性框。在 General 标签中,Name 一栏输入 Job1,这是这个 Job Activity Stage 的名字。然后单击标签 Job;
图 5:输入 Job Activity 的名称
6. 在标签 Job 中单击右侧的按钮来选择这个 Job Activity 连接到哪个 ETL Job 上面;
图 6:选择 ETL Job
7. 在弹出的选择 ETL Job 的对话框中选择 Job1 并单击按钮 OK;
图 7:选择 Job1
8. 在标签 Trigger 中,在 Expression Type 下拉列表中选择 OK-(Conditional)。这个选项的意思是只有在这个 Job Activity Stage 连接的 ETL Job 成功执行后,才能执行后面的 ETL Job。单击按钮 OK 完成对 Job Activity Stage Job1 的配置;
图 8:完成 Job1 的配置
9. 双击右边的 Job Activity Stage,会弹出如下图所示的属性框。在 General 标签中,Name 一栏输入 Job2,这是这个 Job Activity Stage 的名字。然后单击标签 Job;
图 9:建立 Job2
10. 在标签 Job 中,单击右侧按钮,会弹出一个选择 ETL Job 的对话框,在这个对话框中选择 ETL Job Job2。然后单击 OK 按钮;
图 10:选择 Job2
11. 这时你会注意到 ETL Job Job2 已经被添加进来。单击按钮 OK 完成对这个 Job Activity Stage 的配置;
图 11:完成 Job2 的配置
12. 配置完成后,单击图标"编译"来编译该 Job Sequence;
图12:编译 Job Sequence
13. 编译成功后,弹出的对话框中会显示如下图所示的信息。单击按钮 Close;
图 13:编译成功
14. 打开 DataStage Director,选中 SampleJobSequence,然后单击"运行"按钮运行它;
图 14:运行 SampleJobSequence
15. 运行完成后,这个 Job Sequence 的状态会变成 Finished。这说明这个 Job Sequence 中的两个 ETL Job Job1和Job2 都已经成功执行。如果这两个 ETL Job 中的任何一个执行失败的话,那么这个 Job Sequence 的状态就会变成 Aborted。
图 15:执行完毕
?
?
DataStage Container 的用法
DataStage 提供了两种类型的 Container:Local containers 和 Shared containers,下面将分别介绍这两种类型的 Container。
Local containers
1。主要用途
Local containers 主要是用来简化 ETL Job 的设计的,当你的某个 ETL Job 非常复杂的时候,这个 ETL Job 可能会包含非常多的 DataStage 的组件。把这些组件同时在一个面板上显示出来的话就会使整个 ETL Job 的逻辑显得非常混乱,这时候你就可以用一个或者多个 Locale container 把这个 ETL Job 中具有特定功能的逻辑模块(比如数据抽取模块)封装起来。这样就会使这个 ETL Job的流程非常清晰。
2。构造方法
(1) 用 DataStage Designer 打开一个创建好的 ETL Job。然后选择你要封装到 Local container 里面的组件。如下图所示:
图 16:选择组件
(2) 选择 Edit'Construct Container'Local 把选择的 DataStage 组件封装成一个 Local container。
图 17:封装组件
(3) 构建好 Local container 后,原来的 ETL Job 的布局变成如下图所示,这样整个布局看起来比之前要简化多了。你可以通过单击下面的标签 ContainerC3 来查看这个 Local container 里面的内容。
图 18:查看 container 的内容
Shared containers
1. 主要用途
Shared containers 除了具备 Local containers 的功能外,它还可以在不同的 ETL Job 之间实现共享。如果多个 ETL Job 都需要实现某个逻辑功能,那么就可以把这个逻辑功能封装成一个 Shared container。
2. 构造方法
构造 Shared containers 的方法和构造 Local containers 的方法几乎相同,只是 Shared containers 是单独保存起来的。具体构造步骤如下:
(1) 打开一个 ETL Job。
(2) 选择要封装成 Shared containers 的组件。
(3) 选择 Edit'Construct Container'Shared 把选择的 DataStage 组件封装成一个 Shared container。
(4) 保存这个Shared container。
开发一个同时处理多个数据源和目标的 ETL Job
本文的余下部分将开发一个同时处理多个数据源和目标的 ETL Job,使大家对用 IBM WebSphere DataStage 处理多数据源问题有个清楚的认识。这个 ETL Job 的功能是把 DB2 数据库 source 中的 employee 表和 laborcost 表先做 Join,然后再聚合,最后把聚合后的结果存放到两个 DB2 数据库 target1 和 target2 中的 departmentcost 表中去。
这个 ETL Job 所用到的表的结构如下:
表 1:employee 表
?
表 2:laborcost 表
?
表 3:departmentcost 表
开发步骤:
1. 用 DataStage Designer 从数据库中把上述的三张表结构导入到 DataStage Designer 中(如何从数据库中导入表结构请参照本系列第一部分)。存放的路径如下图所示:
图 19:存放路径
2. 用 DataStage Designer 新建一个 Parallel ETL Job,这个 ETL Job 的布局如下图所示:这个 ETL Job 中包含四个 DB2/UDB API Stage,一个 Join Stage,一个 Aggregator Stage 和一个 Copy Stage。DB2/UDB API Stage 负责连接数据表,Join Stage 负责两个数据表之间的连接(根据一个或者多个字段),Aggregator Stage 负责对数据记录进行聚合(作用相当于SQL中的聚合函数),Copy Stage 负责把数据分发到不同的目标表中去;
图 20:ETL Job 布局
3. 双击 DB2/UDB API Stage laborcost,会弹出如下图所示的属性框。在标签 Stage 的子标签 General 中。Server name(也就是要连接的 DB2 数据库的数据库名)一栏输入 source,然后输入用于连接这个数据库的用户名和密码。单击标签 Output;
图 21:Laborcost 的 stage 属性
4. 在 Output 标签的子标签 General 中,设置 Table names 为 laborcost,Query type 设置为 Generated SQL Query。然后单击标签 Columns;
图 22:Laborcost 的 Output 属性
5. 在标签 Output 的子标签 Column 中,单击按钮 Load 导入表结构 laborcost 到 Columns 中,这个表结构描述的是这个 DB2/UDB API Stage 要连接的数据表 laborcost 的表定义。
图 23:导入表结构
6. 单击按钮 View Data 来查看表 laborcost 中的数据,弹出的窗口入下图所示,这个表中共有 4 条数据,单击按钮 Close 关掉数据窗口;
图 24:查看数据
7. 双击 DB2/UDB API Stage employee,会弹出如下图所示的属性框,在 Server name 一栏中输入 source,然后单击标签 Output;
图 25:Employee 的 stage 属性
8. 在标签 Output 的子标签 General 中,Table names 设置为 employee,Query type 选择 Generated SQL query,然后单击标签 Columns;
图 26:Employee 的 output 属性
9. 在标签 Output 的子标签 Column 中,单击按钮 Load 导入表结构 employee 到 Columns 中,这个表结构描述的是这个 DB2/UDB API Stage 要连接的数据表 employee 的表定义。
图 27:查看表定义
10. 单击按钮 View Data 查看表 employee 中的数据,弹出的数据窗口如下图所示,employee 表格中共有三条数据;
图 28:查看表数据
11. 双击 Join Stage,弹出的属性窗口入下图所示,在标签 Stage 的子标签 Properties 中,Join Keys 一栏中选择 Key = employee,Options 一栏里面选择 Join Type = Inner,然后单击标签Output;
图 29:Join Stage
12. 在标签 Output 的子标签 Column 中,单击按钮 Load 导入表结构 departmentcost 到 Columns 中,然后单击 OK 完成对 Join Stage 的属性设置;
图 30:导入表结构
13. 双击 Aggregator Stage,会弹出如下图所示的属性窗口,在标签 Stage 的子标签 Properties 中,Grouping Keys 一栏设置为 Group = departmentid,Aggregation 一栏设置为 Aggregation Type = Caculation,Column for Caculation = laborcost,Sum Out Column = laborcost,然后单击 OK 按钮完成对 Aggregator Stage 的属性设置;
图 31:完成 Aggregator Stage 的设置
14. 双击 DB2/UDB API Stage LaborcostByDepartment1,会弹出如下图所示的属性框,在 Stage 标签的 General 子标签中,Server name 设置为 target1,然后输入用于连接这个数据库的用户名和密码,单击 Input 标签;
图 32:LaborcostByDepartment1 属性
15. 在 Input 标签的 General 子标签中,Table name 设置为 departmentcost,Update action 选择 Clear the table,then insert rows,Create table action 选择 Do not create target table。然后单击 Columns 子标签;
图 33:输入设置
16. 在标签 Input 的 Columns 子标签中,单击 Load 按钮导入表结构 departmentcost 到 Columns 中,然后单击按钮 OK 完成对 DB2/UDB API Stage LaborcostByDepartment1 属性设置;
图 34:导入表结构
17. 以和设置 DB2/UDB API Stage LaborcostByDepartment1 同样的方式设置 DB2/UDB API Stage LaborcostByDepartment2,唯一的不同是 Server name 设置为 target2,其他的属性均设置为相同的;
18. 所有的组件的属性现在都已经设置完毕,现在单击图标"编译"对刚开发好的 ETL Job 进行编译;
图 35:编译 ETL Job
19. 编译成功后,单击图标"运行"来运行 ETL Job;
图 36:运行 ETL Job
20. 等到组件之间的连线变成了绿色,说明 ETL Job 已经成功执行,而且会显示一些统计信息在上面,比如每秒钟处理了多少条数据等;
图 37:运行结果
21. 使用 DataStage Designer 的 View Data 功能查看目标数据库中的数据,如下图所示,目标数据库中的 departmentcost 表中有两条经过聚合后的记录,这也验证了我们开发的 ETL Job 的逻辑是正确的。
图 38:查看目标数据库
总结
本文首先介绍了 IBM WebSphere DataStage 中 Job Sequence 和 Container 的用法,然后用一个 ETL Job 演示了 IBM WebSphere DataStage 处理多数据源方面的优势。使读者对 IBM WebSphere DataStage 有了更进一步的了解。