Skip to content

Commit 9fa5a80

Browse files
author
thomas loubrieu
committed
spark fair scheduling, asynchronous job in apii (jobs/), multiple api end-points (...Spark for legacy, or algorithm/...)
create handler manager to have multiple endpoint for the same algorithm implement a demo asynchronous mode in the restapi remove pydataclasses dependency
1 parent af2b234 commit 9fa5a80

18 files changed

Lines changed: 321 additions & 80 deletions

analysis/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ Python module that exposes NEXUS analytical capabilities via a HTTP webservice.
1414
conda activate nexus-analysis
1515
````
1616
17-
2. Install conda dependencies
17+
2. Install conda dependencies and other dependencies
1818
1919
````
2020
cd analysis
21+
pip install asyncio # for asynchronous job management
2122
conda install pyspark
2223
conda install -c conda-forge --file conda-requirements.txt
2324
#conda install numpy matplotlib mpld3 scipy netCDF4 basemap gdal pyproj=1.9.5.1 libnetcdf=4.3.3.1

analysis/conda-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ gdal==3.0.2
1515
mock==2.0.0
1616
singledispatch==3.4.0.3
1717

18+

analysis/webservice/NexusHandler.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616

1717
import logging
1818
import types
19+
from functools import partial
1920

20-
AVAILABLE_HANDLERS = []
21+
AVAILABLE_LEGACY_HANDLERS = []
22+
AVAILABLE_RESTAPI_HANDLERS = []
23+
AVAILABLE_WPS_HANDLERS = []
2124
AVAILABLE_INITIALIZERS = []
2225

2326

@@ -32,17 +35,22 @@ def nexus_initializer(clazz):
3235
return clazz
3336

3437

35-
def nexus_handler(clazz):
38+
def nexus_handler(clazz, handler_list=AVAILABLE_LEGACY_HANDLERS):
3639
log = logging.getLogger(__name__)
3740
try:
3841
clazz.validate()
3942
log.info("Adding algorithm module '%s' with path '%s' (%s)" % (clazz.name, clazz.path, clazz))
40-
AVAILABLE_HANDLERS.append(clazz)
43+
handler_list.append(clazz)
4144
except Exception as ex:
4245
log.warn("Handler '%s' is invalid and will be skipped (reason: %s)" % (clazz, ex.message), exc_info=True)
4346
return clazz
4447

4548

