diff --git a/CHANGELOG.md b/CHANGELOG.md index d84d7ad8..ef6ed975 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog +# 2.0.5 + * Merging upstream forked repo changes. + +# 2.0.4 + * Recursively call the function if `Retry-After` has the value greater than 0 [#192](https://github.com/singer-io/tap-github/pull/192) + +# 2.0.3 + * Handles the secondary rate limit - `Retry-After` [#191](https://github.com/singer-io/tap-github/pull/191) + +# 2.0.2 + * Make the tap sleep for `X-RateLimit-Reset` + `2` seconds, whenever the API rate limit is hit [#190](https://github.com/singer-io/tap-github/pull/190) + +# 2.0.1 + * Allow `commits` stream sync to continue when we hit an empty repo [#187](https://github.com/singer-io/tap-github/pull/187) + # 2.0.0 * Schema updates [#170](https://github.com/singer-io/tap-github/pull/170) [#169](https://github.com/singer-io/tap-github/pull/169) * Update data types of fields in `events` and `issue_events` stream diff --git a/setup.py b/setup.py index b6c06fef..79890701 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup, find_packages setup(name='tap-github', - version='2.0.0', + version='2.0.5', description='Singer.io tap for extracting data from the GitHub API', author='Stitch', url='http://singer.io', diff --git a/tap_github/client.py b/tap_github/client.py index b38286a1..eb7e4608 100644 --- a/tap_github/client.py +++ b/tap_github/client.py @@ -111,6 +111,10 @@ def raise_for_error(resp, source, stream, client, should_skip_404): except JSONDecodeError: response_json = {} + if stream == "commits" and response_json.get("message") == "Git Repository is empty.": + LOGGER.info("Encountered an empty git repository") + return None + if error_code == 404 and should_skip_404: # Add not accessible stream into list. client.not_accessible_repos.add(stream) @@ -150,6 +154,14 @@ def rate_throttling(response, max_sleep_seconds, min_remain_rate_limit): """ For rate limit errors, get the remaining time before retrying and calculate the time to sleep before making a new request. """ + if "Retry-After" in response.headers: + # handles the secondary rate limit + seconds_to_sleep = int(response.headers['Retry-After']) + if seconds_to_sleep > 0: + LOGGER.info("API rate limit exceeded. Tap will retry the data collection after %s seconds.", seconds_to_sleep) + time.sleep(seconds_to_sleep) + #returns True if tap sleeps + return True if 'X-RateLimit-Remaining' in response.headers: if int(response.headers['X-RateLimit-Remaining']) <= min_remain_rate_limit: seconds_to_sleep = calculate_seconds(int(response.headers['X-RateLimit-Reset'])) @@ -160,10 +172,13 @@ def rate_throttling(response, max_sleep_seconds, min_remain_rate_limit): LOGGER.info("API rate limit exceeded. Tap will retry the data collection after %s seconds.", seconds_to_sleep) time.sleep(seconds_to_sleep) - else: - # Raise an exception if `X-RateLimit-Remaining` is not found in the header. - # API does include this key header if provided base URL is not a valid github custom domain. - raise GithubException("The API call using the specified base url was unsuccessful. Please double-check the provided base URL.") + #returns True if tap sleeps + return True + return False + + # Raise an exception if `X-RateLimit-Remaining` is not found in the header. + # API does include this key header if provided base URL is not a valid github custom domain. + raise GithubException("The API call using the specified base url was unsuccessful. Please double-check the provided base URL.") class GithubClient: """ @@ -211,13 +226,19 @@ def authed_get(self, source, url, headers={}, stream="", should_skip_404 = True) with metrics.http_request_timer(url) as timer: self.session.headers.update(headers) resp = self.session.request(method='get', url=url, timeout=self.get_request_timeout()) + if rate_throttling(resp, self.max_sleep_seconds, self.min_remain_rate_limit): + # If the API rate limit is reached, the function will be recursively + self.authed_get(source, url, headers, stream, should_skip_404) if resp.status_code != 200: raise_for_error(resp, source, stream, self, should_skip_404) timer.tags[metrics.Tag.http_status_code] = resp.status_code - rate_throttling(resp, self.max_sleep_seconds, self.min_remain_rate_limit) - if resp.status_code == 404 or resp.status_code == 422: + if resp.status_code in {404, 409, 422}: # Return an empty response body since we're not raising a NotFoundException - resp._content = b'{}' # pylint: disable=protected-access + + # In the 409 case, this is only for `commits` returning an + # error for an empty repository, so we'll treat this as an + # empty list of records to process + resp._content = b'{}' # pylint: disable=protected-access return resp def authed_get_all_pages(self, source, url, headers={}, stream="", should_skip_404 = True): @@ -245,7 +266,7 @@ def verify_repo_access(self, url_for_repo, repo): Call rest API to verify that the user has sufficient permissions to access this repository. """ try: - self.authed_get("verifying repository access", url_for_repo) + self.authed_get("verifying repository access", url_for_repo, stream="commits") except NotFoundException: # Throwing user-friendly error message as it checks token access message = "HTTP-error-code: 404, Error: Please check the repository name \'{}\' or you do not have sufficient permissions to access this repository.".format(repo) diff --git a/tests/test_github_bookmarks.py b/tests/test_github_bookmarks.py index 9e2c4135..d40372d2 100644 --- a/tests/test_github_bookmarks.py +++ b/tests/test_github_bookmarks.py @@ -14,15 +14,18 @@ class TestGithubBookmarks(TestGithubBase): def name(): return "tap_tester_github_bookmarks" - def calculated_states_by_stream(self, current_state, synced_records, replication_keys): + def calculated_states_by_stream(self, current_state, synced_records, replication_keys, start_date): """ - Look at the bookmarks from a previous sync and set a new bookmark - value based off timedelta expectations. This ensures the subsequent sync will replicate - at least 1 record but, fewer records than the previous sync. + Look at the bookmarks from a previous sync and shift it to a + date to ensure the subsequent sync will replicate at least 1 + record but, fewer records than the previous sync. """ - timedelta_by_stream = {stream: [90,0,0] # {stream_name: [days, hours, minutes], ...} + # {stream_name: [days, hours, minutes], ...} + timedelta_by_stream = {stream: [90,0,0] for stream in self.expected_streams()} + timedelta_by_stream["commits"] = [7, 0, 0] + repo = self.get_properties().get('repository') stream_to_calculated_state = {repo: {stream: "" for stream in current_state['bookmarks'][repo].keys()}} @@ -31,7 +34,9 @@ def calculated_states_by_stream(self, current_state, synced_records, replication state_as_datetime = dateutil.parser.parse(state_value) days, hours, minutes = timedelta_by_stream[stream] - calculated_state_as_datetime = state_as_datetime - datetime.timedelta(days=days, hours=hours, minutes=minutes) + + start_date_as_datetime = dateutil.parser.parse(start_date) + calculated_state_as_datetime = start_date_as_datetime + datetime.timedelta(days=days, hours=hours, minutes=minutes) state_format = '%Y-%m-%dT%H:%M:%SZ' calculated_state_formatted = datetime.datetime.strftime(calculated_state_as_datetime, state_format) @@ -49,7 +54,7 @@ def test_run(self): All data of the second sync is >= the bookmark from the first sync The number of records in the 2nd sync is less then the first • Verify that for full table stream, all data replicated in sync 1 is replicated again in sync 2. - + PREREQUISITE For EACH stream that is incrementally replicated there are multiple rows of data with different values for the replication key @@ -83,9 +88,12 @@ def test_run(self): ### Update State Between Syncs ########################################################################## + first_sync_start_date = self.get_properties()['start_date'] new_states = {'bookmarks': dict()} simulated_states = self.calculated_states_by_stream(first_sync_bookmarks, - first_sync_records, expected_replication_keys) + first_sync_records, + expected_replication_keys, + first_sync_start_date) for repo, new_state in simulated_states.items(): new_states['bookmarks'][repo] = new_state menagerie.set_state(conn_id, new_states) @@ -126,7 +134,7 @@ def test_run(self): replication_key = next(iter(expected_replication_keys[stream])) first_bookmark_value = first_bookmark_key_value.get('since') second_bookmark_value = second_bookmark_key_value.get('since') - + first_bookmark_value_ts = self.dt_to_ts(first_bookmark_value, self.BOOKMARK_FORMAT) second_bookmark_value_ts = self.dt_to_ts(second_bookmark_value, self.BOOKMARK_FORMAT) @@ -147,11 +155,11 @@ def test_run(self): # For events stream replication key value is coming in different format if stream == 'events': replication_key_format = self.EVENTS_RECORD_REPLICATION_KEY_FORMAT - + for record in first_sync_messages: # Verify the first sync bookmark value is the max replication key value for a given stream replication_key_value = self.dt_to_ts(record.get(replication_key), replication_key_format) - + self.assertLessEqual( replication_key_value, first_bookmark_value_ts, msg="First sync bookmark was set incorrectly, a record with a greater replication-key value was synced." @@ -160,10 +168,10 @@ def test_run(self): for record in second_sync_messages: # Verify the second sync bookmark value is the max replication key value for a given stream replication_key_value = self.dt_to_ts(record.get(replication_key), replication_key_format) - + self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark.") - + self.assertLessEqual( replication_key_value, second_bookmark_value_ts, msg="Second sync bookmark was set incorrectly, a record with a greater replication-key value was synced." diff --git a/tests/test_github_pagination.py b/tests/test_github_pagination.py index 06a24abd..f0ec3196 100644 --- a/tests/test_github_pagination.py +++ b/tests/test_github_pagination.py @@ -23,30 +23,45 @@ def get_properties(self, original: bool = True): return return_value def test_run(self): - + streams_to_test = self.expected_streams() # Pagination is not supported for "team_memberships" by Github API. # Skipping "teams" stream as it's RECORD count is <= 30. - untestable_streams = {'team_memberships', 'teams'} + untestable_streams = { + 'team_memberships', + 'teams', + 'team_members', + 'collaborators', + 'assignees', + } - # For some streams RECORD count were not > 30 in same test-repo. + # For some streams RECORD count were not > 30 in same test-repo. # So, separated streams on the basis of RECORD count. self.repository_name = 'singer-io/tap-github' - expected_stream_1 = {'comments', 'stargazers', 'commits', 'pull_requests', 'reviews', 'review_comments', 'pr_commits', 'issues'} + expected_stream_1 = { + 'comments', + 'stargazers', + 'commits', + 'pull_requests', + 'reviews', + 'review_comments', + 'pr_commits', + 'issues', + } self.run_test(expected_stream_1) - + self.repository_name = 'singer-io/test-repo' expected_stream_2 = streams_to_test - expected_stream_1 - untestable_streams self.run_test(expected_stream_2) - + def run_test(self, streams): """ - • Verify that for each stream you can get multiple pages of data. + • Verify that for each stream you can get multiple pages of data. This requires we ensure more than 1 page of data exists at all times for any given stream. • Verify by pks that the data replicated matches the data we expect. """ - + # Page size for pagination supported streams page_size = 30 conn_id = connections.ensure_connection(self) @@ -83,7 +98,7 @@ def run_test(self, streams): # Verify that for each stream you can get multiple pages of data self.assertGreater(record_count_sync, page_size, msg="The number of records is not over the stream max limit") - + # Chunk the replicated records (just primary keys) into expected pages pages = [] page_count = ceil(len(primary_keys_list) / page_size) @@ -102,4 +117,4 @@ def run_test(self, streams): self.assertTrue( current_page.isdisjoint(other_page), msg=f'other_page_primary_keys={other_page}' - ) \ No newline at end of file + ) diff --git a/tests/test_github_start_date.py b/tests/test_github_start_date.py index 5ea10ced..a37fa43c 100644 --- a/tests/test_github_start_date.py +++ b/tests/test_github_start_date.py @@ -42,13 +42,27 @@ def test_run(self): self.run_test(date_1, date_2, expected_stream_2) date_2 = '2022-05-06T00:00:00Z' - expected_stream_3 = {'pull_requests', 'pr_commits', 'review_comments', 'reviews'} + expected_stream_3 = {'pr_commits', 'review_comments', 'reviews'} self.run_test(date_1, date_2, expected_stream_3) date_2 = '2022-01-27T00:00:00Z' + expected_stream_4 = self.expected_streams().difference( + expected_stream_1, + expected_stream_2, + expected_stream_3, + {'events', 'issues', 'pull_requests'} + ) + # run the test for all the streams excluding 'events' stream - # as for 'events' stream we have to use dynamic dates - self.run_test(date_1, date_2, self.expected_streams() - expected_stream_1 - expected_stream_2 - expected_stream_3 - {'events'}) + # as for 'events' stream we have to use dynamic dates. + # `issues` doesn't have enough data in this range, so we skip it too + self.run_test(date_1, date_2, expected_stream_4) + + date_3 = '2023-01-27T00:00:00Z' + self.run_test(date_1, date_3, {"issues"}) + + date_4 = '2023-01-01T00:00:00Z' + self.run_test(date_1, date_4, {'pull_requests'}) # As per the Documentation: https://docs.github.com/en/rest/reference/activity#events # the 'events' of past 90 days will only be returned @@ -60,7 +74,7 @@ def test_run(self): self.run_test(date_1, date_2, {'events'}) def run_test(self, date_1, date_2, streams): - """ + """ • Verify that a sync with a later start date has at least one record synced and less records than the 1st sync with a previous start date • Verify that each stream has less records than the earlier start date sync @@ -89,7 +103,7 @@ def run_test(self, date_1, date_2, streams): # run check mode found_catalogs_1 = self.run_and_verify_check_mode(conn_id_1) - + # table and field selection test_catalogs_1_all_fields = [catalog for catalog in found_catalogs_1 if catalog.get('stream_name') in expected_streams] @@ -130,7 +144,7 @@ def run_test(self, date_1, date_2, streams): self.assertGreater(sum(record_count_by_stream_1.values()), sum(record_count_by_stream_2.values())) for stream in expected_streams: - with self.subTest(stream=stream): + with self.subTest(stream=stream, start_date_1=date_1, start_date_2=date_2): # expected values expected_primary_keys = self.expected_primary_keys()[stream] @@ -154,7 +168,7 @@ def run_test(self, date_1, date_2, streams): self.assertGreater(record_count_sync_2, 0) if expected_metadata.get(self.OBEYS_START_DATE): - + # Expected bookmark key is one element in set so directly access it bookmark_keys_list_1 = [message.get('data').get(next(iter(expected_bookmark_keys))) for message in synced_records_1.get(stream).get('messages') if message.get('action') == 'upsert'] @@ -195,7 +209,7 @@ def run_test(self, date_1, date_2, streams): self.assertTrue(primary_keys_sync_2.issubset(primary_keys_sync_1)) else: - + # Verify that the 2nd sync with a later start date replicates the same number of # records as the 1st sync. self.assertEqual(record_count_sync_2, record_count_sync_1)