|
| 1 | +{ |
| 2 | + "cells": [ |
| 3 | + { |
| 4 | + "cell_type": "code", |
| 5 | + "execution_count": 1, |
| 6 | + "id": "47e81551-616e-4b1c-aa97-4cb39512a333", |
| 7 | + "metadata": {}, |
| 8 | + "outputs": [ |
| 9 | + { |
| 10 | + "name": "stderr", |
| 11 | + "output_type": "stream", |
| 12 | + "text": [ |
| 13 | + "/Users/tomazbratanic/anaconda3/lib/python3.11/site-packages/pandas/core/arrays/masked.py:60: UserWarning: Pandas requires version '1.3.6' or newer of 'bottleneck' (version '1.3.5' currently installed).\n", |
| 14 | + " from pandas.core import (\n" |
| 15 | + ] |
| 16 | + } |
| 17 | + ], |
| 18 | + "source": [ |
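|  | + "# Connect to the target Neo4j instance and prepare a token-based splitter for chunking\n", |
|  | + "# article text; supply the database password below before running.\n", |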
| 19 | + "from neo4j import GraphDatabase\n", |
| 20 | + "import aiohttp\n", |
| 21 | + "import asyncio\n", |
| 22 | + "from langchain_text_splitters import TokenTextSplitter\n", |
| 23 | + "from datetime import datetime\n", |
| 24 | + "text_splitter = TokenTextSplitter(chunk_size=512, chunk_overlap=56)\n", |
| 25 | + "\n", |
| 26 | + "driver = GraphDatabase.driver(\"neo4j+s://diffbot.neo4jlabs.com:7687\", auth=(\"neo4j\", \"\"))" |
| 27 | + ] |
| 28 | + }, |
| 29 | + { |
| 30 | + "cell_type": "code", |
| 31 | + "execution_count": 2, |
| 32 | + "id": "32318b6b-e6a6-4ef6-904a-3e9557dd2920", |
| 33 | + "metadata": {}, |
| 34 | + "outputs": [], |
| 35 | + "source": [ |
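|  | + "# Return the ids of the top_k organizations, ranked by their precomputed importance score\n", |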
| 36 | + "def get_org_ids(top_k: int = 1000):\n", |
| 37 | + "    records, _, _ = driver.execute_query(\n", |
| 38 | + "        \"MATCH (o:Organization) WHERE o.importance IS NOT NULL RETURN o.id AS id ORDER BY o.importance DESC LIMIT toInteger($limit)\",\n", |
| 39 | + "        limit=top_k)\n", |
| 40 | + "    return [el['id'] for el in records]" |
| 41 | + ] |
| 42 | + }, |
| 43 | + { |
| 44 | + "cell_type": "code", |
| 45 | + "execution_count": 3, |
| 46 | + "id": "92460238-9247-49f9-8621-177364fdd7a7", |
| 47 | + "metadata": {}, |
| 48 | + "outputs": [], |
| 49 | + "source": [ |
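|  | + "# Diffbot API token (left empty here); set it before querying the Knowledge Graph\n", |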
| 50 | + "DIFFBOT_TOKEN = \"\"\n", |
| 51 | + "\n", |
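|  | + "# Fetch up to 100 English articles tagged with the given entity, published after\n", |
|  | + "# date_after and sorted by date, via the Diffbot Knowledge Graph DQL endpoint\n", |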
| 52 | + "async def get_articles(entity_id: str, date_after: str):\n", |
| 53 | + "    # Base URL for the API call\n", |
| 54 | + "    base_url = \"https://kg.diffbot.com/kg/v3/dql\"\n", |
| 55 | + "\n", |
| 56 | + "    # Construct the query part\n", |
| 57 | + "    query = f'type:Article tags.uri:\"http://diffbot.com/entity/{entity_id}\" strict:language:\"en\" date>\"{date_after}\" sortBy:date'\n", |
| 58 | + "\n", |
| 59 | + "    # Create the full URL with the query and token\n", |
| 60 | + "    url = f\"{base_url}?type=query&token={DIFFBOT_TOKEN}&query={query}&size=100\"\n", |
| 61 | + "\n", |
| 62 | + "    # Make the GET request asynchronously\n", |
| 63 | + "    async with aiohttp.ClientSession() as session:\n", |
| 64 | + "        async with session.get(url) as response:\n", |
| 65 | + "            if response.status == 200:\n", |
| 66 | + "                return await response.json()  # Return the JSON response\n", |
| 67 | + "            else:\n", |
| 68 | + "                return None  # Handle the error case as needed\n" |
| 69 | + ] |
| 70 | + }, |
| 71 | + { |
| 72 | + "cell_type": "code", |
| 73 | + "execution_count": 4, |
| 74 | + "id": "3d45ddc9-3e67-4421-ad32-f1a4b091217f", |
| 75 | + "metadata": {}, |
| 76 | + "outputs": [], |
| 77 | + "source": [ |
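|  | + "# Cypher import statement: upsert each article and link it to its categories, tags,\n", |
|  | + "# text chunks, author, and videos. The CALL (a, row) { ... } blocks are variable-scoped\n", |
|  | + "# subqueries, which need a recent Neo4j 5.x release.\n", |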
| 78 | + "import_query = \"\"\"\n", |
| 79 | + "UNWIND $data AS row\n", |
| 80 | + "MERGE (a:Article {id: row.diffbotUri})\n", |
| 81 | + "SET a.pageUrl = row.pageUrl,\n", |
| 82 | + "    a.title = row.title,\n", |
| 83 | + "    a.language = row.language,\n", |
| 84 | + "    a.sentiment = toFloat(row.sentiment),\n", |
| 85 | + "    a.date = row.date_obj,\n", |
| 86 | + "    a.summary = row.summary,\n", |
| 87 | + "    a.publisherCountry = row.publisherCountry,\n", |
| 88 | + "    a.publisherRegion = row.publisherRegion\n", |
| 89 | + "WITH a, row\n", |
| 90 | + "CALL (a, row) {\n", |
| 91 | + "UNWIND row.categories AS category\n", |
| 92 | + "MERGE (c:Category {name: category.name})\n", |
| 93 | + "MERGE (a)-[hc:HAS_CATEGORY]->(c)\n", |
| 94 | + "SET hc.score = toFloat(category.score)\n", |
| 95 | + "RETURN count(*) AS count\n", |
| 96 | + "}\n", |
| 97 | + "WITH a, row\n", |
| 98 | + "CALL (a, row) {\n", |
| 99 | + "UNWIND [el in row.tags WHERE el.uri IS NOT NULL | el ] AS tag\n", |
| 100 | + "MERGE (t:Tag {id: tag.uri})\n", |
| 101 | + "ON CREATE SET t.label = tag.label\n", |
| 102 | + "MERGE (a)-[ht:HAS_TAG]->(t)\n", |
| 103 | + "SET ht.score = toFloat(tag.score),\n", |
| 104 | + "    ht.sentiment = toFloat(tag.sentiment)\n", |
| 105 | + "RETURN count(*) AS count\n", |
| 106 | + "}\n", |
| 107 | + "WITH a, row\n", |
| 108 | + "CALL (a, row) {\n", |
| 109 | + "UNWIND row.texts as text\n", |
| 110 | + "MERGE (a)-[:HAS_CHUNK]->(c:Chunk {index: text.index})\n", |
| 111 | + "SET c.text = text.text\n", |
| 112 | + "RETURN count(*) AS count\n", |
| 113 | + "}\n", |
| 114 | + "WITH a, row\n", |
| 115 | + "CALL (a, row) {\n", |
| 116 | + "WITH a, row\n", |
| 117 | + "WHERE row.author IS NOT NULL\n", |
| 118 | + "MERGE (au:Author {name: row.author})\n", |
| 119 | + "MERGE (a)-[:HAS_AUTHOR]->(au)\n", |
| 120 | + "}\n", |
| 121 | + "WITH a, row\n", |
| 122 | + "CALL (a, row) {\n", |
| 123 | + "UNWIND [el in row.videos WHERE el.url IS NOT NULL] as video\n", |
| 124 | + "MERGE (v:Video {uri: video.url})\n", |
| 125 | + "SET v.summary = video.summary,\n", |
| 126 | + "    v.name = video.name\n", |
| 127 | + "MERGE (a)-[:HAS_VIDEO]->(v)\n", |
| 128 | + "RETURN count(*) AS count\n", |
| 129 | + "}\n", |
| 130 | + "RETURN count(*)\n", |
| 131 | + "\"\"\"" |
| 132 | + ] |
| 133 | + }, |
| 134 | + { |
| 135 | + "cell_type": "code", |
| 136 | + "execution_count": 5, |
| 137 | + "id": "de4bb443-2cd6-4497-ae99-6ef6dc6302c7", |
| 138 | + "metadata": {}, |
| 139 | + "outputs": [], |
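|  | + "# Uniqueness constraints keep the MERGE lookups in the import query fast and prevent duplicates\n", |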
| 140 | + "source": [ |
| 141 | + "for con in [\"CREATE CONSTRAINT article_unique_id IF NOT EXISTS FOR (a:Article) REQUIRE a.id IS UNIQUE;\",\n", |
| 142 | + "\"CREATE CONSTRAINT category_unique_name IF NOT EXISTS FOR (c:Category) REQUIRE c.name IS UNIQUE;\",\n", |
| 143 | + "\"CREATE CONSTRAINT tag_unique_id IF NOT EXISTS FOR (t:Tag) REQUIRE t.id IS UNIQUE;\",\n", |
| 144 | + "\"CREATE CONSTRAINT author_unique_name IF NOT EXISTS FOR (au:Author) REQUIRE au.name IS UNIQUE;\",\n", |
| 145 | + "\"CREATE CONSTRAINT video_unique_uri IF NOT EXISTS FOR (v:Video) REQUIRE v.uri IS UNIQUE;\"]:\n", |
| 146 | + "    driver.execute_query(con, database_='articles')\n" |
| 147 | + ] |
| 148 | + }, |
| 149 | + { |
| 150 | + "cell_type": "code", |
| 151 | + "execution_count": 8, |
| 152 | + "id": "c60015b0-415f-41d1-ade7-7d0a9e9037b8", |
| 153 | + "metadata": {}, |
| 154 | + "outputs": [], |
| 155 | + "source": [ |
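|  | + "# Number of articles written to Neo4j per batch\n", |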
| 156 | + "step = 1000\n", |
| 157 | + "\n", |
| 158 | + "# Define a helper function to process the articles for a single organization\n", |
| 159 | + "async def process_organization(organization, date_after):\n", |
| 160 | + "    data = await get_articles(organization, date_after) or {'data': []}  # treat a failed request as no articles\n", |
| 161 | + "    for ent in data['data']:\n", |
| 162 | + "        ent['entity']['texts'] = [{'text': el, 'index': i} for i, el in enumerate(text_splitter.split_text(ent['entity']['text']))]\n", |
| 163 | + "        ent['entity']['date_obj'] = datetime.fromtimestamp(ent['entity']['date']['timestamp'] / 1000)\n", |
| 164 | + "    return [ent['entity'] for ent in data['data']]\n", |
| 165 | + "\n", |
| 166 | + "async def update_articles(top_k: int = 1000, date_after: str = '2024-01-01'):\n", |
| 167 | + "    articles = []\n", |
| 168 | + "    organization_ids = get_org_ids(top_k)\n", |
| 169 | + "    # Gather all articles concurrently\n", |
| 170 | + "    org_tasks = [process_organization(org, date_after) for org in organization_ids]\n", |
| 171 | + "    org_results = await asyncio.gather(*org_tasks)\n", |
| 172 | + "    # Flatten the list of results into one list\n", |
| 173 | + "    for result in org_results:\n", |
| 174 | + "        articles.extend(result)\n", |
| 175 | + "    # Batch per 1000 articles\n", |
| 176 | + "    for index in range(0, len(articles), step):\n", |
| 177 | + "        batch = articles[index: index + step]\n", |
| 178 | + "        driver.execute_query(import_query, data=batch, database_='articles')" |
| 179 | + ] |
| 180 | + }, |
| 181 | + { |
| 182 | + "cell_type": "code", |
| 183 | + "execution_count": 9, |
| 184 | + "id": "8fc14e8f-8d7d-4d06-93c7-e3ca25953ae2", |
| 185 | + "metadata": {}, |
| 186 | + "outputs": [], |
| 187 | + "source": [ |
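|  | + "# Import articles for the 10 most important organizations (top-level await works in Jupyter).\n", |
|  | + "# A quick sanity check afterwards could be (illustrative):\n", |
|  | + "# driver.execute_query(\"MATCH (a:Article) RETURN count(a) AS articles\", database_='articles')\n", |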
| 188 | + "await update_articles(10)" |
| 189 | + ] |
| 190 | + }, |
| 191 | + { |
| 192 | + "cell_type": "code", |
| 193 | + "execution_count": null, |
| 194 | + "id": "fa9a6ac2-54a8-4665-87a3-aec597956c7c", |
| 195 | + "metadata": {}, |
| 196 | + "outputs": [], |
| 197 | + "source": [] |
| 198 | + } |
| 199 | + ], |
| 200 | + "metadata": { |
| 201 | + "kernelspec": { |
| 202 | + "display_name": "Python 3 (ipykernel)", |
| 203 | + "language": "python", |
| 204 | + "name": "python3" |
| 205 | + }, |
| 206 | + "language_info": { |
| 207 | + "codemirror_mode": { |
| 208 | + "name": "ipython", |
| 209 | + "version": 3 |
| 210 | + }, |
| 211 | + "file_extension": ".py", |
| 212 | + "mimetype": "text/x-python", |
| 213 | + "name": "python", |
| 214 | + "nbconvert_exporter": "python", |
| 215 | + "pygments_lexer": "ipython3", |
| 216 | + "version": "3.11.5" |
| 217 | + } |
| 218 | + }, |
| 219 | + "nbformat": 4, |
| 220 | + "nbformat_minor": 5 |
| 221 | +} |