Join two RDDs on custom function - SPARK


Is it possible to join two RDDs in Spark on a custom function?
I have two big RDDs with strings as keys. I want to join them using a custom function instead of the classic join, something like:
def my_func(a, b):
    return Lev.distance(a, b) < 2

result_rdd = rdd1.join(rdd2, my_func)
If that's not possible, is there an alternative that still takes advantage of the Spark cluster?
I wrote something like this, but PySpark cannot distribute the work across my small cluster:
def custom_join(rdd1, rdd2, my_func):
    a = rdd1.sortByKey().collect()
    b = rdd2.sortByKey().collect()
    i = 0
    j = 0
    res = []
    while i < len(a) and j < len(b):
        if my_func(a[i][0], b[j][0]):
            res += [((a[i][0], b[j][0]), (a[i][1], b[j][1]))]
            i += 1
            j += 1
        elif a[i][0] < b[j][0]:
            i += 1
        else:
            j += 1
    return sc.parallelize(res)
Thanks in advance (and sorry for my English; I'm Italian).
You can use cartesian() and then filter the resulting pairs with your custom condition.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("b", 3)])

def customFunc(pair):
    # pair is ((key1, value1), (key2, value2)); you may use any condition here
    return pair[0][0] == pair[1][0]

print(x.join(y).collect())  # normal join

# replicating join with cartesian
print(x.cartesian(y).filter(customFunc).flatMap(lambda pair: pair).groupByKey().mapValues(tuple).collect())
Output:
[('b', (4, 3)), ('a', (1, 2))]
[('a', (1, 2)), ('b', (4, 3))]
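The example above filters on key equality. As a rough sketch of the same cartesian-then-filter pattern with the edit-distance condition from the question, something like the following might work. The names levenshtein, keys_match and fuzzy_join are made up for illustration, and levenshtein is a plain-Python stand-in for the asker's Lev.distance, so treat this as an assumption-laden sketch rather than a tuned implementation.

```python
def levenshtein(a, b):
    """Edit distance between two strings (hypothetical stand-in for Lev.distance)."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        curr = [i]
        for j, cb in enumerate(b, 1):
            curr.append(min(prev[j] + 1,                # delete from a
                            curr[j - 1] + 1,            # insert into a
                            prev[j - 1] + (ca != cb)))  # substitute
        prev = curr
    return prev[-1]


def keys_match(pair):
    """pair is ((key1, value1), (key2, value2)), as produced by cartesian()."""
    (key1, _), (key2, _) = pair
    return levenshtein(key1, key2) < 2


def fuzzy_join(rdd1, rdd2):
    """Pair every record of rdd1 with every record of rdd2, keep close keys."""
    return (rdd1.cartesian(rdd2)
                .filter(keys_match)
                # reshape to ((key1, key2), (value1, value2)), like the question's output
                .map(lambda p: ((p[0][0], p[1][0]), (p[0][1], p[1][1]))))
```

With the two RDDs from the question, this would be called as fuzzy_join(rdd1, rdd2). Because cartesian() pairs every element of one RDD with every element of the other, the work grows with the product of the two sizes, but it runs on the executors instead of being collected on the driver.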
