실타래의타이머 - 'n'초마다 기능 반복
0.5초마다 기능을 끄고 타이머를 시작하고 중지하고 재설정할 수 있습니다.저는 파이썬 스레드가 어떻게 작동하는지 잘 모르고 파이썬 타이머에 어려움을 겪고 있습니다.
하지만, 나는 계속해서.RuntimeError: threads can only be started once
실행할 때threading.timer.start()
두 번입니다. 이것에 대한 해결책이 있나요?신청해봤습니다.threading.timer.cancel()
시작할 때마다
유사 코드:
t=threading.timer(0.5,function)
while True:
t.cancel()
t.start()
가장 좋은 방법은 타이머 스레드를 한 번 시작하는 것입니다.타이머 스레드 안에서 다음을 코딩합니다.
class MyThread(Thread):
def __init__(self, event):
Thread.__init__(self)
self.stopped = event
def run(self):
while not self.stopped.wait(0.5):
print("my thread")
# call a function
타이머를 시작한 코드에서 다음을 수행할 수 있습니다.set
타이머를 중지하기 위해 중지된 이벤트.
stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()
한스의 대답에 조금 더 나아진다면, 우리는 타이머 기능을 하위 분류하면 됩니다.다음은 우리의 전체 "반복 타이머" 코드가 되며, 스레드화를 위한 드롭인 대체물로 사용될 수 있습니다.모든 동일한 인수를 사용하는 타이머:
from threading import Timer
class RepeatTimer(Timer):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
사용 예:
def dummyfn(msg="foo"):
print(msg)
timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()
는 다음 출력을 생성합니다.
foo
foo
foo
foo
그리고.
timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()
생산물
bar
bar
bar
bar
import threading
def setInterval(interval):
def decorator(function):
def wrapper(*args, **kwargs):
stopped = threading.Event()
def loop(): # executed in another thread
while not stopped.wait(interval): # until stopped
function(*args, **kwargs)
t = threading.Thread(target=loop)
t.daemon = True # stop if the program exits
t.start()
return stopped
return wrapper
return decorator
용도:
@setInterval(.5)
def function():
"..."
stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set()
또는 같은 기능이지만 장식기 대신 독립형 기능으로 사용할 수 있습니다.
cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls()
스레드를 사용하지 않고 수행하는 방법은 다음과 같습니다.
타이머 스레드 사용
from threading import Timer,Thread,Event
class perpetualTimer():
def __init__(self,t,hFunction):
self.t=t
self.hFunction = hFunction
self.thread = Timer(self.t,self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t,self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
def printer():
print 'ipsem lorem'
t = perpetualTimer(5,printer)
t.start()
이것은 에 의해 멈출 수 있습니다.t.cancel()
OP가 요청한 대로 타이머를 사용하여 정답을 제공하기 위해 swapnil jariwala의 답변을 개선하겠습니다.
from threading import Timer
class InfiniteTimer():
"""A Timer class that does not stop, unless you want it to."""
def __init__(self, seconds, target):
self._should_continue = False
self.is_running = False
self.seconds = seconds
self.target = target
self.thread = None
def _handle_target(self):
self.is_running = True
self.target()
self.is_running = False
self._start_timer()
def _start_timer(self):
if self._should_continue: # Code could have been running when cancel was called.
self.thread = Timer(self.seconds, self._handle_target)
self.thread.start()
def start(self):
if not self._should_continue and not self.is_running:
self._should_continue = True
self._start_timer()
else:
print("Timer already started or running, please wait if you're restarting.")
def cancel(self):
if self.thread is not None:
self._should_continue = False # Just in case thread is running and cancel fails.
self.thread.cancel()
else:
print("Timer never started or failed to initialize.")
def tick():
print('ipsem lorem')
# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()
작은 콘솔 시계를 만들기 위해 swapnil-jariwala 코드의 일부 코드를 변경했습니다.
from threading import Timer, Thread, Event
from datetime import datetime
class PT():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def printer():
tempo = datetime.today()
h,m,s = tempo.hour, tempo.minute, tempo.second
print(f"{h}:{m}:{s}")
t = PT(1, printer)
t.start()
산출량
>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...
Tkinter 그래픽 인터페이스가 있는 타이머
이 코드는 클럭 타이머를 Tinker가 있는 작은 창에 넣습니다.
from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk
app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()
class perpetualTimer():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
def printer():
tempo = datetime.today()
clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
try:
lab['text'] = clock
except RuntimeError:
exit()
t = perpetualTimer(1, printer)
t.start()
app.mainloop()
플래시 카드 게임의 예(종류)
from threading import Timer, Thread, Event
from datetime import datetime
class perpetualTimer():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
x = datetime.today()
start = x.second
def printer():
global questions, counter, start
x = datetime.today()
tempo = x.second
if tempo - 3 > start:
show_ans()
#print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
print()
print("-" + questions[counter])
counter += 1
if counter == len(answers):
counter = 0
def show_ans():
global answers, c2
print("It is {}".format(answers[c2]))
c2 += 1
if c2 == len(answers):
c2 = 0
questions = ["What is the capital of Italy?",
"What is the capital of France?",
"What is the capital of England?",
"What is the capital of Spain?"]
answers = "Rome", "Paris", "London", "Madrid"
counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()
출력:
Get ready to answer
>>>
-What is the capital of Italy?
It is Rome
-What is the capital of France?
It is Paris
-What is the capital of England?
...
저는 프로젝트를 위해 이것을 해야만 했습니다.제가 하게 된 것은 기능을 위한 별도의 스레드를 시작하는 것이었습니다.
t = threading.Thread(target =heartbeat, args=(worker,))
t.start()
****는 제 기능이고, 근로자는 제 주장 중 하나입니다.***
내 심장 박동 기능 내부:
def heartbeat(worker):
while True:
time.sleep(5)
#all of my code
따라서 스레드를 시작하면 함수가 5초 동안 반복적으로 대기하고 모든 코드를 실행한 후 무한정 실행합니다.프로세스를 종료하려면 스레드를 종료하십시오.
스레드를 사용한 위의 훌륭한 답변 외에도 메인 스레드를 사용하거나 비동기 접근 방식을 선호하는 경우 - aio_timers Timer 클래스를 중심으로 짧은 클래스를 감았습니다(반복을 활성화하기 위해).
import asyncio
from aio_timers import Timer
class RepeatingAsyncTimer():
def __init__(self, interval, cb, *args, **kwargs):
self.interval = interval
self.cb = cb
self.args = args
self.kwargs = kwargs
self.aio_timer = None
self.start_timer()
def start_timer(self):
self.aio_timer = Timer(delay=self.interval,
callback=self.cb_wrapper,
callback_args=self.args,
callback_kwargs=self.kwargs
)
def cb_wrapper(self, *args, **kwargs):
self.cb(*args, **kwargs)
self.start_timer()
from time import time
def cb(timer_name):
print(timer_name, time())
print(f'clock starts at: {time()}')
timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')
시계 시작 시간: 1602438840.9690785
타이머_1602438845.980087
타이머_2 1602438850.9806316
타이머_1602438850.9808934
타이머_1602438855.9863033
타이머_2 1602438860.9868324
타이머_1602438860.9876585
저는 타이머로 작동하는 수업을 구현했습니다.
필요한 사람이 있을 경우를 대비해 링크를 여기에 남깁니다: https://github.com/ivanhalencp/python/tree/master/xTimer
from threading import Timer
def TaskManager():
#do stuff
t = Timer( 1, TaskManager )
t.start()
TaskManager()
여기 작은 샘플이 있습니다. 작동 방식을 더 잘 이해하는 데 도움이 될 것입니다.function taskManager()는 자신에게 지연된 함수 호출을 생성합니다.
"dalay" 변수를 변경하려고 하면 차이를 볼 수 있습니다.
from threading import Timer, _sleep
# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True
def taskManager():
global counter, DATA, delay, allow_run
counter += 1
if len(DATA) > 0:
if FIFO:
print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
else:
print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")
else:
print("["+str(counter)+"] no data")
if allow_run:
#delayed method/function call to it self
t = Timer( dalay, taskManager )
t.start()
else:
print(" END task-manager: disabled")
# ------------------------------------------
def main():
DATA.append("data from main(): 0")
_sleep(2)
DATA.append("data from main(): 1")
_sleep(2)
# ------------------------------------------
print(" START task-manager:")
taskManager()
_sleep(2)
DATA.append("first data")
_sleep(2)
DATA.append("second data")
print(" START main():")
main()
print(" END main():")
_sleep(2)
DATA.append("last data")
allow_run = False
나는 특히 스레드를 해체하고 타이머가 똑딱거릴 때마다 새 스레드를 만들 필요가 없다는 점에서 오른쪽 클릭의 답변을 좋아합니다.또한 정기적으로 호출되는 타이머 콜백으로 클래스를 만드는 것은 쉬운 재정의입니다.이것이 제 일반적인 사용 사례입니다.
class MyClass(RepeatTimer):
def __init__(self, period):
super().__init__(period, self.on_timer)
def on_timer(self):
print("Tick")
if __name__ == "__main__":
mc = MyClass(1)
mc.start()
time.sleep(5)
mc.cancel()
이것은 클래스 대신 함수를 사용하는 대체 구현입니다.위의 @Andrew Wilkins에서 영감을 받았습니다.
대기가 수면보다 더 정확하기 때문에(함수 런타임을 고려합니다):
import threading
PING_ON = threading.Event()
def ping():
while not PING_ON.wait(1):
print("my thread %s" % str(threading.current_thread().ident))
t = threading.Thread(target=ping)
t.start()
sleep(5)
PING_ON.set()
저는 싱글톤 클래스와 함께 다른 해결책을 생각해냈습니다.메모리 누수가 있으면 알려주시기 바랍니다.
import time,threading
class Singleton:
__instance = None
sleepTime = 1
executeThread = False
def __init__(self):
if Singleton.__instance != None:
raise Exception("This class is a singleton!")
else:
Singleton.__instance = self
@staticmethod
def getInstance():
if Singleton.__instance == None:
Singleton()
return Singleton.__instance
def startThread(self):
self.executeThread = True
self.threadNew = threading.Thread(target=self.foo_target)
self.threadNew.start()
print('doing other things...')
def stopThread(self):
print("Killing Thread ")
self.executeThread = False
self.threadNew.join()
print(self.threadNew)
def foo(self):
print("Hello in " + str(self.sleepTime) + " seconds")
def foo_target(self):
while self.executeThread:
self.foo()
print(self.threadNew)
time.sleep(self.sleepTime)
if not self.executeThread:
break
sClass = Singleton()
sClass.startThread()
time.sleep(5)
sClass.getInstance().stopThread()
sClass.getInstance().sleepTime = 2
sClass.startThread()
제가 좀 늦었지만 여기 제 2센트가 있습니다.
재사용할 수 .threading.Timer
을 제다하기라고 부르며 합니다..run()
방법은 반복적으로, 다음과 같습니다.
class SomeClassThatNeedsATimer:
def __init__(...):
self.timer = threading.Timer(interval, self.on_timer)
self.timer.start()
def on_timer(self):
print('On timer')
self.timer.run()
OP가 요청한 대로 타이머를 사용하여 정답을 제공하기 위해 https://stackoverflow.com/a/41450617/20750754 을 개선하겠습니다.
from threading import Timer
class InfiniteTimer():
"""A Timer class that does not stop, unless you want it to."""
def __init__(self, seconds, target, args=[], kwargs=dict()):
self._should_continue = False
self.is_running = False
self.seconds = seconds
self.target = target
self.args = args
self.kwargs = kwargs
self.thread = None
def _handle_target(self):
self.is_running = True
self.target(*self.args, **self.kwargs)
self.is_running = False
self._start_timer()
def _start_timer(self):
if self._should_continue: # Code could have been running when cancel was called.
self.thread = Timer(
self.seconds,
self._handle_target,
)
self.thread.start()
def start(self):
if not self._should_continue and not self.is_running:
self._should_continue = True
self._start_timer()
else:
print(
"Timer already started or running, please wait if you're restarting.")
def cancel(self):
if self.thread is not None:
self._should_continue = False # Just in case thread is running and cancel fails.
self.thread.cancel()
else:
print("Timer never started or failed to initialize.")
def tick(i):
print('ipsem lorem', i)
# Example Usage
t = InfiniteTimer(0.5, tick, kwargs=dict(i="i"))
t.start()
이것은 타이머를 연속적으로 실행하기 위한 샘플 코드입니다.소진 시 새로운 타이머를 생성하고 동일한 기능을 호출하기만 하면 됩니다.최선의 방법은 아니지만 이렇게 할 수도 있습니다.
import threading
import time
class ContinousTimer():
def __init__(self):
self.timer = None
def run(self, msg='abc'):
print(msg)
self.timer = threading.Timer(interval=2, function=self.run, args=(msg, ))
self.timer.start()
if __name__ == "__main__":
t = ContinousTimer()
try:
t.run(msg="Hello")
while True:
time.sleep(0.1)
except KeyboardInterrupt:
# Cancel Timer
t.timer.cancel()
언급URL : https://stackoverflow.com/questions/12435211/threading-timer-repeat-function-every-n-seconds
'programing' 카테고리의 다른 글
개별 클래스 이름에 '시작' 선택기 사용 (0) | 2023.07.27 |
---|---|
Oracle은 공백 앞에 하위 문자열을 가져옵니다. (0) | 2023.07.27 |
Python 피클의 기본(또는 가장 일반적인) 파일 확장자 (0) | 2023.07.27 |
툴 체인을 사용하는 'Java SE 11' 플랫폼을 대상으로 할 수 없음: 'JDK 8 (1.8)' (0) | 2023.07.27 |
Spring Boot 시작 시 데이터베이스에 샘플 데이터 삽입 (0) | 2023.07.22 |