programing

실타래의타이머 - 'n'초마다 기능 반복

padding 2023. 7. 27. 21:39
반응형

실타래의타이머 - 'n'초마다 기능 반복

0.5초마다 기능을 끄고 타이머를 시작하고 중지하고 재설정할 수 있습니다.저는 파이썬 스레드가 어떻게 작동하는지 잘 모르고 파이썬 타이머에 어려움을 겪고 있습니다.

하지만, 나는 계속해서.RuntimeError: threads can only be started once실행할 때threading.timer.start()두 번입니다. 이것에 대한 해결책이 있나요?신청해봤습니다.threading.timer.cancel()시작할 때마다

유사 코드:

t=threading.timer(0.5,function)
while True:
    t.cancel()
    t.start()

가장 좋은 방법은 타이머 스레드를 한 번 시작하는 것입니다.타이머 스레드 안에서 다음을 코딩합니다.

class MyThread(Thread):
    def __init__(self, event):
        Thread.__init__(self)
        self.stopped = event

    def run(self):
        while not self.stopped.wait(0.5):
            print("my thread")
            # call a function

타이머를 시작한 코드에서 다음을 수행할 수 있습니다.set타이머를 중지하기 위해 중지된 이벤트.

stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()

한스의 대답에 조금 더 나아진다면, 우리는 타이머 기능을 하위 분류하면 됩니다.다음은 우리의 전체 "반복 타이머" 코드가 되며, 스레드화를 위한 드롭인 대체물로 사용될 수 있습니다.모든 동일한 인수를 사용하는 타이머:

from threading import Timer

class RepeatTimer(Timer):
    def run(self):
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)

사용 예:

def dummyfn(msg="foo"):
    print(msg)

timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()

는 다음 출력을 생성합니다.

foo
foo
foo
foo

그리고.

timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()

생산물

bar
bar
bar
bar

python의 setInterval에 해당하는 출처:

import threading

def setInterval(interval):
    def decorator(function):
        def wrapper(*args, **kwargs):
            stopped = threading.Event()

            def loop(): # executed in another thread
                while not stopped.wait(interval): # until stopped
                    function(*args, **kwargs)

            t = threading.Thread(target=loop)
            t.daemon = True # stop if the program exits
            t.start()
            return stopped
        return wrapper
    return decorator

용도:

@setInterval(.5)
def function():
    "..."

stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set() 

또는 같은 기능이지만 장식기 대신 독립형 기능으로 사용할 수 있습니다.

cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls() 

스레드를 사용하지 않고 수행하는 방법은 다음과 같습니다.

타이머 스레드 사용

from threading import Timer,Thread,Event


class perpetualTimer():

   def __init__(self,t,hFunction):
      self.t=t
      self.hFunction = hFunction
      self.thread = Timer(self.t,self.handle_function)

   def handle_function(self):
      self.hFunction()
      self.thread = Timer(self.t,self.handle_function)
      self.thread.start()

   def start(self):
      self.thread.start()

   def cancel(self):
      self.thread.cancel()

def printer():
    print 'ipsem lorem'

t = perpetualTimer(5,printer)
t.start()

이것은 에 의해 멈출 수 있습니다.t.cancel()

OP가 요청한 대로 타이머를 사용하여 정답을 제공하기 위해 swapnil jariwala의 답변을 개선하겠습니다.

from threading import Timer


class InfiniteTimer():
    """A Timer class that does not stop, unless you want it to."""

    def __init__(self, seconds, target):
        self._should_continue = False
        self.is_running = False
        self.seconds = seconds
        self.target = target
        self.thread = None

    def _handle_target(self):
        self.is_running = True
        self.target()
        self.is_running = False
        self._start_timer()

    def _start_timer(self):
        if self._should_continue: # Code could have been running when cancel was called.
            self.thread = Timer(self.seconds, self._handle_target)
            self.thread.start()

    def start(self):
        if not self._should_continue and not self.is_running:
            self._should_continue = True
            self._start_timer()
        else:
            print("Timer already started or running, please wait if you're restarting.")

    def cancel(self):
        if self.thread is not None:
            self._should_continue = False # Just in case thread is running and cancel fails.
            self.thread.cancel()
        else:
            print("Timer never started or failed to initialize.")


def tick():
    print('ipsem lorem')

# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()

작은 콘솔 시계를 만들기 위해 swapnil-jariwala 코드의 일부 코드를 변경했습니다.

from threading import Timer, Thread, Event
from datetime import datetime

class PT():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

def printer():
    tempo = datetime.today()
    h,m,s = tempo.hour, tempo.minute, tempo.second
    print(f"{h}:{m}:{s}")


t = PT(1, printer)
t.start()

산출량

>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...

Tkinter 그래픽 인터페이스가 있는 타이머

