Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 112 additions & 126 deletions tests/features/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,188 +46,170 @@ def closure(context, *args, **kwargs):


@add_context_object_container("locations")
def add_location(context, session, lid):
loc = session.get(Location, lid)

if not loc:
loc = Location(
# name="first location",
notes="these are some test notes",
point="POINT(-107.949533 33.809665)",
elevation=2464.9,
release_status="draft",
elevation_accuracy=100,
elevation_method="Survey-grade GPS",
coordinate_accuracy=50,
coordinate_method="GPS, uncorrected",
)
session.add(loc)
session.commit()
session.refresh(loc)
def add_location(context, session):
loc = Location(
# name="first location",
notes="these are some test notes",
point="POINT(-107.949533 33.809665)",
elevation=2464.9,
release_status="draft",
elevation_accuracy=100,
elevation_method="Survey-grade GPS",
coordinate_accuracy=50,
coordinate_method="GPS, uncorrected",
)
session.add(loc)
session.commit()
session.refresh(loc)

context.objects["locations"].append(loc)
return loc


@add_context_object_container("wells")
def add_well(context, session, location, thing_id):
well = session.get(Thing, thing_id)
if well is None:
well = Thing(
name=f"WL-{thing_id:04d}",
first_visit_date="2023-03-03",
thing_type="water well",
release_status="draft",
well_depth=10,
hole_depth=10,
well_construction_notes="Test well construction notes",
well_casing_diameter=5.0,
well_casing_depth=10.0,
)
session.add(well)
session.commit()
def add_well(context, session, location, name_num):
well = Thing(
name=f"WL-{name_num:04d}",
first_visit_date="2023-03-03",
thing_type="water well",
release_status="draft",
well_depth=10,
hole_depth=10,
well_construction_notes="Test well construction notes",
well_casing_diameter=5.0,
well_casing_depth=10.0,
)
session.add(well)
session.commit()

assoc = LocationThingAssociation(location=location, thing=well)
assoc.effective_start = "2025-02-01T00:00:00Z"
session.add(assoc)
session.commit()
assoc = LocationThingAssociation(location=location, thing=well)
assoc.effective_start = "2025-02-01T00:00:00Z"
session.add(assoc)
session.commit()

session.refresh(well)
session.refresh(well)

context.objects["wells"].append(well)
return well


@add_context_object_container("springs")
def add_spring(context, session, location, thing_id):
spring = session.get(Thing, thing_id)
if spring is None:
spring = Thing(
name=f"SP-{thing_id:04d}",
first_visit_date="2023-03-03",
thing_type="spring",
release_status="draft",
# well_depth=10,
# hole_depth=10,
# well_construction_notes="Test well construction notes",
# well_casing_diameter=5.0,
# well_casing_depth=10.0,
)
session.add(spring)
session.commit()
def add_spring(context, session, location, name_num):
spring = Thing(
name=f"SP-{name_num:04d}",
first_visit_date="2023-03-03",
thing_type="spring",
release_status="draft",
# well_depth=10,
# hole_depth=10,
# well_construction_notes="Test well construction notes",
# well_casing_diameter=5.0,
# well_casing_depth=10.0,
)
session.add(spring)
session.commit()

assoc = LocationThingAssociation(location=location, thing=spring)
assoc.effective_start = "2025-02-01T00:00:00Z"
session.add(assoc)
session.commit()
assoc = LocationThingAssociation(location=location, thing=spring)
assoc.effective_start = "2025-02-01T00:00:00Z"
session.add(assoc)
session.commit()

session.refresh(spring)
context.objects["springs"].append(spring)
return spring


@add_context_object_container("sensors")
def add_sensor(context, session, sid):
sensor = session.get(Sensor, sid)
if sensor is None:
sensor = Sensor(
name="Test Sensor",
sensor_type="Pressure Transducer",
model="Model X",
serial_no="123456",
pcn_number="PCN123456",
owner_agency="NMBGMR",
sensor_status="In Service",
notes="Test equipment",
release_status="draft",
)
session.add(sensor)
session.commit()
session.refresh(sensor)
sensor = Sensor(
name="Test Sensor",
sensor_type="Pressure Transducer",
model="Model X",
serial_no="123456",
pcn_number="PCN123456",
owner_agency="NMBGMR",
sensor_status="In Service",
notes="Test equipment",
release_status="draft",
)
session.add(sensor)
session.commit()
session.refresh(sensor)

