Add changes for 82347fc
actions-user committed May 6, 2024
1 parent f0d3d6b commit 91685b4
Showing 11 changed files with 372 additions and 6 deletions.
80 changes: 80 additions & 0 deletions How2Guides/FlowCallbacks.html
Original file line number Diff line number Diff line change
@@ -52,6 +52,9 @@
<li class="toctree-l3"><a class="reference internal" href="#initialization-and-settings">Initialization and Settings</a></li>
<li class="toctree-l3"><a class="reference internal" href="#entry-points">Entry Points</a></li>
<li class="toctree-l3"><a class="reference internal" href="#new-fields">new_* Fields</a></li>
<li class="toctree-l3"><a class="reference internal" href="#override-fields">Override Fields</a></li>
<li class="toctree-l3"><a class="reference internal" href="#customizing-duplicate-suppression">Customizing Duplicate Suppression</a></li>
<li class="toctree-l3"><a class="reference internal" href="#customizing-post-exchangesplit">Customizing post_exchangeSplit</a></li>
<li class="toctree-l3"><a class="reference internal" href="#sample-flowcb-sub-class">Sample Flowcb Sub-Class</a></li>
<li class="toctree-l3"><a class="reference internal" href="#download-renaming">Download Renaming</a></li>
<li class="toctree-l3"><a class="reference internal" href="#web-sites-with-non-standard-file-listings">Web Sites with non-standard file listings</a></li>
@@ -385,6 +388,83 @@ <h2>new_* Fields<a class="headerlink" href="#new-fields" title="Link to this hea
<li><p>msg[‘new_subtopic’] … the subtopic hierarchy that will be encoded in the notification message for downstream consumers.</p></li>
</ul>
</section>
<section id="override-fields">
<h2>Override Fields<a class="headerlink" href="#override-fields" title="Link to this heading"></a></h2>
<p>To change how messages are processed, a callback can set override fields that alter the behaviour of the built-in algorithms.
For example:</p>
<ul class="simple">
<li><p>msg[‘nodupe_override’] = { ‘key’: …, ‘path’: … } changes how the duplicate detection operates.</p></li>
<li><p>msg[‘topic’] … defines the topic of a published message (instead of it being calculated from other fields).</p></li>
<li><p>msg[‘exchangeSplitOverride’] = int … changes how post_exchangeSplit chooses among multiple postExchanges.</p></li>
</ul>
</section>
<section id="customizing-duplicate-suppression">
<h2>Customizing Duplicate Suppression<a class="headerlink" href="#customizing-duplicate-suppression" title="Link to this heading"></a></h2>
<p>The built-in duplicate suppression uses the identity field as a key and stores the path as the value.
If a file is received with a key that has already been seen, and its path is already stored under that key,
it is considered a duplicate and dropped.</p>
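<p>The key/path rule described above can be sketched as follows. This is a toy illustration, not sarracenia's implementation: the <code>DupCache</code> class, its method name, and the sample key and paths are all assumptions made for the example.</p>

```python
from collections import defaultdict


class DupCache:
    """Toy duplicate cache (hypothetical): key -> set of paths already seen."""

    def __init__(self):
        self.seen = defaultdict(set)

    def is_duplicate(self, key, path):
        # A message counts as a duplicate only when this key
        # has already been seen together with this path.
        if path in self.seen[key]:
            return True
        self.seen[key].add(path)
        return False


cache = DupCache()
# Made-up key and paths, for illustration only.
first = cache.is_duplicate('sha512,abc', 'a/b/file.txt')   # new key and path
second = cache.is_duplicate('sha512,abc', 'a/b/file.txt')  # same key, same path
third = cache.is_duplicate('sha512,abc', 'c/d/file.txt')   # same key, new path
```

<p>In this sketch only the second call reports a duplicate. Overriding the key and the path, as described next, changes which messages collide.</p>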
<p>In some cases, we may want only the file name to be used, so if any file with the same name is received twice,
regardless of content, then it should be considered a duplicate and dropped. This is useful when multiple systems
are producing the same products, but they are not bitwise identical. The built-in flowcb that implements
that functionality is below:</p>
<div class="highlight-default notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">from</span> <span class="nn">sarracenia.flowcb</span> <span class="kn">import</span> <span class="n">FlowCB</span>

<span class="n">logger</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>


<span class="k">class</span> <span class="nc">Name</span><span class="p">(</span><span class="n">FlowCB</span><span class="p">):</span>
<span class="w">    </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd">      Override the comparison so that files with the same name,</span>
<span class="sd">      regardless of what directory they are in, are considered the same.</span>
<span class="sd">      This is useful when receiving data from two different sources (two different trees)</span>
<span class="sd">      and winnowing between them.</span>
<span class="sd">    &quot;&quot;&quot;</span>
    <span class="k">def</span> <span class="nf">after_accept</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">worklist</span><span class="p">):</span>
        <span class="k">for</span> <span class="n">m</span> <span class="ow">in</span> <span class="n">worklist</span><span class="o">.</span><span class="n">incoming</span><span class="p">:</span>
            <span class="k">if</span> <span class="ow">not</span> <span class="s1">&#39;nodupe_override&#39;</span> <span class="ow">in</span> <span class="n">m</span><span class="p">:</span>
                <span class="n">m</span><span class="p">[</span><span class="s1">&#39;_deleteOnPost&#39;</span><span class="p">]</span> <span class="o">|=</span> <span class="nb">set</span><span class="p">([</span><span class="s1">&#39;nodupe_override&#39;</span><span class="p">])</span>
                <span class="n">m</span><span class="p">[</span><span class="s1">&#39;nodupe_override&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="p">{}</span>

            <span class="n">m</span><span class="p">[</span><span class="s1">&#39;nodupe_override&#39;</span><span class="p">][</span><span class="s1">&#39;path&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">m</span><span class="p">[</span><span class="s1">&#39;relPath&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;/&#39;</span><span class="p">)[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span>
            <span class="n">m</span><span class="p">[</span><span class="s1">&#39;nodupe_override&#39;</span><span class="p">][</span><span class="s1">&#39;key&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">m</span><span class="p">[</span><span class="s1">&#39;relPath&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;/&#39;</span><span class="p">)[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span>
</pre></div>
</div>
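<p>To see what the override does, the sketch below applies the same file-name rule to two plain dictionaries standing in for messages. The helper <code>name_override</code> and the sample relPath values are hypothetical; only the <code>split('/')[-1]</code> expression is taken from the callback above.</p>

```python
def name_override(msg):
    """Return the nodupe_override value that the file-name rule would produce."""
    name = msg['relPath'].split('/')[-1]
    return {'key': name, 'path': name}


# Two hypothetical messages: same file name, different source trees.
m1 = {'relPath': 'sourceA/20240506/obs.txt'}
m2 = {'relPath': 'sourceB/mirror/obs.txt'}

# Both messages yield the same key and path, so the second one
# would be treated as a duplicate of the first.
same = name_override(m1) == name_override(m2)
```

<p>Because both key and path collapse to the bare file name, the directory and the file content no longer influence the comparison.</p>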
</section>
<section id="customizing-post-exchangesplit">
<h2>Customizing post_exchangeSplit<a class="headerlink" href="#customizing-post-exchangesplit" title="Link to this heading"></a></h2>
<p>The exchangeSplit function allows a single flow to send its output to several exchanges,
numbered 1…n, to distribute load. The built-in processing chooses the exchange in a
fixed way, based on the hash in the identity field. The purpose of exchangeSplit is to
let each of a common set of downstream paths receive a subset of the total flow, and to make
products with similar “routing” land on the same downstream node. For example, for winnowing
to work, every file with a given checksum has to land on the same downstream node.</p>
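<p>A rough sketch of the built-in choice, modelled on the expression visible in the sarracenia.moth.amqp hunk of this commit (the sum of the ASCII bytes of the identity value, modulo the number of exchanges). The function name, the exchange names, and the sample checksum string are assumptions for illustration.</p>

```python
def pick_exchange(identity_value, exchanges):
    """Sum the ASCII bytes of the checksum, modulo the number of exchanges."""
    idx = sum(bytearray(identity_value, 'ascii')) % len(exchanges)
    return exchanges[idx]


# Hypothetical exchange names and checksum value.
exchanges = ['xs_out_00', 'xs_out_01', 'xs_out_02']
a = pick_exchange('abc123', exchanges)
b = pick_exchange('abc123', exchanges)  # same checksum, same exchange
```

<p>The same checksum always maps to the same exchange, which is the property that winnowing downstream relies on.</p>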
<p>It could be that, rather than using a checksum, one would prefer to use some other
method to decide which exchange is used:</p>
<div class="highlight-default notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">from</span> <span class="nn">sarracenia.flowcb</span> <span class="kn">import</span> <span class="n">FlowCB</span>
<span class="kn">import</span> <span class="nn">hashlib</span>
<span class="kn">import</span> <span class="nn">os</span>
<span class="n">logger</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>


<span class="k">class</span> <span class="nc">Distbydir</span><span class="p">(</span><span class="n">FlowCB</span><span class="p">):</span>
<span class="w">    </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd">      Override the use of the identity field so that products can be grouped by directory in the relPath.</span>
<span class="sd">      This ensures that all products received from the same directory get posted to the same</span>
<span class="sd">      exchange when post_exchangeSplit is active.</span>
<span class="sd">    &quot;&quot;&quot;</span>
    <span class="k">def</span> <span class="nf">after_accept</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">worklist</span><span class="p">):</span>
        <span class="k">for</span> <span class="n">m</span> <span class="ow">in</span> <span class="n">worklist</span><span class="o">.</span><span class="n">incoming</span><span class="p">:</span>
            <span class="n">m</span><span class="p">[</span><span class="s1">&#39;_deleteOnPost&#39;</span><span class="p">]</span> <span class="o">|=</span> <span class="nb">set</span><span class="p">([</span><span class="s1">&#39;exchangeSplitOverride&#39;</span><span class="p">])</span>
            <span class="c1"># md5 needs bytes, and the first hex digit (0-9, a-f) must be parsed base 16.</span>
            <span class="n">m</span><span class="p">[</span><span class="s1">&#39;exchangeSplitOverride&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">hashlib</span><span class="o">.</span><span class="n">md5</span><span class="p">(</span><span class="n">m</span><span class="p">[</span><span class="s1">&#39;relPath&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">sep</span><span class="p">)[</span><span class="o">-</span><span class="mi">2</span><span class="p">]</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">hexdigest</span><span class="p">()[</span><span class="mi">0</span><span class="p">],</span> <span class="mi">16</span><span class="p">)</span>
</pre></div>
</div>
<p>This routine sets the exchangeSplitOverride field, which must be an integer. It is
used, modulo the number of exchanges, to pick one of the n exchanges defined by
post_exchangeSplit. The routine calculates a checksum of the name of the directory
containing the file and then converts the first character of that checksum
to an integer. If the directory is the same, the exchange chosen will be the same.</p>
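<p>The calculation can be tried outside of a flow with a small stand-alone sketch. It assumes the first character of the hexadecimal digest is parsed as a base 16 digit and that relPath uses '/' as its separator. The helper name <code>split_override</code>, the sample paths, and the exchange count are hypothetical.</p>

```python
import hashlib


def split_override(rel_path):
    """First hex digit of the md5 of the parent directory name, as an int (0-15)."""
    directory = rel_path.split('/')[-2]
    digest = hashlib.md5(directory.encode('utf-8')).hexdigest()
    return int(digest[0], 16)


n_exchanges = 4  # hypothetical number of post_exchangeSplit exchanges

# Two files in the same directory get the same index.
i1 = split_override('20240506/obs/file1.txt') % n_exchanges
i2 = split_override('20240506/obs/file2.txt') % n_exchanges
```

<p>Since a single hex digit only takes values 0 to 15, this particular choice spreads directories evenly only when the number of exchanges divides 16. Using more of the digest would be one way around that, if it matters for a given deployment.</p>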
</section>
<section id="sample-flowcb-sub-class">
<h2>Sample Flowcb Sub-Class<a class="headerlink" href="#sample-flowcb-sub-class" title="Link to this heading"></a></h2>
<p>This is an example callback class file (gts2wis2.py) that accepts files whose
2 changes: 1 addition & 1 deletion Reference/code.html
@@ -2214,7 +2214,7 @@

<dl class="py method">
<dt class="sig sig-object py" id="sarracenia.moth.Moth.putNewMessage">
<span class="sig-name descname"><span class="pre">putNewMessage</span></span><span class="sig-paren">(</span><em class="sig-param"><span class="n"><span class="pre">message</span></span><span class="p"><span class="pre">:</span></span><span class="w"> </span><span class="n"><a class="reference internal" href="#sarracenia.Message" title="sarracenia.Message"><span class="pre">Message</span></a></span></em>, <em class="sig-param"><span class="n"><span class="pre">content_type</span></span><span class="p"><span class="pre">:</span></span><span class="w"> </span><span class="n"><span class="pre">str</span></span><span class="w"> </span><span class="o"><span class="pre">=</span></span><span class="w"> </span><span class="default_value"><span class="pre">'application/json'</span></span></em><span class="sig-paren">)</span> <span class="sig-return"><span class="sig-return-icon">&#x2192;</span> <span class="sig-return-typehint"><span class="pre">bool</span></span></span><a class="reference internal" href="../_modules/sarracenia/moth.html#Moth.putNewMessage"><span class="viewcode-link"><span class="pre">[source]</span></span></a><a class="headerlink" href="#sarracenia.moth.Moth.putNewMessage" title="Link to this definition"></a></dt>
<span class="sig-name descname"><span class="pre">putNewMessage</span></span><span class="sig-paren">(</span><em class="sig-param"><span class="n"><span class="pre">message</span></span><span class="p"><span class="pre">:</span></span><span class="w"> </span><span class="n"><a class="reference internal" href="#sarracenia.Message" title="sarracenia.Message"><span class="pre">Message</span></a></span></em>, <em class="sig-param"><span class="n"><span class="pre">content_type</span></span><span class="p"><span class="pre">:</span></span><span class="w"> </span><span class="n"><span class="pre">str</span></span><span class="w"> </span><span class="o"><span class="pre">=</span></span><span class="w"> </span><span class="default_value"><span class="pre">'application/json'</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">exchange</span></span><span class="p"><span class="pre">:</span></span><span class="w"> </span><span class="n"><span class="pre">str</span><span class="w"> </span><span class="p"><span class="pre">|</span></span><span class="w"> </span><span class="pre">None</span></span><span class="w"> </span><span class="o"><span class="pre">=</span></span><span class="w"> </span><span class="default_value"><span class="pre">None</span></span></em><span class="sig-paren">)</span> <span class="sig-return"><span class="sig-return-icon">&#x2192;</span> <span class="sig-return-typehint"><span class="pre">bool</span></span></span><a class="reference internal" href="../_modules/sarracenia/moth.html#Moth.putNewMessage"><span class="viewcode-link"><span class="pre">[source]</span></span></a><a class="headerlink" href="#sarracenia.moth.Moth.putNewMessage" title="Link to this definition"></a></dt>
<dd><p>publish a message as set up to the given topic.</p>
<p>return True if it succeeded, False otherwise.</p>
<dl class="simple">
2 changes: 1 addition & 1 deletion _modules/sarracenia/moth.html
@@ -442,7 +442,7 @@ <h1>Source code for sarracenia.moth</h1><div class="highlight"><pre>

<div class="viewcode-block" id="Moth.putNewMessage">
<a class="viewcode-back" href="../../api-documentation.html#sarracenia.moth.Moth.putNewMessage">[docs]</a>
<span class="k">def</span> <span class="nf">putNewMessage</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">message</span><span class="p">:</span><span class="n">sarracenia</span><span class="o">.</span><span class="n">Message</span><span class="p">,</span> <span class="n">content_type</span><span class="p">:</span> <span class="nb">str</span> <span class="o">=</span><span class="s1">&#39;application/json&#39;</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">bool</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">putNewMessage</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">message</span><span class="p">:</span><span class="n">sarracenia</span><span class="o">.</span><span class="n">Message</span><span class="p">,</span> <span class="n">content_type</span><span class="p">:</span> <span class="nb">str</span> <span class="o">=</span><span class="s1">&#39;application/json&#39;</span><span class="p">,</span> <span class="n">exchange</span><span class="p">:</span> <span class="nb">str</span> <span class="o">=</span> <span class="kc">None</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">bool</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> publish a message as set up to the given topic.</span>

7 changes: 5 additions & 2 deletions _modules/sarracenia/moth/amqp.html
@@ -783,9 +783,12 @@ <h1>Source code for sarracenia.moth.amqp</h1><div class="highlight"><pre>
<span class="k">if</span> <span class="p">(</span> <span class="s1">&#39;exchangeSplit&#39;</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">o</span><span class="p">)</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">o</span><span class="p">[</span><span class="s1">&#39;exchangeSplit&#39;</span><span class="p">]</span> <span class="o">&gt;</span> <span class="mi">1</span><span class="p">:</span>
<span class="c1"># FIXME: assert ( len(self.o[&#39;exchange&#39;]) == self.o[&#39;post_exchangeSplit&#39;] )</span>
<span class="c1"># if that isn&#39;t true... then there is something wrong... should we check ?</span>
<span class="n">idx</span> <span class="o">=</span> <span class="nb">sum</span><span class="p">(</span>
<span class="nb">bytearray</span><span class="p">(</span><span class="n">body</span><span class="p">[</span><span class="s1">&#39;identity&#39;</span><span class="p">][</span><span class="s1">&#39;value&#39;</span><span class="p">],</span>
<span class="k">if</span> <span class="s1">&#39;exchangeSplitOverride&#39;</span> <span class="ow">in</span> <span class="n">message</span><span class="p">:</span>
<span class="n">idx</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">message</span><span class="p">[</span><span class="s1">&#39;exchangeSplitOverride&#39;</span><span class="p">])</span><span class="o">%</span><span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">o</span><span class="p">[</span><span class="s1">&#39;exchange&#39;</span><span class="p">])</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">idx</span> <span class="o">=</span> <span class="nb">sum</span><span class="p">(</span> <span class="nb">bytearray</span><span class="p">(</span><span class="n">body</span><span class="p">[</span><span class="s1">&#39;identity&#39;</span><span class="p">][</span><span class="s1">&#39;value&#39;</span><span class="p">],</span>
<span class="s1">&#39;ascii&#39;</span><span class="p">))</span> <span class="o">%</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">o</span><span class="p">[</span><span class="s1">&#39;exchange&#39;</span><span class="p">])</span>

<span class="n">exchange</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">o</span><span class="p">[</span><span class="s1">&#39;exchange&#39;</span><span class="p">][</span><span class="n">idx</span><span class="p">]</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">logger</span><span class="o">.</span><span class="n">error</span><span class="p">(</span>