1515# ===============================================================================
1616
1717import time
18+ import uuid
19+ from datetime import datetime
1820
1921import numpy as np
2022import pandas as pd
2123import pyproj
24+ from pydantic import ValidationError
2225from shapely import Point
2326from shapely .ops import transform
27+ from sqlalchemy import select
2428
25- from db import Location , LocationThingAssociation
29+ from db import Location , LocationThingAssociation , adder , WellScreen , Thing , Observation , Sample
2630from db .engine import session_ctx
31+ from schemas .thing import CreateWellScreen
2732
2833# from db.observation.groundwaterlevel import GroundwaterLevelObservation
2934
@@ -67,7 +72,42 @@ def make_location(row):
6772 )
6873
6974
70- #
75+ def transfer_water_levels (session ):
76+ wd = pd .read_csv ("./data/water_levels.csv" )
77+ gwd = wd .groupby (["PointID" ])
78+
79+ for index , group in gwd :
80+ for row in group .itertuples ():
81+ if pd .isna (row .DepthToWater ) or pd .isna (row .DateMeasured ):
82+ print (f"Skipping row { row .Index } due to missing data." )
83+ continue
84+
85+ dt = datetime .fromisoformat (row .DateMeasured )
86+ thing = session .query (Thing ).where (Thing .name == row .PointID ).first ()
87+ if thing is None :
88+ print (f"Thing with PointID { row .PointID } not found. Skipping water level." )
89+ continue
90+
91+ sample = Sample ()
92+ sample .sampler_name = 'unknown'
93+ sample .sample_type = 'groundwater level'
94+
95+ sample .field_sample_id = str (uuid .uuid4 ())
96+ sample .sample_date = dt
97+ sample .thing = thing
98+ session .add (sample )
99+
100+ obs = Observation ()
101+ obs .sensor_id = 1
102+ obs .sample = sample
103+ obs .observation_datetime = dt
104+ obs .depth_to_water = row .DepthToWater
105+ obs .observed_property = "groundwater level"
106+ obs .unit = "ft"
107+
108+ session .add (obs )
109+ session .commit ()
110+
71111# def migrate_water_levels(session, limit=800):
72112# wd = pd.read_csv("./migration/data/water_levels.csv")
73113# p = pd.read_csv("./migration/data/welldata.csv")
@@ -150,14 +190,14 @@ def make_location(row):
150190ADDED = []
151191
152192
153- def transfer_springs (session , limit = 10000 ):
193+ def transfer_springs (session , limit = None ):
154194 ldf = pd .read_csv ("./data/location.csv" )
155195 ldf = ldf [ldf ["SiteType" ] == "SP" ]
156196 ldf = ldf [ldf ["Easting" ].notna () & ldf ["Northing" ].notna ()]
157197 n = len (ldf )
158198 start_time = time .time ()
159199 for i , row in enumerate (ldf .itertuples ()):
160- if i >= limit :
200+ if limit and i >= limit :
161201 print (f"Reached limit of { limit } rows. Stopping migration." )
162202 break
163203
@@ -185,7 +225,7 @@ def transfer_springs(session, limit=10000):
185225 session .add (assoc )
186226
187227
188- def transfer_wells (session , limit = 1000 ):
228+ def transfer_wells (session , limit = None ):
189229 wdf = pd .read_csv ("./data/welldata.csv" )
190230 ldf = pd .read_csv ("./data/location.csv" )
191231
@@ -200,7 +240,7 @@ def transfer_wells(session, limit=1000):
200240 start_time = time .time ()
201241
202242 for i , row in enumerate (wdf .itertuples ()):
203- if i >= limit :
243+ if limit and i >= limit :
204244 print ("Reached limit of" , limit , "rows. Stopping migration." )
205245 break
206246
@@ -242,7 +282,51 @@ def transfer_wells(session, limit=1000):
242282 session .add (assoc )
243283 # break
244284
285+ def transfer_wellscreens (session , limit = None ):
286+ wdf = pd .read_csv ("./data/wellscreens.csv" )
287+ wdf = wdf .replace (pd .NA , None )
288+ wdf = wdf .replace ({np .nan : None })
245289
290+ n = len (wdf )
291+ start_time = time .time ()
292+
293+ for i , row in enumerate (wdf .itertuples ()):
294+ if limit and i >= limit :
295+ print ("Reached limit of" , limit , "rows. Stopping migration." )
296+ break
297+
298+ if i and not i % 100 :
299+ print (
300+ f"Processing row { i } of { n } . { row .PointID } , avg rows per second: { i / (time .time () - start_time ):.2f} "
301+ )
302+ session .commit ()
303+ # thing_id: int
304+ # screen_depth_bottom: float
305+ # screen_depth_top: float
306+ # screen_type: str | None = None
307+ # print(row)
308+
309+ sql = select (Thing ).where (Thing .name == row .PointID )
310+ thing = session .execute (sql ).scalar_one_or_none ()
311+ if not thing :
312+ print (f"Thing with PointID { row .PointID } not found. Skipping well screen." )
313+ continue
314+
315+ well_screen_data = {
316+ "thing_id" : thing .id ,
317+ "screen_depth_top" : row .ScreenTop ,
318+ "screen_depth_bottom" : row .ScreenBottom ,
319+ # "screen_type": row.ScreenType,
320+ "screen_description" : row .ScreenDescription ,
321+ "release_status" : 'draft'
322+ }
323+ try :
324+ model = CreateWellScreen .model_validate (well_screen_data )
325+ adder (session , WellScreen , model )
326+ except ValidationError as e :
327+ print (f"Validation error for row { i } with PointID { row .PointID } : { e } " )
328+ continue
329+ # session.add(screen)
246330# def reset_db():
247331# configure_mappers()
248332#
@@ -256,8 +340,9 @@ def transfer_wells(session, limit=1000):
256340if __name__ == "__main__" :
257341 # reset_db()
258342 with session_ctx () as sess :
259- transfer_wells (sess , limit = 10000 )
260- transfer_springs (sess , limit = 10000 )
261- # migrate_water_levels(sess)
343+ # transfer_wells(sess, 1000)
344+ # transfer_springs(sess, limit=10000)
345+ # transfer_wellscreens(sess)
346+ transfer_water_levels (sess )
262347
263348# ============= EOF =============================================
0 commit comments