context.objects["sensors"].append(sensor)
return sensor


@add_context_object_container("groups")
def add_group(context, session, wells, gid):
group = session.get(Group, gid)
if not group:
group = Group(name="Collabnet")
for w in wells:
assoc = GroupThingAssociation(group=group, thing=w)
session.add(assoc)

session.add(group)
session.commit()
session.refresh(group)
group = Group(name="Collabnet")
for w in wells:
assoc = GroupThingAssociation(group=group, thing=w)
session.add(assoc)

session.add(group)
session.commit()
session.refresh(group)

context.objects["groups"].append(group)
return group


@add_context_object_container("deployments")
def add_deployment(context, session, sid):
deployment = session.get(Deployment, sid)
if deployment is None:
deployment = Deployment(
thing_id=1,
sensor_id=1,
installation_date=datetime.now(),
)
session.add(deployment)
session.commit()
session.refresh(deployment)
def add_deployment(context, session, tid, sid):
deployment = Deployment(
thing_id=tid,
sensor_id=sid,
installation_date=datetime.now(),
)
session.add(deployment)
session.commit()
session.refresh(deployment)

context.objects["deployments"].append(deployment)
return deployment


@add_context_object_container("blocks")
def add_block(context, session, parameter):
block = (
session.query(TransducerObservationBlock)
.filter(TransducerObservationBlock.parameter_id == parameter.id)
.one_or_none()
block = TransducerObservationBlock(
parameter_id=parameter.id,
start_datetime=datetime.now() - timedelta(hours=1),
end_datetime=datetime.now() + timedelta(hours=1),
review_status="not reviewed",
)
add_obs = False
if block is None:
block = TransducerObservationBlock(
parameter_id=parameter.id,
start_datetime=datetime.now() - timedelta(hours=1),
end_datetime=datetime.now() + timedelta(hours=1),
review_status="not reviewed",
)

session.add(block)
session.commit()
session.refresh(block)
add_obs = True

session.add(block)
session.commit()
session.refresh(block)

context.objects["blocks"].append(block)
return add_obs
return block


def before_all(context):
context.objects = {}

force = False
with session_ctx() as session:
if force or session.query(LexiconTerm).count() == 0:
if session.query(LexiconTerm).count() == 0 or force:
erase_and_rebuild_db(session)
init_lexicon()
init_parameter()

loc = add_location(context, session, 1)
loc2 = add_location(context, session, 2)
loc3 = add_location(context, session, 3)
loc4 = add_location(context, session, 4)
loc_1 = add_location(context, session)
loc_2 = add_location(context, session)
loc_3 = add_location(context, session)
loc_4 = add_location(context, session)

add_well(context, session, loc, 1)
add_well(context, session, loc2, 2)
add_well(context, session, loc3, 3)
add_spring(context, session, loc4, 4)
add_sensor(context, session, 1)
deployment = add_deployment(context, session, 1)
well_1 = add_well(context, session, loc_1, name_num=1)
well_2 = add_well(context, session, loc_2, name_num=2)
well_3 = add_well(context, session, loc_3, name_num=3)
spring_4 = add_spring(context, session, loc_4, name_num=4)
sensor_1 = add_sensor(context, session, well_1.id)
deployment = add_deployment(context, session, well_1.id, sensor_1.id)

# parameter ID can be hardcoded because init_parameter always creates the same one
parameter = session.get(Parameter, 1)
add_obs = add_block(context, session, parameter)
if add_obs:
Expand All @@ -243,7 +225,11 @@ def before_all(context):


def after_all(context):
pass
with session_ctx() as session:
for table in context.objects.values():
for obj in table:
session.delete(obj)
session.commit()


# ============= EOF =============================================
Loading