# python3 example using MPI and multiprocessing
#
# R. Perry, August 2018
#
# Distributed exhaustive search of n-digit PINs corresponding to a hash
#
# Usage: python3 pinHashMPI hash ndigits
#
# where hash is SHA1 of an n-digit PIN, e.g. created by genHash.py
#
# Example: python3 pinHashMPI.py 7110eda4d09e062aa5e4a390b0a572ac0d2c0220 4
# should find PIN = 1234
#
# The search space is divided up among the hosts (nodes) depending on the number of CPUs,
# assuming identical CPUs across all nodes.
#
# Each host divides up its portion of the space among cpu_count() separate processes.
#
# run one on current host: python3 pinHashMPI.py ...
# run four (1 leader, 4 workers) on current host: mpirun -n 5 python3 pinHashMPI.py ...
# run four on distributed hosts:
#   mpirun -n 5 -hosts Node-009:2,Node-010,Node-011,Node-012 python3 pinHashMPI.py ...
# run four on distributed hosts via slurm: sbatch slurm.sh
#   slurm.sh contains:
#     #!/bin/sh
#     #SBATCH -N4 -n5
#     mpirun python3 pinHashMPI.py ...
######################################################################
from sys import argv, exit
from hashlib import sha1
from time import time
from multiprocessing import Process, Queue, current_process, cpu_count
from mpi4py import MPI
######################################################################
# one sub-process search
#
# check hash for n-digit PINs from start to stop-1, sending 4 progress messages along the way
#
def search( n, start, stop, h, q, node):
    """Exhaustively test the SHA-1 of every n-digit PIN in [start, stop).

    Runs inside a multiprocessing.Process.  All output goes through the
    queue q as plain text messages; the parent (compute) scans each message
    for the substrings "match" and "done", so those words act as protocol
    sentinels and must not appear in messages with any other meaning.

    n     -- number of PIN digits; PINs are zero-padded to this width
    start -- first candidate PIN value (inclusive)
    stop  -- end of the candidate range (exclusive)
    h     -- target SHA-1 digest as bytes, compared against each PIN's digest
    q     -- multiprocessing.Queue shared with the parent for all messages
    node  -- host name, used only to label messages

    Message sequence: one initial "... " line, up to 4 progress lines (one
    per quarter of the range), a " match: PIN = ..." line if the digest
    matches, and a final "... done" line.  NOTE(review): the final "done"
    message is sent even after a match (the q.put after the loop runs
    following the break); the parent stops reading at "match", so the extra
    message is simply left unread -- confirm this is intentional.
    """
    # name labels every message; fmt zero-pads PINs, e.g. "04d" for n=4
    name = node + "-" + current_process().name; fmt = "0" + str(n) + "d";
    # M4 = progress interval (a quarter of the range); M = next progress
    # milestone; b = padding that grows one space per progress message so
    # successive reports from the same process are visually staggered
    M4 = (stop-start)//4; M = start + M4; b = " "
    q.put( name + " " + format(start,fmt) + " ...")
    pin = start
    while pin < stop:
        # progress report at each quarter-range milestone
        if pin == M: q.put( name + b + " ... " + format(pin,fmt)); M += M4; b += " "
        d = bytearray( format(pin,fmt), 'utf-8')
        md = sha1(d).digest()
        # match found: the word "match" is the sentinel the parent looks for
        if md == h: q.put( name + " match: PIN = " + format(pin,fmt) ); break
        pin += 1
    # completion report; "done" is the sentinel the parent counts
    q.put( name + b + " ... " + format(pin-1,fmt) + " done")
######################################################################
# run compute node sub-processes to perform searches
#
def compute( n, istart, istop, h, node, comm, debug):
    """Split [istart, istop) across cpu_count() sub-processes and search.

    Starts one search() Process per CPU, each with a nearly equal slice of
    the range (the last slice absorbs the integer-division remainder), then
    drains the shared queue until either a "match" message arrives or all
    processes have reported "done".  On a match the remaining processes are
    terminated so the node stops early.

    n      -- number of PIN digits
    istart -- start of this node's search range (inclusive)
    istop  -- end of this node's search range (exclusive)
    h      -- target SHA-1 digest as bytes
    node   -- host name, used to label messages
    comm   -- MPI communicator; NOTE(review): accepted but not used in this
              function -- presumably kept for interface symmetry; confirm
    debug  -- if True, print per-process ranges and every queue message

    Returns a result string: either the worker's "match: PIN = ..." message
    or "<node>: DONE. PIN not found."  The word "DONE" in the no-match
    result is the sentinel the MPI leader counts, while "done" (lower case,
    from search) is the sentinel counted here -- the case difference keeps
    the two protocols distinct.
    """
    np = cpu_count()
    N = istop - istart
    q = Queue()
    d = N//np # might not divide evenly
    p = [None for i in range(np)]
    stop = istart
    for i in range(np): # equal stop-start ranges assuming identical CPUs on a node
        # each slice begins where the previous one ended; the last slice
        # runs to istop to absorb the N//np roundoff
        start = stop; stop = start + d if i < np-1 else istop # handle d roundoff
        p[i] = Process( target=search, args=(n, start, stop, h, q, node,))
        p[i].start()
        if debug: print( node + ": i =", i, ", start =", start, ", stop =", stop, flush=True)
    result = node + ": DONE. PIN not found."
    # get results
    #
    # drain the queue until a match arrives or every sub-process is done;
    # q.get() blocks, so this relies on each process sending its sentinel
    done = 0
    while done < np:
        msg = q.get();
        if debug: print(msg,flush=True)
        if "match" in msg: result = msg; break
        if "done" in msg: done += 1
    # if match found, terminate the processes
    #
    # (done < np can only hold here if we broke out on a "match" message;
    # terminating already-exited processes is harmless)
    if done < np:
        for i in range(np): p[i].terminate()
    return result