49+
nexus_restapi_handler = partial(nexus_handler, handler_list=AVAILABLE_RESTAPI_HANDLERS)
50+
nexus_wps_handler = partial(nexus_handler, handler_list=AVAILABLE_WPS_HANDLERS)
51+
52+
53+
4654
DEFAULT_PARAMETERS_SPEC = {
4755
"ds": {
4856
"name": "Dataset",

analysis/webservice/algorithms/Capabilities.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import json
1818

19-
from webservice.NexusHandler import nexus_handler, AVAILABLE_HANDLERS
19+
from webservice.NexusHandler import nexus_handler, AVAILABLE_LEGACY_HANDLERS
2020
from webservice.algorithms.NexusCalcHandler import NexusCalcHandler
2121
from webservice.webmodel import NexusResults
2222

@@ -32,7 +32,7 @@ class CapabilitiesListCalcHandlerImpl(NexusCalcHandler):
3232
def calc(self, computeOptions, **args):
3333
capabilities = []
3434

35-
for capability in AVAILABLE_HANDLERS:
35+
for capability in AVAILABLE_LEGACY_HANDLERS:
3636
capabilityDef = {
3737
"name": capability.name,
3838
"path": capability.path,

analysis/webservice/algorithms/NexusCalcHandler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def validate(cls):
2222
if "params" not in cls.__dict__:
2323
raise Exception("Property 'params' has not been defined")
2424

25-
def __init__(self, tile_service_factory, skipCassandra=False, skipSolr=False):
25+
def __init__(self, tile_service_factory):
2626
# self.algorithm_config = algorithm_config
2727
# self._skipCassandra = skipCassandra
2828
# self._skipSolr = skipSolr

analysis/webservice/algorithms_spark/TimeAvgMapSpark.py

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616
from datetime import datetime
1717
from functools import partial
1818

19+
import uuid
1920
import numpy as np
2021
import shapely.geometry
2122
from pytz import timezone
2223

23-
from webservice.NexusHandler import nexus_handler
24+
from webservice.NexusHandler import nexus_handler, nexus_restapi_handler
2425
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
2526
from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
2627

@@ -29,6 +30,7 @@
2930

3031

3132
@nexus_handler
33+
@nexus_restapi_handler
3234
class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
3335
# __singleton_lock = threading.Lock()
3436
# __singleton_instance = None
@@ -67,19 +69,6 @@ class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
6769
}
6870
singleton = True
6971

70-
# @classmethod
71-
# def instance(cls, algorithm_config=None, sc=None):
72-
# with cls.__singleton_lock:
73-
# if not cls.__singleton_instance:
74-
# try:
75-
# singleton_instance = cls()
76-
# singleton_instance.set_config(algorithm_config)
77-
# singleton_instance.set_spark_context(sc)
78-
# cls.__singleton_instance = singleton_instance
79-
# except AttributeError:
80-
# pass
81-
# return cls.__singleton_instance
82-
8372
def parse_arguments(self, request):
8473
# Parse input arguments
8574
self.log.debug("Parsing arguments")
@@ -118,7 +107,8 @@ def parse_arguments(self, request):
118107

119108
return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, nparts_requested
120109

121-
def calc(self, compute_options, **args):
110+
def calc(self, compute_options,
111+
**args):
122112
"""
123113
124114
:param compute_options: StatsComputeOptions
@@ -130,6 +120,7 @@ def calc(self, compute_options, **args):
130120
metrics_record = self._create_metrics_record()
131121

132122
ds, bbox, start_time, end_time, nparts_requested = self.parse_arguments(compute_options)
123+
133124
self._setQueryParams(ds,
134125
(float(bbox.bounds[1]),
135126
float(bbox.bounds[3]),
@@ -147,13 +138,13 @@ def calc(self, compute_options, **args):
147138
print('Found {} tiles'.format(len(nexus_tiles)))
148139

149140
daysinrange = self._get_tile_service().find_days_in_range_asc(bbox.bounds[1],
150-
bbox.bounds[3],
151-
bbox.bounds[0],
152-
bbox.bounds[2],
153-
ds,
154-
start_time,
155-
end_time,
156-
metrics_callback=metrics_record.record_metrics)
141+
bbox.bounds[3],
142+
bbox.bounds[0],
143+
bbox.bounds[2],
144+
ds,
145+
start_time,
146+
end_time,
147+
metrics_callback=metrics_record.record_metrics)
157148
ndays = len(daysinrange)
158149
if ndays == 0:
159150
raise NoDataException(reason="No data found for selected timeframe")
@@ -262,6 +253,8 @@ def calc(self, compute_options, **args):
262253
maxLon=bbox.bounds[2], ds=ds, startTime=start_time,
263254
endTime=end_time)
264255

256+
257+
265258
@staticmethod
266259
def _map(tile_service_factory, metrics_callback, tile_in_spark):
267260
tile_bounds = tile_in_spark[0]

analysis/webservice/algorithms_spark/TimeSeriesSpark.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from pytz import timezone
3333
from scipy import stats
3434
from webservice import Filtering as filtering
35-
from webservice.NexusHandler import nexus_handler
35+
from webservice.NexusHandler import nexus_handler, nexus_restapi_handler, nexus_wps_handler
3636
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
3737
from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
3838

@@ -43,6 +43,7 @@
4343

4444

4545
@nexus_handler
46+
@nexus_restapi_handler
4647
class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
4748
name = "Time Series Spark"
4849
path = "/timeSeriesSpark"

analysis/webservice/algorithms_spark/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,10 @@
2121
import DailyDifferenceAverageSpark
2222
import HofMoellerSpark
2323
import MaximaMinimaSpark
24-
import NexusCalcSparkHandler
2524
import TimeAvgMapSpark
2625
import TimeSeriesSpark
2726
import VarianceSpark
28-
27+
import NexusCalcSparkHandler
2928

3029
log = logging.getLogger(__name__)
3130

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?xml version="1.0"?>
2+
<allocations>
3+
<pool name="default">
4+
<schedulingMode>FAIR</schedulingMode>
5+
<weight>1</weight>
6+
<minShare>2</minShare>
7+
</pool>
8+
</allocations>
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .job import Job

0 commit comments

Comments
 (0)