Last modified: August 08, 2018
This article is written in: 🇵🇱
Procesy to samodzielne jednostki wykonywane w systemie operacyjnym, każda z własną przestrzenią adresową i zasobami. Każdy proces działa niezależnie i jest izolowany od innych procesów. W związku z tym, komunikacja między procesami wymaga specjalnych mechanizmów, takich jak kolejki czy potoki. Procesy są z reguły cięższe niż wątki pod względem zużycia zasobów, ale mają zaletę lepszej izolacji, co sprawia, że są bardziej odporne na błędy i interferencje.
W Pythonie, dzięki modułowi multiprocessing, można łatwo tworzyć i zarządzać procesami:
import multiprocessing
import time
def pracownik():
    print("Rozpoczynam pracę")
    time.sleep(2)
    print("Kończę pracę")
proces = multiprocessing.Process(target=pracownik)
proces.start()
proces.join()  # Oczekuje na zakończenie procesuAby zatrzymać proces przed jego naturalnym zakończeniem, można użyć metody terminate():
proces.terminate()Jednak korzystanie z terminate() powinno być ostrożne, ponieważ może prowadzić do nieprzewidywalnych skutków, takich jak niedokończone operacje czy utrata danych.
Jak wcześniej wspomniano, jednym z wyzwań związanych z procesami jest ich izolacja, co oznacza, że nie mogą one bezpośrednio dzielić się swoim stanem ani zasobami. W związku z tym konieczne jest korzystanie z mechanizmów IPC, aby umożliwić procesom współpracę.
Kolejki w module multiprocessing działają podobnie jak wątkowe kolejki w module queue. Pozwalają one na przesyłanie i odbieranie komunikatów między procesami:
import multiprocessing
def pracownik(kolejka):
    kolejka.put("Proces pozdrawia serdecznie!")
if __name__ == "__main__":
    kolejka = multiprocessing.Queue()
    proces = multiprocessing.Process(target=pracownik, args=(kolejka,))
    proces.start()
    print(kolejka.get())  # Odbieramy komunikat od procesu
    proces.join()W tym przykładzie proces potomny wysyła komunikat do procesu głównego za pomocą kolejki.
Potoki to kolejny sposób komunikacji między procesami. Składają się z dwóch połączonych końcówek: jednej do wysyłania, a drugiej do odbierania danych.
import multiprocessing
def pracownik(polaczenie):
    polaczenie.send("Proces pozdrawia serdecznie!")
    polaczenie.close()
if __name__ == "__main__":
    polaczenie_rodzica, polaczenie_dziecka = multiprocessing.Pipe()
    proces = multiprocessing.Process(target=pracownik, args=(polaczenie_dziecka,))
    proces.start()
    print(polaczenie_rodzica.recv())  # Odbieramy komunikat od procesu
    proces.join()Dla bardziej złożonych struktur danych, takich jak listy czy słowniki, można użyć multiprocessing.Manager, który pozwala na udostępnianie tych struktur między procesami.
import multiprocessing
def pracownik(lista, slownik):
    lista.append("Proces dodał element do listy")
    slownik["klucz"] = "Wartość dodana przez proces"
if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        lista = manager.list()
        slownik = manager.dict()
        
        proces = multiprocessing.Process(target=pracownik, args=(lista, slownik))
        proces.start()
        proces.join()
        
        print(lista)
        print(slownik)W powyższym przykładzie proces potomny dodaje elementy do współdzielonej listy i słownika.
Podczas korzystania z mechanizmów IPC należy pamiętać o możliwych zakleszczeniach (deadlocks). Ważne jest, aby:
get() czy recv() można ustawiać timeouty, aby uniknąć nieskończonego oczekiwania.
import multiprocessing
import queue
def pracownik(kolejka):
    try:
        kolejka.put("Proces pozdrawia serdecznie!", timeout=1)
    except queue.Full:
        print("Kolejka jest pełna")
if __name__ == "__main__":
    kolejka = multiprocessing.Queue(maxsize=1)
    proces = multiprocessing.Process(target=pracownik, args=(kolejka,))
    proces.start()
    try:
        print(kolejka.get(timeout=1))  # Odbieramy komunikat od procesu
    except queue.Empty:
        print("Kolejka jest pusta")
    proces.join()
import multiprocessing
# Funkcja obliczająca kwadraty liczb
def oblicz_kwadraty(liczby, wynik, indeks):
    print(f"Proces {indeks + 1} pracuje z {liczby}")
    for i, liczba w enumerate(liczby):
        wynik[i] = liczba * liczba
    print(f"Proces {indeks + 1} zakończony!")
# Główna część programu
if __name__ == "__main__":
    liczby = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    liczba_procesow = 3
    rozmiar = len(liczby) // liczba_procesow
    procesy = []
    wynik = multiprocessing.Array('i', len(liczby))
    # Tworzenie i uruchamianie procesów
    for i in range(liczba_procesow):
        indeks_startowy = rozmiar * i
        indeks_koncowy = len(liczby) if i == liczba_procesow - 1 else indeks_startowy + rozmiar
        p = multiprocessing.Process(target=oblicz_kwadraty, args=(liczby[indeks_startowy:indeks_koncowy], wynik[indeks_startowy:indeks_koncowy], i))
        procesy.append(p)
        p.start()
    # Czekanie na zakończenie wszystkich procesów
    for p in procesy:
        p.join()
    # Wyświetlenie wyników
    print(f"Kwadraty: {list(wynik)}")W tym kodzie dzielimy listę liczb na mniejsze fragmenty i przekazujemy je do różnych procesów. Każdy proces oblicza kwadraty liczb równolegle z innymi procesami. Używamy wspólnej pamięci (w postaci multiprocessing.Array) do przechowywania wyników, więc po zakończeniu wszystkich procesów możemy odczytać wyniki.
Kluczowe aspekty:
multiprocessing.Array umożliwia współdzielenie wyników między procesami. Jest to wspólna pamięć, w której procesy zapisują wyniki swoich obliczeń.W bardziej zaawansowanych scenariuszach warto rozważyć dynamiczne przypisywanie zadań procesom za pomocą kolejek. Poniższy przykład ilustruje, jak to zrobić:
import multiprocessing
def oblicz_kwadraty_task(kolejka, wynik):
    while True:
        indeks, liczba = kolejka.get()
        if liczba is None:
            break
        wynik[indeks] = liczba * liczba
if __name__ == "__main__":
    liczby = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    liczba_procesow = 3
    kolejka = multiprocessing.Queue()
    wynik = multiprocessing.Array('i', len(liczby))
    procesy = []
    # Tworzenie procesów
    for _ in range(liczba_procesow):
        p = multiprocessing.Process(target=oblicz_kwadraty_task, args=(kolejka, wynik))
        procesy.append(p)
        p.start()
    # Wysyłanie zadań do kolejki
    for i, liczba w enumerate(liczby):
        kolejka.put((i, liczba))
    # Wysyłanie sygnału zakończenia do procesów
    for _ in range(liczba_procesow):
        kolejka.put((None, None))
    # Czekanie na zakończenie wszystkich procesów
    for p in procesy:
        p.join()
    # Wyświetlenie wyników
    print(f"Kwadraty: {list(wynik)}")W powyższym przykładzie dynamicznie przypisujemy zadania procesom za pomocą kolejki. Jest to bardziej elastyczne podejście, które pozwala lepiej wykorzystać zasoby w przypadku, gdy liczba zadań jest zmienna lub gdy zadania mają różny czas wykonania.