@@ -10,17 +10,12 @@ defmodule TransitData.GlidesReport.Loader do
1010
1111 @ doc """
1212 Loads data into ETS tables, and returns counts of files found locally vs downloaded.
13-
14- `start_dt` and `end_dt` are NaiveDateTimes assumed to be in UTC.
1513 """
16- @ spec load_data (
17- NaiveDateTime . t ( ) ,
18- NaiveDateTime . t ( ) ,
19- String . t ( ) ,
20- pos_integer ,
21- pos_integer | :all
22- ) :: % { local: non_neg_integer , downloaded: non_neg_integer }
23- def load_data ( start_dt , end_dt , env_suffix , sample_rate , sample_count ) do
14+ @ spec load_data ( Date . t ( ) , Date . t ( ) , String . t ( ) , pos_integer , pos_integer | :all ) :: % {
15+ local: non_neg_integer ,
16+ downloaded: non_neg_integer
17+ }
18+ def load_data ( start_date , end_date , env_suffix , sample_rate , sample_count ) do
2419 dir = local_dir ( env_suffix )
2520
2621 IO . puts (
@@ -35,17 +30,24 @@ defmodule TransitData.GlidesReport.Loader do
3530
3631 s3_bucket = "mbta-gtfs-s3#{ env_suffix } "
3732
38- start_dt_utc = DateTime . from_naive! ( start_dt , "Etc/UTC" )
39- end_dt_utc = DateTime . from_naive! ( end_dt , "Etc/UTC" )
33+ start_dt =
34+ DateTime . new! ( start_date , ~T[ 04:00:00] , "America/New_York" )
35+ |> DateTime . shift_zone! ( "Etc/UTC" )
4036
41- total_minutes = DateTime . diff ( end_dt_utc , start_dt_utc , :minute )
42- total_increments = div ( total_minutes , sample_rate )
37+ end_dt =
38+ end_date
39+ # The service day ends on the following calendar date.
40+ |> Date . shift ( day: 1 )
41+ |> DateTime . new! ( ~T[ 03:59:59] , "America/New_York" )
42+ |> DateTime . shift_zone! ( "Etc/UTC" )
43+
44+ total_minutes = DateTime . diff ( end_dt , start_dt , :minute )
4345
4446 # Prefixes used to list S3 objects timestamped within the same minute.
4547 minute_prefixes =
46- Enum . map ( 0 .. total_increments , fn increment ->
47- start_dt_utc
48- |> DateTime . add ( increment * sample_rate , :minute )
48+ Enum . map ( 0 .. total_minutes // sample_rate , fn increment ->
49+ start_dt
50+ |> DateTime . add ( increment , :minute )
4951 |> Calendar . strftime ( "%Y/%m/%d/%Y-%m-%dT%H:%M" )
5052 end )
5153
@@ -68,27 +70,19 @@ defmodule TransitData.GlidesReport.Loader do
6870 end
6971
7072 # Loads data into a table.
71- # Returns the number of files that were found locally,
72- # the number that were newly downloaded,
73- # and a list of minutes for which not enough data could be found.
73+ # Returns the number of files that were found locally and the number
74+ # that were newly downloaded:
75+ # %{local: integer, downloaded: integer}
7476 defp populate_table ( table_name , path_prefixes , s3_bucket , sample_count ) do
7577 IO . puts ( "Loading #{ table_name } ..." )
7678
79+ prefix_count = length ( path_prefixes )
80+
7781 { total , insufficients } =
7882 path_prefixes
79- |> Stream . with_index ( fn prefix , i ->
80- IO . write ( [
81- IO.ANSI . clear_line ( ) ,
82- "\r " ,
83- moons_of_progress ( ) [ rem ( i , 8 ) ] ,
84- " Loading data for " ,
85- prefix_to_local_minute ( prefix )
86- ] )
87-
88- prefix
89- end )
83+ |> Stream . with_index ( & update_progress ( & 1 , & 2 , prefix_count ) )
9084 |> Task . async_stream (
91- & load_minute ( & 1 , s3_bucket , table_name , sample_count ) ,
85+ fn prefix -> load_minute ( prefix , s3_bucket , table_name , sample_count ) end ,
9286 ordered: false ,
9387 timeout: 60_000
9488 )
@@ -101,7 +95,7 @@ defmodule TransitData.GlidesReport.Loader do
10195
10296 insufficients =
10397 if is_integer ( sample_count ) and counts . local + counts . downloaded < sample_count ,
104- do: [ prefix_to_local_minute ( counts . prefix ) | insufficients ] ,
98+ do: [ prefix_to_local_dt ( counts . prefix ) | insufficients ] ,
10599 else: insufficients
106100
107101 { total , insufficients }
@@ -110,20 +104,8 @@ defmodule TransitData.GlidesReport.Loader do
110104 IO . puts ( "#{ IO.ANSI . clear_line ( ) } \r 🌝 Done" )
111105
112106 unless Enum . empty? ( insufficients ) do
113- time_ranges =
114- insufficients
115- |> Enum . sort ( )
116- |> Enum . split_while ( & ( & 1 < "04" ) )
117- |> then ( fn { after_midnight_service_day , service_day } ->
118- service_day ++ after_midnight_service_day
119- end )
120- |> Stream . map ( & Time . from_iso8601! ( & 1 <> ":00" ) )
121- # Chunk the individual times into ranges of consecutive times for better human readability.
122- |> Stream . chunk_while ( nil , & chunk_time_ranges / 2 , & { :cont , hh_mm_range ( & 1 ) , nil } )
123- |> Stream . reject ( & is_nil / 1 )
124- |> Enum . join ( ", " )
125-
126- IO . puts ( "#{ table_name } : Insufficient data available for minute(s): #{ time_ranges } " )
107+ time_ranges = datetimes_to_time_ranges ( insufficients )
108+ IO . puts ( "#{ table_name } : Insufficient data available for minute(s):\n #{ time_ranges } " )
127109 end
128110
129111 IO . puts ( "" )
@@ -139,11 +121,60 @@ defmodule TransitData.GlidesReport.Loader do
139121 """ x
140122 end
141123
124+ defp update_progress ( prefix , i , total ) do
125+ pct =
126+ ( 100 * i / total )
127+ |> trunc ( )
128+ |> Integer . to_string ( )
129+ |> String . pad_leading ( 3 )
130+
131+ IO . write ( [
132+ IO.ANSI . clear_line ( ) ,
133+ "\r " ,
134+ moons_of_progress ( ) [ rem ( i , 8 ) ] ,
135+ " Loading data for " ,
136+ Calendar . strftime ( prefix_to_local_dt ( prefix ) , "%x %H:%M" ) ,
137+ " " ,
138+ pct ,
139+ "%"
140+ ] )
141+
142+ prefix
143+ end
144+
142145 # 🌝
143146 defp moons_of_progress do
144147 % { 0 => "🌕" , 1 => "🌖" , 2 => "🌗" , 3 => "🌘" , 4 => "🌑" , 5 => "🌒" , 6 => "🌓" , 7 => "🌔" }
145148 end
146149
150+ @ doc ~S'''
151+ Returns a human-readable string describing a list of minute-granularity
152+ local-timezone DateTimes as comma-separated time ranges.
153+
154+ iex> [~U[2025-01-01T18:00:00Z], ~U[2025-01-01T18:01:00Z], ~U[2025-01-02T08:03:00Z], ~U[2025-01-02T12:00:00Z]]
155+ ...> |> Enum.map(&DateTime.shift_zone!(&1, "America/New_York"))
156+ ...> |> datetimes_to_time_ranges()
157+ """
158+ • 2025-01-01: 13:00-13:01, 03:03
159+ • 2025-01-02: 07:00\
160+ """
161+ '''
162+ def datetimes_to_time_ranges ( datetimes ) do
163+ datetimes
164+ |> Enum . sort ( DateTime )
165+ |> Stream . chunk_by ( & service_day / 1 )
166+ |> Stream . map ( fn dts ->
167+ time_ranges =
168+ dts
169+ |> Stream . map ( fn % DateTime { time_zone: "America/New_York" } = dt -> DateTime . to_time ( dt ) end )
170+ |> Stream . chunk_while ( nil , & chunk_time_ranges / 2 , & { :cont , hh_mm_range ( & 1 ) , nil } )
171+ |> Enum . join ( ", " )
172+
173+ "• #{ service_day ( hd ( dts ) ) } : #{ time_ranges } "
174+ end )
175+ |> Enum . join ( "\n " )
176+ end
177+
147178 defp chunk_time_ranges ( time , nil ) do
148179 # This is the first time in the list.
149180 { :cont , { time , time } }
@@ -182,11 +213,10 @@ defmodule TransitData.GlidesReport.Loader do
182213
183214 defp hh_mm ( time ) , do: Calendar . strftime ( time , "%H:%M" )
184215
185- defp prefix_to_local_minute ( prefix ) do
216+ defp prefix_to_local_dt ( prefix ) do
186217 prefix
187218 |> prefix_to_dt ( )
188219 |> DateTime . shift_zone! ( "America/New_York" )
189- |> Calendar . strftime ( "%H:%M" )
190220 end
191221
192222 defp prefix_to_dt ( prefix ) do
@@ -199,6 +229,23 @@ defmodule TransitData.GlidesReport.Loader do
199229 dt
200230 end
201231
232+ def service_day ( % { time_zone: "America/New_York" } = dt ) do
233+ # Service day starts at 4am.
234+ # Times before that on a calendar date are part of the previous
235+ # calendar date's service day.
236+ if dt . hour >= 4 do
237+ DateTime . to_date ( dt )
238+ else
239+ dt |> DateTime . to_date ( ) |> Date . add ( - 1 )
240+ end
241+ end
242+
243+ def service_day ( % DateTime { } = dt ) do
244+ dt
245+ |> DateTime . shift_zone! ( "America/New_York" )
246+ |> service_day ( )
247+ end
248+
202249 # Loads data for a specific minute of the service day, either by reading existing local files,
203250 # by downloading and reading new files from S3, or a mix of both.
204251 #
0 commit comments