Erstellen DB-Verbindung und pflegen auf mehrere Prozesse (multiprocessing)
Ähnlich zu einem anderen post habe ich gemacht, diese Antworten, die post und erstellt eine neue Frage stellen.
Rückblick: ich aktualisieren jeden Datensatz in einer räumlichen Datenbank, in der ich einen Datensatz von Punkten, die overlay-Daten-Satz von Polygonen. Für jedes Punkt-feature möchte ich eine Taste zuordnen, beziehen sich auf die polygon-Funktion, die es innerhalb liegt. Also, wenn mein Punkt 'New York City' liegt im polygon USA und für die USA polygon 'GID = 1" ich werde weisen Sie 'gid_fkey = 1' für meinen Standpunkt New York City.
Okay, so erreicht wurde dies mit multiprocessing. Ich habe bemerkt, eine Steigerung von 150% in der Geschwindigkeit mit dieser, so dass es funktioniert. Aber ich denke, es gibt einen Haufen unnötigen overhead wie eine DB-Verbindung erforderlich ist, für jeden Datensatz.
So, hier ist der code:
import multiprocessing, time, psycopg2
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
print 'Tasks Complete'
self.task_queue.task_done()
break
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a):
self.a = a
def __call__(self):
pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConn.set_isolation_level(0)
pyCursor1 = pyConn.cursor()
procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)
pyCursor1.execute(procQuery)
print 'What is self?'
print self.a
return self.a
def __str__(self):
return 'ARC'
def run(self):
print 'IN'
if __name__ == '__main__':
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
num_consumers = multiprocessing.cpu_count() * 2
consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
for w in consumers:
w.start()
pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConnX.set_isolation_level(0)
pyCursorX = pyConnX.cursor()
pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')
temp = pyCursorX.fetchall()
num_job = temp[0]
num_jobs = num_job[0]
pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')
cityIdListTuple = pyCursorX.fetchall()
cityIdListList = []
for x in cityIdListTuple:
cityIdList.append(x[0])
for i in xrange(num_jobs):
tasks.put(Task(cityIdList[i - 1]))
for i in xrange(num_consumers):
tasks.put(None)
while num_jobs:
result = results.get()
print result
num_jobs -= 1
Sieht es zwischen 0,3 und 1,5 Sekunden pro Verbindung, wie ich es Messen mit 'Zeit' - Modul.
Ist es ein Weg, um eine DB-Verbindung pro Prozess und dann verwenden Sie einfach die city_id info als eine variable, die ich füttern kann in einer Abfrage der cursor in diesem öffnen? Diese Weise ich sagen, vier Prozessen mit je einer DB-Verbindung und dann fallen mir city_id irgendwie zu verarbeiten.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Versuchen zu isolieren, die zur Erstellung der Verbindung in den Consumer-Konstruktor, dann geben Sie es an die ausgeführte Aufgabe :