Python建立循环任务

Python内置sched模块,sched中的scheduler类可以计划一个一次性任务,但并没有提供循环任务的方法。如果只使用Python的标准模块建立循环任务,下面提供一个简单的例子,希望能抛砖引玉。

import time
import sched
import threading

class loop:
    def __init__(self,starttime,delay,action,*args):
        self.run(starttime,delay,action,*args)
        
    def run(self,starttime,delay,action,*args):
        self._action=action
        self._args=args
        self._daemon=threading.Thread(target=self._thread,args=(starttime,delay))
        self._daemon.start()
        
    def end(self):
        self._s.cancel(self._s.queue[0])
        
    def _sche(self,delay):
        self._s.enter(delay,1,self._sche,argument=(delay,))
        self._s.run(False)
        self._action(*self._args) if self._args is not None else self._action()

    def _thread(self,starttime,delay):
        self._s.enter(starttime,1,self._sche,argument=(delay,))
        self._s.run()
    
    _action=None
    _args=None
    _s = sched.scheduler(time.time, time.sleep)
    _daemon=None

run方法或者__init__方法可以计划一个循环任务,end方法理应可以结束任务。

留下评论