이 코드는 클럭 타이머를 Tinker가 있는 작은 창에 넣습니다.

from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk

app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


def printer():
    tempo = datetime.today()
    clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
    try:
        lab['text'] = clock
    except RuntimeError:
        exit()


t = perpetualTimer(1, printer)
t.start()
app.mainloop()

플래시 카드 게임의 예(종류)

from threading import Timer, Thread, Event
from datetime import datetime


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


x = datetime.today()
start = x.second


def printer():
    global questions, counter, start
    x = datetime.today()
    tempo = x.second
    if tempo - 3 > start:
        show_ans()
    #print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
    print()
    print("-" + questions[counter])
    counter += 1
    if counter == len(answers):
        counter = 0


def show_ans():
    global answers, c2
    print("It is {}".format(answers[c2]))
    c2 += 1
    if c2 == len(answers):
        c2 = 0


questions = ["What is the capital of Italy?",
             "What is the capital of France?",
             "What is the capital of England?",
             "What is the capital of Spain?"]

answers = "Rome", "Paris", "London", "Madrid"

counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()

출력:

Get ready to answer
>>> 
-What is the capital of Italy?
It is Rome

-What is the capital of France?
It is Paris

-What is the capital of England?
...

저는 프로젝트를 위해 이것을 해야만 했습니다.제가 하게 된 것은 기능을 위한 별도의 스레드를 시작하는 것이었습니다.

t = threading.Thread(target =heartbeat, args=(worker,))
t.start()

****는 제 기능이고, 근로자는 제 주장 중 하나입니다.***

내 심장 박동 기능 내부:

def heartbeat(worker):

    while True:
        time.sleep(5)
        #all of my code

따라서 스레드를 시작하면 함수가 5초 동안 반복적으로 대기하고 모든 코드를 실행한 후 무한정 실행합니다.프로세스를 종료하려면 스레드를 종료하십시오.

스레드를 사용한 위의 훌륭한 답변 외에도 메인 스레드를 사용하거나 비동기 접근 방식을 선호하는 경우 - aio_timers Timer 클래스를 중심으로 짧은 클래스를 감았습니다(반복을 활성화하기 위해).

import asyncio
from aio_timers import Timer

class RepeatingAsyncTimer():
    def __init__(self, interval, cb, *args, **kwargs):
        self.interval = interval
        self.cb = cb
        self.args = args
        self.kwargs = kwargs
        self.aio_timer = None
        self.start_timer()
    
    def start_timer(self):
        self.aio_timer = Timer(delay=self.interval, 
                               callback=self.cb_wrapper, 
                               callback_args=self.args, 
                               callback_kwargs=self.kwargs
                              )
    
    def cb_wrapper(self, *args, **kwargs):
        self.cb(*args, **kwargs)
        self.start_timer()


from time import time
def cb(timer_name):
    print(timer_name, time())

print(f'clock starts at: {time()}')
timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')

시계 시작 시간: 1602438840.9690785

타이머_1602438845.980087

타이머_2 1602438850.9806316

타이머_1602438850.9808934

타이머_1602438855.9863033

타이머_2 1602438860.9868324

타이머_1602438860.9876585

저는 타이머로 작동하는 수업을 구현했습니다.

필요한 사람이 있을 경우를 대비해 링크를 여기에 남깁니다: https://github.com/ivanhalencp/python/tree/master/xTimer

from threading import Timer
def TaskManager():
    #do stuff
    t = Timer( 1, TaskManager )
    t.start()

TaskManager()

여기 작은 샘플이 있습니다. 작동 방식을 더 잘 이해하는 데 도움이 될 것입니다.function taskManager()는 자신에게 지연된 함수 호출을 생성합니다.

"dalay" 변수를 변경하려고 하면 차이를 볼 수 있습니다.

from threading import Timer, _sleep

# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True

def taskManager():

    global counter, DATA, delay, allow_run
    counter += 1

    if len(DATA) > 0:
        if FIFO:
            print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
        else:
            print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")

    else:
        print("["+str(counter)+"] no data")

    if allow_run:
        #delayed method/function call to it self
        t = Timer( dalay, taskManager )
        t.start()

    else:
        print(" END task-manager: disabled")

# ------------------------------------------
def main():

    DATA.append("data from main(): 0")
    _sleep(2)
    DATA.append("data from main(): 1")
    _sleep(2)


# ------------------------------------------
print(" START task-manager:")
taskManager()

_sleep(2)
DATA.append("first data")

_sleep(2)
DATA.append("second data")

print(" START main():")
main()
print(" END main():")

_sleep(2)
DATA.append("last data")

allow_run = False