######################################################################
#
if __name__ == '__main__':
    # pick one:
    debug = False
    #debug = True
    # save start time
    #
    t0 = time()
    # get hash (h), number of digits (n), and max PIN (N-1)
    #
    # with no arguments, fall back to the built-in demo hash (PIN 1234)
    if len(argv) != 3:
        hash = "7110eda4d09e062aa5e4a390b0a572ac0d2c0220"; n = 4
        # raise Exception("Usage: " + argv[0] + ": hash ndigits")
    else:
        hash = argv[1]; n = int( argv[2])
    h = bytes.fromhex( hash)
    N = pow( 10, n)
    # get MPI parameters
    #
    comm = MPI.COMM_WORLD # reference to all processes (ranks) of this job
    size = comm.Get_size() # number of processes
    rank = comm.Get_rank() # index of this process, 0 ... (size-1)
    node = MPI.Get_processor_name() # name of this processor (node or host)
    if debug: print( node + ": size =", size, ", rank =", rank, flush=True)
    # note that greeting includes cpu_count which the leader uses to determine search range distribution
    # (the leader parses the CPU count back out with greeting.split()[0],
    # so the word layout of this string is part of the protocol)
    #
    greeting = str(cpu_count()) + " CPUs: process " + str(rank) + " out of " + str(size) + ", on " + node
    # if no workers, leader does the search
    #
    if size == 1:
        print( node + ": hash =", hash, ", n =", n)
        print( greeting)
        result = compute( n, 0, N, h, node, comm, debug)
        print( "\n" + result)
        MPI.Finalize()
        t1 = time() # end time
        print( "\n" + node + ": Elapsed time:", '%g' % (t1-t0), "seconds\n")
        exit(0)
    # with workers available, leader collects messages and distributes the search space
    #
    if rank == 0:
        print( node + ": hash =", hash, ", n =", n, flush=True)
        #
        # collect one greeting per worker and extract each worker's CPU count
        cpu = [0 for i in range(size)]
        cpu_total = 0
        for i in range(1,size):
            greeting = comm.recv( source=i)
            print( greeting)
            cpu[i] = int(greeting.split()[0])
            cpu_total += cpu[i]
        print( node + ": cpu =", cpu)
        print( node + ": cpu_total = ", cpu_total, flush=True)
        #
        # hand each worker a (start, stop) slice sized by its CPU count
        d = N//cpu_total # might not divide evenly
        stop = 0
        for i in range(1,size): # stop-start ranges proportional to number of CPUs, assuming identical CPUs across all nodes
            start = stop
            stop = start + d*cpu[i] if i < size-1 else N # handle d roundoff
            if debug: print( node + ": i =", i, ", start =", start, ", stop =", stop, flush=True)
            comm.send( (start,stop), i)
        #
        # get results
        #
        # receive from any worker (no source given) until a "match" result
        # arrives or all size-1 workers have reported "DONE"
        DONE = 0
        result = "PIN not found."
        while DONE < size-1:
            msg = comm.recv();
            if debug: print( msg, flush=True);
            if "match" in msg: result = msg; break
            if "DONE" in msg: DONE += 1
        print( "\n" + result)
        t1 = time() # end time
        print( "\n" + node + ": Elapsed time:", '%g' % (t1-t0), "seconds\n", flush=True)
        #
        # if match found, terminate all processes
        # (DONE < size-1 only if we broke out early on a match; Abort kills
        # the still-searching workers rather than waiting for them)
        #
        if debug: print( "DONE =", DONE, ", size =", size, flush=True)
        if DONE < size-1: comm.Abort()
    #
    # workers
    #
    else:
        # send greeting (with CPU count), receive assigned range, search it,
        # and report the result string back to the leader
        comm.send( greeting, 0)
        (start, stop) = comm.recv( source=0)
        print( node + ": start =", start, ", stop =", stop, flush=True)
        result = compute( n, start, stop, h, node, comm, debug)
        if debug: print( node + ": result =", result, flush=True)
        comm.send( result, 0)
    #
    MPI.Finalize()