1
- #!pyspark --packages graphframes:graphframes:0.8.3-spark3.5-s_2.12 --driver-memory 16g --executor-memory 8g
1
+ #!/usr/bin/env pyspark --packages graphframes:graphframes:0.8.3-spark3.5-s_2.12 --driver-memory 16g --executor-memory 8g
2
2
3
3
import os
4
4
import resource
@@ -207,6 +207,31 @@ def add_missing_columns(df, all_cols):
207
207
print (f"Total VotedFor edges: { voted_for_edge_df .count ():,} " )
208
208
print (f"Percentage of linked votes: { voted_for_edge_df .count () / votes_df .count ():.2%} \n " )
209
209
210
+ #
211
+ # Create a [User]--Cast-->[Vote] edge
212
+ #
213
+ user_voted_df : DataFrame = users_df .select (
214
+ F .col ("id" ).alias ("src" ),
215
+ F .col ("Id" ).alias ("UserId" ),
216
+ # Everything has all the fields - should build from base records but need UUIDs
217
+ F .col ("PostId" ).alias ("VotePostId" ),
218
+ )
219
+ user_voted_edge_df : DataFrame = (
220
+ user_voted_df .join (votes_df , user_voted_df .UserId == votes_df .Id )
221
+ .select (
222
+ # 'src' comes from the votes' 'id'
223
+ "src" ,
224
+ # 'dst' comes from the posts' 'id'
225
+ F .col ("id" ).alias ("dst" ),
226
+ # All edges have a 'relationship' field
227
+ F .lit ("Cast" ).alias ("relationship" ),
228
+ )
229
+ .cache ()
230
+ )
231
+ print (f"Total VotedFor edges: { voted_for_edge_df .count ():,} " )
232
+ print (f"Percentage of linked votes: { voted_for_edge_df .count () / votes_df .count ():.2%} \n " )
233
+
234
+
210
235
#
211
236
# Create a answer[User]--Answered-->question[Post] edge
212
237
#
@@ -248,7 +273,7 @@ def add_missing_columns(df, all_cols):
248
273
# Create a [User]--Posted-->[Post] edge
249
274
#
250
275
src_posts_df : DataFrame = posts_df .select (
251
- F .col ("id" ).alias ("src " ),
276
+ F .col ("id" ).alias ("dst " ),
252
277
F .col ("Id" ).alias ("PostPostId" ),
253
278
F .col ("OwnerUserId" ).alias ("PostOwnerUserId" ),
254
279
)
@@ -257,9 +282,9 @@ def add_missing_columns(df, all_cols):
257
282
src_posts_df .join (users_df , src_posts_df .PostOwnerUserId == users_df .Id )
258
283
.select (
259
284
# 'src' comes from the posts' 'id'
260
- " src" ,
285
+ F . col ( "id" ). alias ( " src") ,
261
286
# 'dst' comes from the users' 'id'
262
- F . col ( "id" ). alias ( " dst") ,
287
+ " dst" ,
263
288
# All edges have a 'relationship' field
264
289
F .lit ("Posted" ).alias ("relationship" ),
265
290
)
@@ -307,6 +332,39 @@ def add_missing_columns(df, all_cols):
307
332
# Test out Connected Components
308
333
sc .setCheckpointDir ("/tmp/spark-checkpoints" )
309
334
components = g .connectedComponents ()
310
- (components .select ("id" , "component" ).groupBy ("component" ).count ().sort (F .desc ("count" )).show ())
311
-
312
- g .find ("(a)-[e]->(b)" ).show ()
335
+ components .select ("id" , "component" ).groupBy ("component" ).count ().sort (F .desc ("count" )).show ()
336
+
337
+ # Shows (User)-[VotedFor]->(Post)--(Answers)->(Post)
338
+ paths = g .find ("(a)-[e]->(b); (b)-[e2]->(c)" )
339
+ paths .select ("a.Type" , "e.*" , "b.Type" , "e2.*" , "c.Type" ).show ()
340
+
341
+ # Shows two matches:
342
+ # - (Post)-[Answers]->(Post)<--[Posted]-(User)
343
+ # - (Post)-[Answers]->(Post)<--[VotedFor]-(User)
344
+ paths = g .find ("(a)-[e]->(b); (c)-[e2]->(a)" )
345
+ paths .select ("a.Type" , "e.*" , "b.Type" , "e2.*" , "c.Type" ).show ()
346
+
347
+ # Figure out how often questions are answered and the question poster votes for the answer. Neat!
348
+ # Shows (User A)-[Posted]->(Post)<-[Answers]-(Post)<-[VotedFor]-(User)
349
+ paths = g .find ("(a)-[e]->(b); (c)-[e2]->(b); (d)-[e3]->(c)" )
350
+
351
+ # If the node types are right...
352
+ paths = paths .filter (F .col ("a.Type" ) == "User" )
353
+ paths = paths .filter (F .col ("b.Type" ) == "Post" )
354
+ paths = paths .filter (F .col ("c.Type" ) == "Post" )
355
+ paths = paths .filter (F .col ("d.Type" ) == "User" )
356
+
357
+ # If the edge types are right...
358
+ # paths = paths.filter(F.col("e.relationship") == "Posted")
359
+ # paths = paths.filter(F.col("e2.relationship") == "Answers")
360
+ paths = paths .filter (F .col ("e3.relationship" ) == "VotedFor" )
361
+
362
+ paths .select (
363
+ "a.Type" ,
364
+ "e.relationship" ,
365
+ "b.Type" ,
366
+ "e2.relationship" ,
367
+ "c.Type" ,
368
+ "e3.relationship" ,
369
+ "d.Type" ,
370
+ ).show ()
0 commit comments