"""Parallel Jacobi relaxation on an (N+2)x(N+2) mesh using multiprocessing.

The N interior rows are split into contiguous blocks, one per process (the
last process also takes the remainder rows).  On top of v0, this version uses
multiprocessing queues between neighbouring processes to:

(1) exchange ghost rows every iteration, so each block is updated with its
    neighbours' current boundary rows instead of stale initial values;
(2) compute a global maxdiff1: all processes reduce maxdiff1 to process 0,
    then process 0 broadcasts it back, so every process sees the same value
    and all of them stop on the same iteration;
(3) gather every block back into the parent process, so the data written to
    output.txt is the final computed grid.

Usage: python <script> [nprocs] [N]
"""

import multiprocessing
import sys
import time

# Defaults, overridable from the command line (see main()).
DEFAULT_N = 1024
DEFAULT_NPROCS = 1
# Iterate while the largest per-cell change in one iteration exceeds this.
MAXDIFF = 0.5


def _initGrid(N):
    """Return an (N+2)x(N+2) grid of zeros with the fixed boundary values set.

    Column N+1 and row N+1 hold 0.1*i at index i; row 0 and column 0 stay 0.0.
    """
    grid = [[0.0] * (N + 2) for _ in range(N + 2)]
    for i in range(1, N + 2):
        grid[i][N + 1] = 0.1 * i
        grid[N + 1][i] = 0.1 * i
    return grid


def _rowRange(myId, nprocs, N):
    """Return the half-open range [first, last) of interior rows owned by myId."""
    blockSize = N // nprocs
    first = myId * blockSize + 1
    # The last process also takes the N % nprocs leftover rows.
    last = N + 1 if myId == nprocs - 1 else (myId + 1) * blockSize + 1
    return first, last


def _exchangeHalos(myId, nprocs, x, first, last, prevHalo, nextHalo):
    """Swap boundary rows with the neighbouring processes, updating x's ghost rows.

    Sends this block's first row to P_{myId-1} and its last row to P_{myId+1},
    then overwrites x[first-1] and x[last] with the rows received from them.
    All puts happen before any get, and Queue.put does not block, so the
    exchange cannot deadlock.
    """
    # Send snapshots: Queue.put hands the object to a background feeder thread
    # that pickles it later, and these rows are mutated in the next iteration.
    if myId > 0:
        prevHalo[myId].put(x[first][:])
    if myId < nprocs - 1:
        nextHalo[myId].put(x[last - 1][:])
    if myId > 0:
        x[first - 1] = nextHalo[myId - 1].get()
    if myId < nprocs - 1:
        x[last] = prevHalo[myId + 1].get()


def _allreduceMax(myId, nprocs, value, prevQ, nextQ):
    """Return the maximum of value over all processes; identical on every process.

    The maximum is reduced along the chain P_{n-1} -> ... -> P0 through prevQ,
    then P0 broadcasts it back along P0 -> ... -> P_{n-1} through nextQ.
    """
    if nprocs == 1:
        return value
    # Reduce toward process 0.
    if myId < nprocs - 1:
        value = max(value, prevQ[myId + 1].get())
    if myId > 0:
        prevQ[myId].put(value)
    # Broadcast from process 0.
    if myId > 0:
        value = nextQ[myId - 1].get()
    if myId < nprocs - 1:
        nextQ[myId].put(value)
    return value