나는 특히 스레드를 해체하고 타이머가 똑딱거릴 때마다 새 스레드를 만들 필요가 없다는 점에서 오른쪽 클릭의 답변을 좋아합니다.또한 정기적으로 호출되는 타이머 콜백으로 클래스를 만드는 것은 쉬운 재정의입니다.이것이 제 일반적인 사용 사례입니다.

class MyClass(RepeatTimer):
    def __init__(self, period):
        super().__init__(period, self.on_timer)

    def on_timer(self):
        print("Tick")


if __name__ == "__main__":
    mc = MyClass(1)
    mc.start()
    time.sleep(5)
    mc.cancel()

이것은 클래스 대신 함수를 사용하는 대체 구현입니다.위의 @Andrew Wilkins에서 영감을 받았습니다.

대기가 수면보다 더 정확하기 때문에(함수 런타임을 고려합니다):

import threading

PING_ON = threading.Event()

def ping():
  while not PING_ON.wait(1):
    print("my thread %s" % str(threading.current_thread().ident))

t = threading.Thread(target=ping)
t.start()

sleep(5)
PING_ON.set()

저는 싱글톤 클래스와 함께 다른 해결책을 생각해냈습니다.메모리 누수가 있으면 알려주시기 바랍니다.

import time,threading

class Singleton:
  __instance = None
  sleepTime = 1
  executeThread = False

  def __init__(self):
     if Singleton.__instance != None:
        raise Exception("This class is a singleton!")
     else:
        Singleton.__instance = self

  @staticmethod
  def getInstance():
     if Singleton.__instance == None:
        Singleton()
     return Singleton.__instance


  def startThread(self):
     self.executeThread = True
     self.threadNew = threading.Thread(target=self.foo_target)
     self.threadNew.start()
     print('doing other things...')


  def stopThread(self):
     print("Killing Thread ")
     self.executeThread = False
     self.threadNew.join()
     print(self.threadNew)


  def foo(self):
     print("Hello in " + str(self.sleepTime) + " seconds")


  def foo_target(self):
     while self.executeThread:
        self.foo()
        print(self.threadNew)
        time.sleep(self.sleepTime)

        if not self.executeThread:
           break


sClass = Singleton()
sClass.startThread()
time.sleep(5)
sClass.getInstance().stopThread()

sClass.getInstance().sleepTime = 2
sClass.startThread()

제가 좀 늦었지만 여기 제 2센트가 있습니다.

재사용할 수 .threading.Timer을 제다하기라고 부르며 합니다..run()방법은 반복적으로, 다음과 같습니다.

class SomeClassThatNeedsATimer:
    def __init__(...):
        self.timer = threading.Timer(interval, self.on_timer)
        self.timer.start()

    def on_timer(self):
        print('On timer')
        self.timer.run()

OP가 요청한 대로 타이머를 사용하여 정답을 제공하기 위해 https://stackoverflow.com/a/41450617/20750754 을 개선하겠습니다.

from threading import Timer


class InfiniteTimer():
  """A Timer class that does not stop, unless you want it to."""

  def __init__(self, seconds, target, args=[], kwargs=dict()):
    self._should_continue = False
    self.is_running = False
    self.seconds = seconds
    self.target = target
    self.args = args
    self.kwargs = kwargs
    self.thread = None

  def _handle_target(self):
    self.is_running = True
    self.target(*self.args, **self.kwargs)
    self.is_running = False
    self._start_timer()

  def _start_timer(self):
    if self._should_continue:  # Code could have been running when cancel was called.
      self.thread = Timer(
        self.seconds,
        self._handle_target,
      )
      self.thread.start()

  def start(self):
    if not self._should_continue and not self.is_running:
      self._should_continue = True
      self._start_timer()
    else:
      print(
        "Timer already started or running, please wait if you're restarting.")

  def cancel(self):
    if self.thread is not None:
      self._should_continue = False  # Just in case thread is running and cancel fails.
      self.thread.cancel()
    else:
      print("Timer never started or failed to initialize.")


def tick(i):
  print('ipsem lorem', i)


# Example Usage
t = InfiniteTimer(0.5, tick, kwargs=dict(i="i"))
t.start()

이것은 타이머를 연속적으로 실행하기 위한 샘플 코드입니다.소진 시 새로운 타이머를 생성하고 동일한 기능을 호출하기만 하면 됩니다.최선의 방법은 아니지만 이렇게 할 수도 있습니다.

import threading
import time


class ContinousTimer():
    def __init__(self):
        self.timer = None

    def run(self, msg='abc'):
        print(msg)

        self.timer = threading.Timer(interval=2, function=self.run, args=(msg, ))
        self.timer.start()


if __name__ == "__main__":
    t = ContinousTimer()
    try:
        t.run(msg="Hello")
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        # Cancel Timer
        t.timer.cancel()

언급URL : https://stackoverflow.com/questions/12435211/threading-timer-repeat-function-every-n-seconds

반응형