Last modified: August 08, 2018
This article is written in: 🇵🇱
Procesy to samodzielne jednostki wykonywane w systemie operacyjnym, każda z własną przestrzenią adresową i zasobami. Każdy proces działa niezależnie i jest izolowany od innych procesów. W związku z tym, komunikacja między procesami wymaga specjalnych mechanizmów, takich jak kolejki czy potoki. Procesy są z reguły cięższe niż wątki pod względem zużycia zasobów, ale mają zaletę lepszej izolacji, co sprawia, że są bardziej odporne na błędy i interferencje.
W Pythonie, dzięki modułowi multiprocessing, można łatwo tworzyć i zarządzać procesami:
import multiprocessing
import time
def pracownik():
print("Rozpoczynam pracę")
time.sleep(2)
print("Kończę pracę")
proces = multiprocessing.Process(target=pracownik)
proces.start()
proces.join() # Oczekuje na zakończenie procesu
Aby zatrzymać proces przed jego naturalnym zakończeniem, można użyć metody terminate():
proces.terminate()
Jednak korzystanie z terminate() powinno być ostrożne, ponieważ może prowadzić do nieprzewidywalnych skutków, takich jak niedokończone operacje czy utrata danych.
Jak wcześniej wspomniano, jednym z wyzwań związanych z procesami jest ich izolacja, co oznacza, że nie mogą one bezpośrednio dzielić się swoim stanem ani zasobami. W związku z tym konieczne jest korzystanie z mechanizmów IPC, aby umożliwić procesom współpracę.
Kolejki w module multiprocessing działają podobnie jak wątkowe kolejki w module queue. Pozwalają one na przesyłanie i odbieranie komunikatów między procesami:
import multiprocessing
def pracownik(kolejka):
kolejka.put("Proces pozdrawia serdecznie!")
if __name__ == "__main__":
kolejka = multiprocessing.Queue()
proces = multiprocessing.Process(target=pracownik, args=(kolejka,))
proces.start()
print(kolejka.get()) # Odbieramy komunikat od procesu
proces.join()
W tym przykładzie proces potomny wysyła komunikat do procesu głównego za pomocą kolejki.
Potoki to kolejny sposób komunikacji między procesami. Składają się z dwóch połączonych końcówek: jednej do wysyłania, a drugiej do odbierania danych.
import multiprocessing
def pracownik(polaczenie):
polaczenie.send("Proces pozdrawia serdecznie!")
polaczenie.close()
if __name__ == "__main__":
polaczenie_rodzica, polaczenie_dziecka = multiprocessing.Pipe()
proces = multiprocessing.Process(target=pracownik, args=(polaczenie_dziecka,))
proces.start()
print(polaczenie_rodzica.recv()) # Odbieramy komunikat od procesu
proces.join()
Dla bardziej złożonych struktur danych, takich jak listy czy słowniki, można użyć multiprocessing.Manager, który pozwala na udostępnianie tych struktur między procesami.
import multiprocessing
def pracownik(lista, slownik):
lista.append("Proces dodał element do listy")
slownik["klucz"] = "Wartość dodana przez proces"
if __name__ == "__main__":
with multiprocessing.Manager() as manager:
lista = manager.list()
slownik = manager.dict()
proces = multiprocessing.Process(target=pracownik, args=(lista, slownik))
proces.start()
proces.join()
print(lista)
print(slownik)
W powyższym przykładzie proces potomny dodaje elementy do współdzielonej listy i słownika.
Podczas korzystania z mechanizmów IPC należy pamiętać o możliwych zakleszczeniach (deadlocks). Ważne jest, aby:
get() czy recv() można ustawiać timeouty, aby uniknąć nieskończonego oczekiwania.
import multiprocessing
import queue
def pracownik(kolejka):
try:
kolejka.put("Proces pozdrawia serdecznie!", timeout=1)
except queue.Full:
print("Kolejka jest pełna")
if __name__ == "__main__":
kolejka = multiprocessing.Queue(maxsize=1)
proces = multiprocessing.Process(target=pracownik, args=(kolejka,))
proces.start()
try:
print(kolejka.get(timeout=1)) # Odbieramy komunikat od procesu
except queue.Empty:
print("Kolejka jest pusta")
proces.join()
import multiprocessing
# Funkcja obliczająca kwadraty liczb
def oblicz_kwadraty(liczby, wynik, indeks):
print(f"Proces {indeks + 1} pracuje z {liczby}")
for i, liczba w enumerate(liczby):
wynik[i] = liczba * liczba
print(f"Proces {indeks + 1} zakończony!")
# Główna część programu
if __name__ == "__main__":
liczby = [1, 2, 3, 4, 5, 6, 7, 8, 9]
liczba_procesow = 3
rozmiar = len(liczby) // liczba_procesow
procesy = []
wynik = multiprocessing.Array('i', len(liczby))
# Tworzenie i uruchamianie procesów
for i in range(liczba_procesow):
indeks_startowy = rozmiar * i
indeks_koncowy = len(liczby) if i == liczba_procesow - 1 else indeks_startowy + rozmiar
p = multiprocessing.Process(target=oblicz_kwadraty, args=(liczby[indeks_startowy:indeks_koncowy], wynik[indeks_startowy:indeks_koncowy], i))
procesy.append(p)
p.start()
# Czekanie na zakończenie wszystkich procesów
for p in procesy:
p.join()
# Wyświetlenie wyników
print(f"Kwadraty: {list(wynik)}")
W tym kodzie dzielimy listę liczb na mniejsze fragmenty i przekazujemy je do różnych procesów. Każdy proces oblicza kwadraty liczb równolegle z innymi procesami. Używamy wspólnej pamięci (w postaci multiprocessing.Array) do przechowywania wyników, więc po zakończeniu wszystkich procesów możemy odczytać wyniki.
Kluczowe aspekty:
multiprocessing.Array umożliwia współdzielenie wyników między procesami. Jest to wspólna pamięć, w której procesy zapisują wyniki swoich obliczeń.W bardziej zaawansowanych scenariuszach warto rozważyć dynamiczne przypisywanie zadań procesom za pomocą kolejek. Poniższy przykład ilustruje, jak to zrobić:
import multiprocessing
def oblicz_kwadraty_task(kolejka, wynik):
while True:
indeks, liczba = kolejka.get()
if liczba is None:
break
wynik[indeks] = liczba * liczba
if __name__ == "__main__":
liczby = [1, 2, 3, 4, 5, 6, 7, 8, 9]
liczba_procesow = 3
kolejka = multiprocessing.Queue()
wynik = multiprocessing.Array('i', len(liczby))
procesy = []
# Tworzenie procesów
for _ in range(liczba_procesow):
p = multiprocessing.Process(target=oblicz_kwadraty_task, args=(kolejka, wynik))
procesy.append(p)
p.start()
# Wysyłanie zadań do kolejki
for i, liczba w enumerate(liczby):
kolejka.put((i, liczba))
# Wysyłanie sygnału zakończenia do procesów
for _ in range(liczba_procesow):
kolejka.put((None, None))
# Czekanie na zakończenie wszystkich procesów
for p in procesy:
p.join()
# Wyświetlenie wyników
print(f"Kwadraty: {list(wynik)}")
W powyższym przykładzie dynamicznie przypisujemy zadania procesom za pomocą kolejki. Jest to bardziej elastyczne podejście, które pozwala lepiej wykorzystać zasoby w przypadku, gdy liczba zadań jest zmienna lub gdy zadania mają różny czas wykonania.