python


Celery chunks large data set


I'm trying to use Celery's chunks functionality to divide my iterable dataset into pieces, which are then sent to a Celery task for further processing.
I have a query_set that I got from the following SQLAlchemy call:
query_set = MyModel.query.join(OtherModel).all()
Currently, query_set is a list of tuples. Its length is 40,000 and growing.
I have another function (a Celery task) that crunches the data in query_set. Its definition is:
@celery_app.task
def crunch_qs(query_set):
    ...
    ...
Since query_set is a list of tuples, I figured I could pass it directly to crunch_qs like this
crunched_qs = crunch_qs.chunks(query_set, 5000)()
results = crunched_qs.get()
That did not work and gave me an unexpected result: chunks unpacked the items of each tuple in query_set and sent them to crunch_qs as separate arguments.
So crunch_qs received *query_set[0] on the first iteration, which raised the following error:
TypeError: crunch_qs() takes exactly 1 argument (10 given)
len(query_set[0]) = 10
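To make the unpacking concrete, here is a small broker-free sketch of what seems to be happening. It assumes that chunks treats every item of the iterable as an argument tuple and calls the task as task(*item); call_like_chunks is a hypothetical stand-in written for this illustration, not part of Celery, and crunch_qs is replaced by a dummy that only counts what it receives.

```python
def crunch_qs(query_set):
    """Dummy stand-in for the real task: report the length of its argument."""
    return len(query_set)


def call_like_chunks(func, items):
    # Assumption: each item is used as the positional arguments of one call,
    # which is how the behaviour in the question appears to work.
    return [func(*item) for item in items]


row = tuple(range(10))          # one 10-column row, like query_set[0]
query_set = [row, row, row]

error = None
try:
    # Each row is star-unpacked, so crunch_qs is called with 10 arguments.
    call_like_chunks(crunch_qs, query_set)
except TypeError as exc:
    error = str(exc)

# Wrapping each row in a one-element tuple passes the row as a single argument,
# but the function still sees one row per call, not a list of rows.
wrapped = call_like_chunks(crunch_qs, [(r,) for r in query_set])
```

Under that assumption, the second argument to chunks only controls how many of these per-item calls are grouped into one message; it does not change what each individual call receives.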
I also tried:
crunched_qs = crunch_qs.chunks(((row,) for row in query_set), 5000)()
results = crunched_qs.get()
That worked a little better: the TypeError went away. However, crunch_qs now receives each row (a single tuple) as its parameter instead of a list of 5000 tuples.
Any help/ideas on how to pass a list of tuples to celery chunks would be highly appreciated.
Thanks in advance
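One possible direction, offered as a sketch rather than a confirmed fix: split query_set into lists of 5000 rows first, then hand each list to the task as a single argument. The batched helper below is a hypothetical name introduced for this example, and the Celery calls are left as comments because they need a running broker and have not been tested here.

```python
from itertools import islice


def batched(rows, size):
    """Yield consecutive lists holding at most `size` rows each."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


# Fake data standing in for the real query result: 12,000 rows of 10 columns.
query_set = [tuple(range(10)) for _ in range(12000)]

batches = list(batched(query_set, 5000))
batch_sizes = [len(b) for b in batches]   # [5000, 5000, 2000]

# With Celery, each batch could then be sent as one argument, for example:
#   from celery import group
#   job = group(crunch_qs.s(batch) for batch in batched(query_set, 5000))
#   results = job.apply_async().get()
# Assumption: the rows can be serialized by the configured serializer;
# converting each row to a plain list or tuple first may be necessary.
```

With this shape, crunch_qs receives a list of up to 5000 tuples per call, which matches the signature in the question.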
