当前位置: 代码迷 >> 综合 >> AsyncTask -- Java异步调用框架
  详细解决方案

AsyncTask -- Java异步调用框架

热度:11   发布时间:2023-12-08 19:06:09.0

目录(?)[-]

  1. 原创文章转载请注明作者jmppok及出处httpblogcsdnnetjmppokarticledetails44590991
  2. AsyncTask是个人编写的一个Java异步调用框架支持用户
  3. 为什么需要AsyncTask与Asyn4J 区别
    1. 1Java传统的Thread 和 Runable功能不足
    2. 2 Asyn4J
  4. 设计及实现
    1. 1接口设计
    2. 2 代码实现
  5. 测试
    1. 1自定义MyTask
    2. 2自定义MyTaskExecutor
    3. 3配置MyTaskExecutor
    4. 4提交Task并实时监听进度
    5. 5终止Task执行然后重新启动程序进行Task恢复测试
  6. 不足

原创文章,转载请注明作者:jmppok及出处:http://blog.csdn.net/jmppok/article/details/44590991。



AsyncTask是个人编写的一个Java异步调用框架,支持用户:

1)自定义Task,并可设置Task的类型(Type), 子类型(subType),超时时间(TImeout),标识(Flag-可用来区分不同的Task),Task的输入参数(input)等。

2)可通过submitTask,提交 到框架中异步执行,框架查找对应的TaskExectuor,多线程执行。

3)可自定义对应TaskExecutor,通过配置添加到框架中。TaskExecutor支持Execotor Chain, 多个Executor可以组合在一起顺序执行。并且支持在Task执行过程中,实时通知任务调用者Task的状态,进度等。

4)用户可使用TaskCollector通过TaskManager查询所有的Task,支持按Task Id,Task Type, Task SubType, Task State, Task Flag, Task beginTIme, Task finishTime等多种方式的组合查询。

5)支持持久化,用户提交的Task可以被存储在数据库中。即使Task在执行过程中被中断,重新启动后会从数据库中恢复后继续执行。

6)用户可通过查询接口可获取Task的引用ITaskReference,通过ITaskReference可实时获取Task的状态(State)和进度Progress。

7)用户可定义Task的FinishedCallBack回调,在Submit Task时传入,在Task完成后自动回调。

8)通过ITaskReference的waitForTask,支持用户以同步方式使用。

9)用户可通过ITaskReference获取Task的执行结果或错误信息。


代码:https://git.oschina.net/jmpp/AsyncTask


1.为什么需要AsyncTask?与Asyn4J 区别?

1.1Java传统的Thread 和 Runable功能不足

Java提供了Thread,ThreadPool等多线程编程接口。但这些都是基础接口,虽然使用方便,但功能比较简单,很多场景下都无法满足需求。

比如下面的几个场景:

1)我需要提交一个任务,改任务在后台异步执行,同时我要实时观察任务的状态,进度等信息。

2)在提交任务时希望传入参数,任务完成后能主动通知我,并能获取结果。

3)任务持久化,我希望在任务执行完毕后,可以查询到执行的任务列表。或者任务失败后能重新执行。

如果要实现这些场景,Java本身自带的接口显然无法满足,必须要有一个新的框架来实现。


1.2 Asyn4J

Asyn4J也是一个类似的框架,但它目前还不支持任务的超时设置,持久化,任务回调等功能。



2.设计及实现

2.1接口设计

直接上图


2.2 代码实现

具体实现代码见 Git@OSChttps://git.oschina.net/jmpp/AsyncTask

代码结构如下:


这里简单说一下实现思路:

1) 整个实现还是基于Java的Thread和ThreadPool,没有用第三方框架。

2)持久化基于MySQL,只有一个数据库表tasks,见tasks.sql.

3)持久化层的实现用到了Mybatis,给予Mybatis的代码生成工具,直接生成了tasks表对应的数据结构。

4)要持久化必然还要用到对象序列化,这里使用了Kryo。为啥用Kryo,见我的另一篇文章:Java对象序列化小结。

5)日志使用了Log4j。


3.测试

具体可见代码:https://git.oschina.net/jmpp/AsyncTask

3.1自定义MyTask

