-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfetch_data.py
More file actions
133 lines (114 loc) · 3.77 KB
/
Copy pathfetch_data.py
File metadata and controls
133 lines (114 loc) · 3.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import pymongo
import mysql.connector
import psycopg2
import redis
from elasticsearch import Elasticsearch
import os
import cx_Oracle
# MongoDB
def fetch_mongo_data():
    """Print every document of ``my_database.my_table`` from MongoDB.

    Connects to the hard-coded MongoDB URI, prints a "MongoDB Data:"
    heading and then each document returned by an unfiltered ``find``.
    The client is always closed, even if iteration fails part-way.
    """
    client = pymongo.MongoClient("mongodb://root:example@192.168.1.11:27017/")
    try:
        db = client["my_database"]
        collection = db["my_table"]
        # An empty filter selects every document in the collection.
        data = collection.find({})
        print("\nMongoDB Data:")
        for doc in data:
            print(doc)
    finally:
        # Release the client's resources on both success and failure.
        client.close()
# MySQL
def fetch_mysql_data():
    """Print every row of ``my_table`` from the MySQL server.

    Connects with the hard-coded host and credentials, runs
    ``SELECT * FROM my_table;`` and prints each fetched row under a
    "MySQL Data:" heading. The cursor and the connection are closed
    whether or not the query succeeds.
    """
    conn = mysql.connector.connect(
        host="192.168.1.12",
        user="root",
        password="example",
        database="my_database",
    )
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT * FROM my_table;")
            rows = cursor.fetchall()
            print("\nMySQL Data:")
            for row in rows:
                print(row)
        finally:
            cursor.close()
    finally:
        # Close the connection even when cursor creation or the query fails.
        conn.close()
# PostgreSQL
def fetch_postgres_data():
    """Print every row of ``my_table`` from the PostgreSQL server.

    Connects with the hard-coded host and credentials, runs
    ``SELECT * FROM my_table;`` and prints each fetched row under a
    "PostgreSQL Data" heading. The cursor and the connection are closed
    whether or not the query succeeds.
    """
    conn = psycopg2.connect(
        host="192.168.1.13",
        user="postgres",
        password="example",
        dbname="my_database",
    )
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT * FROM my_table;")
            rows = cursor.fetchall()
            print("\nPostgreSQL Data")
            for row in rows:
                print(row)
        finally:
            cursor.close()
    finally:
        # Explicit close: psycopg2's ``with connection`` block only ends the
        # transaction, it does not close the connection.
        conn.close()
# Redis
def fetch_redis_data():
    """Print the fields of every Redis hash whose key matches ``*user*``.

    Walks the keyspace with SCAN, skips keys that are not hashes, and
    prints each hash's field/value pairs under a "Redis Data:" heading.
    A ``redis.ConnectionError`` is reported with a message instead of
    propagating.
    """
    try:
        # decode_responses=True makes the client return str instead of bytes.
        client = redis.Redis(host='192.168.1.14', port=6379, decode_responses=True)
        key_pattern = '*user*'
        scan_position = 0
        print("\nRedis Data:")
        while True:
            # Each SCAN call returns the next cursor plus one batch of keys.
            scan_position, batch = client.scan(cursor=scan_position, match=key_pattern)
            for user_key in batch:
                # Only hash keys are of interest; ignore every other type.
                if client.type(user_key) != 'hash':
                    continue
                user_data = client.hgetall(user_key)
                if not user_data:
                    print(f"No data found for {user_key}.")
                    continue
                print(f"Data for {user_key}:")
                for field, value in user_data.items():
                    print(f" {field}: {value}")
            # A returned cursor of 0 signals that the scan has finished.
            if scan_position == 0:
                break
    except redis.ConnectionError:
        print("Could not connect to the Redis server.")
# Elasticsearch
def fetch_es_data():
    """Print the ``_source`` of each hit from a match-all search on ``my_index``.

    Connects to the hard-coded Elasticsearch node over HTTP, runs a
    ``match_all`` query and prints every returned hit's ``_source`` under
    an "Elasticsearch Data:" heading.
    """
    node = {'host': '192.168.1.15', 'port': 9200, 'scheme': 'http'}
    client = Elasticsearch([node])
    match_everything = {"query": {"match_all": {}}}
    # NOTE(review): no explicit size is passed, so presumably only the
    # server's default page of hits is returned - confirm that is intended.
    response = client.search(index="my_index", body=match_everything)
    print("\nElasticsearch Data:")
    hits = response['hits']['hits']
    for hit in hits:
        print(hit["_source"])
# OracleDB
def fetch_oracledb_data():
    """Print the first four columns of every row of ``my_table`` from Oracle.

    Sets ``ORACLE_HOME`` and ``LD_LIBRARY_PATH`` in this process's
    environment, connects to the hard-coded host/service with cx_Oracle,
    runs ``SELECT * FROM my_table`` and prints each row labelled as first
    name, last name, email and phone number. The cursor and the connection
    are closed whether or not the query succeeds.
    """
    # Set Oracle Client library path.
    # NOTE(review): changing LD_LIBRARY_PATH inside an already-running process
    # typically does not affect library loading; cx_Oracle's documentation
    # points to init_oracle_client(lib_dir=...) instead - confirm these
    # assignments have the intended effect.
    os.environ['ORACLE_HOME'] = '/opt/oracle/instantclient_19_19'
    os.environ['LD_LIBRARY_PATH'] = '/opt/oracle/instantclient_19_19'

    # Database connection details
    host = "192.168.1.16"
    port = 1521
    # service_name = "MYPDB"  # alternative value, left disabled
    service_name = "MYDB"
    user = "my_database"
    password = "my_password"

    # Connect to the Oracle database
    dsn = cx_Oracle.makedsn(host, port, service_name=service_name)
    connection = cx_Oracle.connect(user=user, password=password, dsn=dsn)
    try:
        print("\nOracleDB Data:")
        # Create the cursor outside the inner try so its finally clause never
        # refers to an unassigned name when cursor creation itself fails.
        cursor = connection.cursor()
        try:
            cursor.execute("SELECT * FROM my_table")
            # Rows are indexed positionally; assumes at least four columns in
            # the order first name, last name, email, phone - TODO confirm.
            for row in cursor.fetchall():
                print(f"First Name: {row[0]}, Last Name: {row[1]}, Email: {row[2]}, Phone Number: {row[3]}")
        finally:
            cursor.close()
    finally:
        connection.close()
if __name__ == "__main__":
    # Query each backend in a fixed order; an exception from any fetcher
    # stops the run before the remaining ones are called.
    fetchers = (
        fetch_mongo_data,
        fetch_mysql_data,
        fetch_postgres_data,
        fetch_redis_data,
        fetch_es_data,
        fetch_oracledb_data,
    )
    for fetch in fetchers:
        fetch()
    print("\n")