def parallelComputeMyBlockOnly(myId, nprocs, N, maxdiff, prevQ, nextQ,
                               prevHalo, nextHalo, results):
    """Run Jacobi iterations on this process's block of rows until convergence.

    Args:
        myId: rank of this process, 0 <= myId < nprocs.
        nprocs: total number of worker processes (expected <= N).
        N: number of interior rows/columns of the mesh.
        maxdiff: iterate while the global max per-cell change exceeds this.
        prevQ, nextQ: per-rank queues for the maxdiff1 reduce/broadcast.
        prevHalo, nextHalo: per-rank queues for the ghost-row exchange.
        results: queue on which (first, rows) for this block is sent back.
    """
    print(f"process {myId} running ....")
    # Each process builds the (identical) initial grid itself, so nothing large
    # has to be shipped to it and it works with any multiprocessing start method.
    x = _initGrid(N)
    y = _initGrid(N)
    first, last = _rowRange(myId, nprocs, N)

    maxdiff1 = 100000.0
    iteration = 0
    while maxdiff1 > maxdiff:
        maxdiff1 = -1.0
        for i in range(first, last):
            above, row, below, out = x[i - 1], x[i], x[i + 1], y[i]
            for j in range(1, N + 1):
                out[j] = (
                    0.25 * above[j]
                    + 0.25 * below[j]
                    + 0.25 * row[j - 1]
                    + 0.25 * row[j + 1]
                )
                diff = abs(row[j] - out[j])
                if diff > maxdiff1:
                    maxdiff1 = diff
        for i in range(first, last):
            x[i][1:N + 1] = y[i][1:N + 1]

        # Refresh ghost rows so the next sweep sees the neighbours' new values.
        _exchangeHalos(myId, nprocs, x, first, last, prevHalo, nextHalo)
        # Every process gets the same global value, so all stop together.
        maxdiff1 = _allreduceMax(myId, nprocs, maxdiff1, prevQ, nextQ)

        print(f"myID: {myId}, iteration = {iteration}, maxdiff1 = {maxdiff1:.3f}, MAXDIFF = {maxdiff:.3f}")
        iteration += 1

    # Hand the owned rows back to the parent process.
    results.put((first, x[first:last]))


def runParallel(nprocs, N, maxdiff=MAXDIFF):
    """Solve on nprocs worker processes and return the assembled final grid.

    nprocs is capped at N so that every process owns at least one row.

    Returns:
        The (N+2)x(N+2) grid (list of row lists) after convergence.

    Raises:
        ValueError: if nprocs < 1 or N < 1.
    """
    if N < 1:
        raise ValueError(f"N must be >= 1, got {N}")
    if nprocs < 1:
        raise ValueError(f"nprocs must be >= 1, got {nprocs}")
    nprocs = min(nprocs, N)

    # Queues between neighbouring processes (P0<->P1, P1<->P2, ...):
    #   nextQ[i] / nextHalo[i] carry messages from P_i to P_{i+1};
    #   prevQ[i] / prevHalo[i] carry messages from P_i to P_{i-1}.
    prevQ = [multiprocessing.Queue() for _ in range(nprocs)]
    nextQ = [multiprocessing.Queue() for _ in range(nprocs)]
    prevHalo = [multiprocessing.Queue() for _ in range(nprocs)]
    nextHalo = [multiprocessing.Queue() for _ in range(nprocs)]
    results = multiprocessing.Queue()

    processes = [
        multiprocessing.Process(
            target=parallelComputeMyBlockOnly,
            args=(myId, nprocs, N, maxdiff, prevQ, nextQ,
                  prevHalo, nextHalo, results),
        )
        for myId in range(nprocs)
    ]
    for p in processes:
        p.start()

    x = _initGrid(N)
    # Drain results before join(): a child with data still buffered in a queue
    # does not exit until that data is consumed, so joining first can deadlock.
    for _ in range(nprocs):
        first, rows = results.get()
        x[first:first + len(rows)] = rows
    for p in processes:
        p.join()
    return x


def main(argv=None):
    """Parse [nprocs] [N] from argv, run the solver and write output.txt."""
    if argv is None:
        argv = sys.argv
    nprocs = int(argv[1]) if len(argv) > 1 else DEFAULT_NPROCS
    N = int(argv[2]) if len(argv) > 2 else DEFAULT_N

    print(f"{N}x{N} mesh, maxdiff = {MAXDIFF:.3f}")
    start_time = time.perf_counter()
    x = runParallel(nprocs, N, MAXDIFF)
    print(f"total time: {time.perf_counter() - start_time:.2f} seconds")

    # Write the middle row x[N//2][j], then the middle column x[j][N//2],
    # for j in 0..N+1.
    try:
        with open("output.txt", "w") as f:
            f.writelines(f"{x[N // 2][j]:.4f}\n" for j in range(N + 2))
            f.writelines(f"{x[j][N // 2]:.4f}\n" for j in range(N + 2))
    except OSError as e:
        print("File write error:", e)


# Guard required so child processes started with "spawn"/"forkserver" can
# import this module without re-running the script.
if __name__ == "__main__":
    main()