[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. package test.mytask;  
  2.   
  3. import com.lenovo.asynctask.Task;  
  4.   
  5. /** 
  6.  * 类 MyTask 的实现描述:TODO 类实现描述 
  7.  *  
  8.  * @author ligh4 2015年3月12日下午2:42:56 
  9.  */  
  10. public class MyTask extends Task {  
  11.   
  12.     /** 
  13.      * @param taskType 
  14.      * @param inputParam 
  15.      * @param timeoutMills 
  16.      */  
  17.     public MyTask(Object inputParam, int timeoutMills) {  
  18.         super(MyTask.class.getSimpleName(), inputParam, timeoutMills);  
  19.         setNeedPersistence(true);  
  20.     }  
  21.   
  22. }  


3.2自定义MyTaskExecutor

[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. package test.mytask;  
  2.   
  3. import com.lenovo.asynctask.ITaskExecutor;  
  4. import com.lenovo.asynctask.ITaskReferenceInternal;  
  5. import com.lenovo.asynctask.TaskState;  
  6. import com.lenovo.asynctask.util.LogHelper;  
  7.   
  8. /** 
  9.  * 类 TestTaskExecutor 的实现描述:TODO 类实现描述 
  10.  *  
  11.  * @author ligh4 2015年3月12日下午2:43:19 
  12.  */  
  13. public class MyTaskExecutor extends ITaskExecutor {  
  14.   
  15.     /** 
  16.      * @author ligh4 2015年3月12日下午2:46:51 
  17.      */  
  18.     @Override  
  19.     public Object execute(ITaskReferenceInternal taskRef) {  
  20.         LogHelper.debug("begin execute MyTask...");  
  21.   
  22.         for (int i = 0; i < 100; i++) {  
  23.             try {  
  24.                 Thread.sleep(1000);  
  25.             } catch (Exception e) {  
  26.                 LogHelper.exception(e);  
  27.             }  
  28.             taskRef.setProgress(i + 1);  
  29.         }  
  30.   
  31.         return taskRef.getInput().toString().toUpperCase();  
  32.     }  
  33.   
  34.     /** 
  35.      * @author ligh4 2015年3月12日下午2:46:51 
  36.      */  
  37.     @Override  
  38.     public Object continue_execute(ITaskReferenceInternal taskRef) {  
  39.         if (taskRef.getState() == TaskState.running) {  
  40.             int i = taskRef.getProgress();  
  41.             for (; i < 100; i++) {  
  42.                 try {  
  43.                     Thread.sleep(1000);  
  44.                 } catch (Exception e) {  
  45.                     LogHelper.exception(e);  
  46.                 }  
  47.                 taskRef.setProgress(i + 1);  
  48.             }  
  49.   
  50.             return taskRef.getInput().toString().toUpperCase();  
  51.         } else {  
  52.             taskRef.setState(TaskState.failed, "");  
  53.             return null;  
  54.         }  
  55.   
  56.     }  
  57.   
  58. }  


3.3配置MyTaskExecutor

taskexecutors.properties中添加:

[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. MyTask = test.mytask.MyTaskExecutor  

其实是task的type     =     task的Executor     

3.4提交Task并实时监听进度

[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. package test.mytask;  
  2.   
  3. import java.util.List;  
  4.   
  5. import com.lenovo.asynctask.ITaskFinishedCallback;  
  6. import com.lenovo.asynctask.ITaskReference;  
  7. import com.lenovo.asynctask.TaskCollector;  
  8. import com.lenovo.asynctask.TaskManager;  
  9. import com.lenovo.asynctask.TaskState;  
  10. import com.lenovo.asynctask.util.DateUtil;  
  11. import com.lenovo.asynctask.util.LogHelper;  
  12.   
  13. /** 
  14.  * 类 TestContinueTask 的实现描述:TODO 类实现描述 
  15.  *  
  16.  * @author ligh4 2015年3月23日上午9:42:14 
  17.  */  
  18. public class TestContinueTask {  
  19.   
  20.     /** 
  21.      * @author ligh4 2015年3月12日下午2:52:45 
  22.      * @param args 
  23.      */  
  24.     public static void main(String[] args) throws Exception {  
  25.         TaskManager.instance().start();  
  26.   
  27.         List<ITaskReference> tasks = queryRunningTasks();  
  28.         if (tasks == null || tasks.size() == 0) {  
  29.             submitAndWaitTask();  
  30.         } else {  
  31.             for (ITaskReference taskReference : tasks) {  
  32.                 queryTaskProgress(taskReference);  
  33.             }  
  34.         }  
  35.   
  36.         TaskManager.instance().stop();  
  37.     }  
  38.   
  39.     public static void submitAndWaitTask() throws Exception {  
  40.         MyTask task = new MyTask("liguanghui"200000);  
  41.         ITaskReference taskReference = TaskManager.instance().submitTask(task,  
  42.                 new ITaskFinishedCallback() {  
  43.   
  44.                     @Override  
  45.                     public void onTaskFinished(ITaskReference taskRef) {  
  46.                         LogHelper.debug(taskRef.getId() + ";" + taskRef.getState().toString() + ";"  
  47.                                 + DateUtil.format(taskRef.getStartedTime()) + "  "  
  48.                                 + DateUtil.format(taskRef.getFinishedTime()) + ";"  
  49.                                 + taskRef.getResult().toString());  
  50.   
  51.                     }  
  52.                 });  
  53.   
  54.         queryTaskProgress(taskReference);  
  55.     }  
  56.   
  57.     public static void queryTaskProgress(ITaskReference taskReference) throws Exception {  
  58.         String taskID = taskReference.getId();  
  59.         while (!taskReference.isFinished()) {  
  60.             LogHelper.debug(taskID + ": progress " + taskReference.getProgress());  
  61.             Thread.sleep(1000);  
  62.         }  
  63.         LogHelper.debug(taskID + ": finished. ");  
  64.     }  
  65.   
  66.     public static List<ITaskReference> queryRunningTasks() {  
  67.         TaskCollector collector = new TaskCollector();  
  68.         collector.setTaskStateFilter(new TaskState[] { TaskState.running });  
  69.         collector.setTaskTypeFilter(new String[] { MyTask.class.getSimpleName() });  
  70.         return TaskManager.instance().findTasks(collector);  
  71.   
  72.     }  
  73. }  


3.5终止Task执行然后重新启动程序,进行Task恢复测试

还是同3.4一样的代码

1)第一次运行没有Task,会提交一个Task。  submitAndWaitTask();

2)如果改Task没有被执行完毕就被终止,第二次启动后该Task就会恢复。

3)这时queryRunningTasks就会查询到正在运行的Task,并且进度到等待该Task的分支。

4)当然如果你停止的时间很长才重新启动,会发现Task超时。

4.不足

1.整体实现比较简单,特别是数据库表中存储Java对象序列化的字段,偷懒用的varchar(2000),可能超出,最好改为Blob。(为啥当时不用Blob,因为偷懒,如果Blob的话mybatis生成的代码就比较复杂了,会有一个XXwithBlob,调用不方便....)

2.线程池个数写死了,应该可以配置。

3.测试比较简单,可能有未知bug。



  相关解决方案